Posted to dev@apex.apache.org by shubham-pathak22 <gi...@git.apache.org> on 2015/12/31 07:42:42 UTC

[GitHub] incubator-apex-malhar pull request: MLHR-1961

GitHub user shubham-pathak22 opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154

    MLHR-1961

    enhanced existing csv parser to take in schema for validations

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shubham-pathak22/incubator-apex-malhar MLHR-1961

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/154.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #154
    
----
commit fba6612bb7647d4274eacb13ae90bd46069c5e66
Author: shubham <sh...@github.com>
Date:   2015-12-30T12:32:13Z

    enhanced existing csv parser to take in schema for validations

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705310
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
     +   * Cell processors are an integral part of reading and writing with Super CSV;
     +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
     +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    --- End diff --
    
    The err.isConnected() check is not required if the action is just an emit.
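    
    A minimal sketch of the suggested simplification (hypothetical excerpt, not the
    committed code), assuming an unconnected Apex output port silently drops
    emitted tuples:
    
        if (tuple == null) {
          // emit unconditionally; no isConnected() guard needed when the
          // only action is the emit itself
          err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
          errorTupleCount++;
          return;
        }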



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48795754
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---
    @@ -20,169 +20,425 @@
     
     import java.util.Date;
     
    -import org.joda.time.DateTime;
    +import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Rule;
    +import org.junit.Before;
     import org.junit.Test;
    -import org.junit.rules.TestWatcher;
    -import org.junit.runner.Description;
     
    +import com.datatorrent.lib.appdata.schemas.SchemaUtils;
     import com.datatorrent.lib.testbench.CollectorTestSink;
    -import com.datatorrent.lib.util.TestUtils;
     
     public class CsvPOJOParserTest
     {
     
    -  CsvParser operator;
    -  CollectorTestSink<Object> validDataSink;
    -  CollectorTestSink<String> invalidDataSink;
    +  private static final String filename = "schema.json";
    +  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
    +  CsvParser parser = new CsvParser();
     
    -  @Rule
    -  public Watcher watcher = new Watcher();
    +  /*
    +  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
    +  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
    +  * Constraints are defined in schema.json
    +  */
     
    -  public class Watcher extends TestWatcher
    +  @Before
    +  public void setup()
    --- End diff --
    
    Why can't TestWatcher be used for the same thing?
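    
    For reference, a minimal sketch of the TestWatcher alternative being suggested,
    reusing the parser and sink names from this test (a sketch, not the committed code):
    
        @Rule
        public Watcher watcher = new Watcher();
    
        public class Watcher extends TestWatcher
        {
          @Override
          protected void starting(Description description)
          {
            super.starting(description);
            // per-test setup: configure the parser and attach the sinks
            parser.setClazz(Ad.class);
            parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
            parser.setup(null);
          }
    
          @Override
          protected void finished(Description description)
          {
            super.finished(description);
            // per-test cleanup: drain the collector sinks
            error.clear();
            objectPort.clear();
            pojoPort.clear();
          }
        }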



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48715636
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---
    @@ -0,0 +1,456 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.util.Map;
    +
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
    +import org.supercsv.cellprocessor.ift.LongCellProcessor;
    +import org.supercsv.util.CsvContext;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.contrib.parser.DelimitedSchema.FIELD_TYPE;
    +
    +/**
    + * Helper class with methods to generate CellProcessor objects. Cell processors
    + * are an integral part of reading and writing with Super CSV - they automate
    + * the data type conversions, and enforce constraints. They implement the chain
    + * of responsibility design pattern - each processor has a single, well-defined
    + * purpose and can be chained together with other processors to fully automate
    + * all of the required conversions and constraint validation for a single
    + * delimited record.
    + * 
    + */
    +public class CellProcessorBuilder
    +{
    +
    +  /**
    +   * Method to get cell processors for given field type and constraints
    +   * 
    +   * @param fieldType
    +   *          data type of the field
    +   * @param constraints
    +   *          a map of constraints
    +   * @return
    +   */
    +  public static CellProcessor getCellProcessor(FIELD_TYPE fieldType, Map<String, Object> constraints)
    +  {
    +    switch (fieldType) {
    +      case STRING:
    +        return getStringCellProcessor(constraints);
    +      case INTEGER:
    +        return getIntegerCellProcessor(constraints);
    +      case LONG:
    +        return getLongCellProcessor(constraints);
    +      case FLOAT:
    +      case DOUBLE:
    +        return getDoubleCellProcessor(constraints);
    +      case CHARACTER:
    +        return getCharCellProcessor(constraints);
    +      case BOOLEAN:
    +        return getBooleanCellProcessor(constraints);
    +      case DATE:
    +        return getDateCellProcessor(constraints);
    +      default:
    +        return null;
    +    }
    +  }
    +
    +  /**
    +   * Method to get cellprocessor for String with constraints. These constraints
    +   * are evaluated against the String field for which this cellprocessor is
    +   * defined.
    +   * 
    +   * @param constraints
    +   *          map of constraints applicable to String
    +   * @return CellProcessor
    +   */
    +  private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
    +  {
    +    Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
    +        .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
    --- End diff --
    
    Why not have a map? It is a Map<String, Object>,
    and the methods will have to convert the values to the required types.
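    
    A hypothetical helper (not part of this PR) illustrating the pattern under
    discussion: constraint values arrive as strings inside the Map<String, Object>,
    so each typed accessor parses on demand:
    
        private static Boolean getBooleanConstraint(Map<String, Object> constraints, String key)
        {
          Object value = constraints.get(key);
          // schema values are stored as strings, hence the parse
          return value == null ? null : Boolean.parseBoolean((String)value);
        }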



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48715721
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/FileUtils.java ---
    @@ -0,0 +1,51 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.StringWriter;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * This class holds utility methods related to files
    + *
    + */
    +public class FileUtils
    --- End diff --
    
    We would not need this anymore.



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705848
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java ---
    @@ -0,0 +1,359 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jettison.json.JSONArray;
    +import org.codehaus.jettison.json.JSONException;
    +import org.codehaus.jettison.json.JSONObject;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * <p>
     + * This is the schema that defines fields and their constraints for delimited files.
     + * The operators use this information to validate the incoming tuples.
     + * Information from the JSON schema is saved in this object and is used by the
     + * operators.
    + * <p>
    + * <br>
    + * <br>
    + * Example schema <br>
    + * <br>
     + * {@code{ "separator": ",", "quoteChar":"\"", "fields": [ { "name": "adId",
     + * "type": "Integer", "constraints": { "required": "true" } }, { "name": "adName",
    + * "type": "String", "constraints": { "required": "true", "pattern":
    + * "[a-z].*[a-z]$", "maxLength": "20" } }, { "name": "bidPrice", "type":
    + * "Double", "constraints": { "required": "true", "minValue": "0.1", "maxValue":
    + * "3.2" } }, { "name": "startDate", "type": "Date", "constraints": { "format":
    + * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
    + * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
    + * "Boolean", "constraints": { "required": "true" } } ] }}
    + */
    +public class DelimitedSchema
    +{
    +
    +  /**
    +   * JSON key string for separator
    +   */
    +  private static final String SEPARATOR = "separator";
    +  /**
    +   * JSON key string for quote character
    +   */
    +  private static final String QUOTE_CHAR = "quoteChar";
    +  /**
    +   * JSON key string for line delimiter
    +   */
    +  private static final String LINE_DELIMITER = "lineDelimiter";
    +  /**
    +   * JSON key string for fields array
    +   */
    +  private static final String FIELDS = "fields";
    +  /**
    +   * JSON key string for name of the field within fields array
    +   */
    +  private static final String NAME = "name";
    +  /**
    +   * JSON key string for type of the field within fields array
    +   */
    +  private static final String TYPE = "type";
    +  /**
    +   * JSON key string for constraints for each field
    +   */
    +  private static final String CONSTRAINTS = "constraints";
    +  /**
    +   * JSON key string for required constraint
    +   */
    +  public static final String REQUIRED = "required";
    +  /**
    +   * JSON key string for equals constraint
    +   */
    +  public static final String EQUALS = "equals";
    +  /**
    +   * JSON key string for length constraint
    +   */
    +  public static final String LENGTH = "length";
    +  /**
    +   * JSON key string for min length constraint
    +   */
    +  public static final String MIN_LENGTH = "minLength";
    +  /**
    +   * JSON key string for max length constraint
    +   */
    +  public static final String MAX_LENGTH = "maxLength";
    +  /**
    +   * JSON key string for min value constraint
    +   */
    +  public static final String MIN_VALUE = "minValue";
    +  /**
    +   * JSON key string for max value constraint
    +   */
    +  public static final String MAX_VALUE = "maxValue";
    +  /**
    +   * JSON key string for regex pattern constraint
    +   */
    +  public static final String REGEX_PATTERN = "pattern";
    +  /**
    +   * JSON key string for date format constraint
    +   */
    +  public static final String DATE_FORMAT = "format";
    +  /**
    +   * JSON key string for locale constraint
    +   */
    +  public static final String LOCALE = "locale";
    +  /**
    +   * JSON key string for true value constraint
    +   */
    +  public static final String TRUE_VALUE = "trueValue";
    +  /**
    +   * JSON key string for false value constraint
    +   */
    +  public static final String FALSE_VALUE = "falseValue";
    +  /**
    +   * delimiter character provided in schema. Default is ,
    +   */
    +  private int delimiterChar = ',';
    +  /**
    +   * quote character provided in schema. Default is "
    +   */
    +  private char quoteChar = '\"';
    +  /**
    +   * line delimiter character provided in schema. Default is new line character
    +   */
    +  private String lineDelimiter = "\r\n";
    +  /**
    +   * This holds the list of field names in the same order as in the schema
    +   */
    +  private List<String> fieldNames = new LinkedList<String>();
    +  /**
    +   * This holds list of {@link Field}
    +   */
    +  private List<Field> fields = new LinkedList<Field>();
    +
    +  /**
    +   * Supported data types
    +   */
    +  public enum FIELD_TYPE
    +  {
    +    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    +  };
    +
    +  public DelimitedSchema(String json)
    +  {
    +    try {
    +      initialize(json);
    +    } catch (JSONException | IOException e) {
    +      logger.error("{}", e);
    +      throw new IllegalArgumentException(e);
    +    }
    +  }
    +
    +  /**
    +   * For a given json string, this method sets the field members
    +   * 
    +   * @param json
    +   * @throws JSONException
    +   * @throws IOException
    +   */
    +  private void initialize(String json) throws JSONException, IOException
    +  {
    +    JSONObject jo = new JSONObject(json);
    +    if (jo.has(SEPARATOR)) {
    +      delimiterChar = ((String)jo.getString(SEPARATOR)).charAt(0);
    +    }
    +    if (jo.has(QUOTE_CHAR)) {
    +      quoteChar = ((String)jo.getString(QUOTE_CHAR)).charAt(0);
    +    }
    +    if (jo.has(LINE_DELIMITER)) {
    +      lineDelimiter = (String)jo.getString(LINE_DELIMITER);
    +    }
    +
    +    JSONArray fieldArray = jo.getJSONArray(FIELDS);
    +    for (int i = 0; i < fieldArray.length(); i++) {
    +      JSONObject obj = fieldArray.getJSONObject(i);
    +      Field field = new Field(obj.getString(NAME), obj.getString(TYPE));
    +      fields.add(field);
    +      fieldNames.add(field.name);
    +      if (obj.has(CONSTRAINTS)) {
    +        JSONObject constraints = obj.getJSONObject(CONSTRAINTS);
    +        field.constraints = new ObjectMapper().readValue(constraints.toString(), HashMap.class);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Get the list of field names mentioned in schema
    +   * 
    +   * @return fieldNames
    +   */
    +  public List<String> getFieldNames()
    +  {
    +    return Collections.unmodifiableList(fieldNames);
    +  }
    +
    +  /**
    +   * Get the delimiter character
    +   * 
    +   * @return delimiterChar
    +   */
    +  public int getDelimiterChar()
    +  {
    +    return delimiterChar;
    +  }
    +
    +  /**
    +   * Get the quoteChar
    +   * 
    +   * @return quoteChar
    +   */
    +  public char getQuoteChar()
    +  {
    +    return quoteChar;
    +  }
    +
    +  /**
    +   * Get the line delimiter
    +   * 
    +   * @return lineDelimiter
    +   */
    +  public String getLineDelimiter()
    +  {
    +    return lineDelimiter;
    +  }
    +
    +  @Override
    +  public String toString()
    +  {
    +    return "DelimitedSchema [delimiterChar=" + delimiterChar + ", quoteChar=" + quoteChar + ", lineDelimiter="
    +        + lineDelimiter + ", fieldNames=" + fieldNames + ", fields=" + fields + "]";
    +  }
    +
    +  /**
    +   * Get the list of Fields.
    +   * 
    +   * @return fields
    +   */
    +  public List<Field> getFields()
    +  {
    +    return Collections.unmodifiableList(fields);
    +  }
    +
    +  /**
    +   * Objects of this class represents a particular field in the schema. Each
    +   * field has a name, type and a set of associated constraints.
    +   * 
    +   */
    +  public class Field
    +  {
    +    /**
    +     * name of the field
    +     */
    +    String name;
    +    /**
    +     * Data type of the field
    +     */
    +    FIELD_TYPE type;
    +    /**
    +     * constraints associated with the field
    +     */
    +    Map<String, Object> constraints = new HashMap<String, Object>();
    +
    +    public Field(String name, String type)
    +    {
    +      this.name = name;
    +      this.type = FIELD_TYPE.valueOf(type.toUpperCase());
    +    }
    +
    +    /**
    +     * Get the name of the field
    +     * 
    +     * @return name
    +     */
    +    public String getName()
    +    {
    +      return name;
    +    }
    +
    +    /**
    +     * Set the name of the field
    +     * 
    +     * @param name
    +     */
    +    public void setName(String name)
    +    {
    +      this.name = name;
    +    }
    +
    +    /**
    +     * Get {@link FIELD_TYPE}
    +     * 
    +     * @return type
    +     */
    +    public FIELD_TYPE getType()
    +    {
    +      return type;
    +    }
    +
    +    /**
    +     * Set {@link FIELD_TYPE}
    +     * 
    +     * @param type
    +     */
    +    public void setType(FIELD_TYPE type)
    +    {
    +      this.type = type;
    +    }
    +
    +    /**
    +     * Get the map of constraints associated with the field
    +     * 
    +     * @return constraints
    +     */
    +    public Map<String, Object> getConstraints()
    +    {
    +      return constraints;
    +    }
    +
    +    /**
    +     * Sets the map of constraints associated with the field
    +     * 
    +     * @param constraints
    +     */
    +    public void setConstraints(Map<String, Object> constraints)
    +    {
    +      this.constraints = constraints;
    +    }
    --- End diff --
    
    Do you want to provide an addConstraint method here as well?
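    
    A minimal sketch of what such a method might look like (hypothetical, not in
    the PR):
    
        /**
         * Adds a single constraint without replacing the whole map
         */
        public void addConstraint(String key, Object value)
        {
          constraints.put(key, value);
        }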



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705087
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
    --- End diff --
    
    Should this be renamed to DelimitedParser rather than CsvParser?
    I believe it can work on delimited strings, correct?



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r49489574
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---
    @@ -20,169 +20,453 @@
     
     import java.util.Date;
     
    -import org.joda.time.DateTime;
     import org.junit.Assert;
     import org.junit.Rule;
     import org.junit.Test;
     import org.junit.rules.TestWatcher;
     import org.junit.runner.Description;
     
    +import com.datatorrent.lib.appdata.schemas.SchemaUtils;
     import com.datatorrent.lib.testbench.CollectorTestSink;
    -import com.datatorrent.lib.util.TestUtils;
     
     public class CsvPOJOParserTest
     {
     
    -  CsvParser operator;
    -  CollectorTestSink<Object> validDataSink;
    -  CollectorTestSink<String> invalidDataSink;
    +  private static final String filename = "schema.json";
    +  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
    +  CsvParser parser = new CsvParser();
     
       @Rule
       public Watcher watcher = new Watcher();
     
       public class Watcher extends TestWatcher
       {
    -
         @Override
         protected void starting(Description description)
         {
           super.starting(description);
    -      operator = new CsvParser();
    -      operator.setClazz(EmployeeBean.class);
    -      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
    -      validDataSink = new CollectorTestSink<Object>();
    -      invalidDataSink = new CollectorTestSink<String>();
    -      TestUtils.setSink(operator.out, validDataSink);
    -      TestUtils.setSink(operator.err, invalidDataSink);
    +      parser.setClazz(Ad.class);
    +      parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
    +      parser.setup(null);
    +      parser.err.setSink(error);
    +      parser.parsedOutput.setSink(objectPort);
    +      parser.out.setSink(pojoPort);
         }
     
         @Override
         protected void finished(Description description)
         {
           super.finished(description);
    -      operator.teardown();
    +      error.clear();
    +      objectPort.clear();
    +      pojoPort.clear();
         }
    -
       }
     
    +  /*
    +  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
    +  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
    +  * Constraints are defined in schema.json
    +  */
       @Test
    -  public void testCsvToPojoWriterDefault()
    +  public void TestParserValidInput()
       {
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01/01/2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
    +    Ad adPojo = (Ad)obj;
         Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(1234, adPojo.getAdId());
    +    Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
    +    Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
    +    Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
    +    Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
    +    Assert.assertEquals(12, adPojo.getSecurityCode());
    +    Assert.assertTrue("CAMP_AD".equals(adPojo.getParentCampaign()));
    +    Assert.assertTrue(adPojo.isActive());
    +    Assert.assertFalse(adPojo.isOptimized());
    +    Assert.assertTrue("yes".equals(adPojo.getValid()));
    +  }
    +
    +  @Test
    +  public void TestParserValidInputPojoPortNotConnected()
    +  {
    +    parser.out.setSink(null);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserValidInputClassNameNotProvided()
    +  {
    +    parser.setClazz(null);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidAdIdInput()
    +  {
    +    String input = ",98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDateFormat()
    +  public void TestParserNoCampaignIdInput()
       {
    -    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01-JAN-2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    +    String input = "1234,,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
         Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidCampaignIdInput()
    +  {
    +    String input = "1234,9833,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidAdNameInput()
    +  {
    +    String input = "1234,98233,adxyz123,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidBidPriceInput()
    +  {
    +    String input = "1234,98233,adxyz,3.3,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    --- End diff --
    
    The parser.teardown() call should be part of the finished() method of the TestWatcher.
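    
    Applied to the Watcher defined earlier in this test, the suggestion would look
    roughly like this (a sketch):
    
        @Override
        protected void finished(Description description)
        {
          super.finished(description);
          parser.teardown(); // close the readers once per test, not in each test body
          error.clear();
          objectPort.clear();
          pojoPort.clear();
        }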



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48877003
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,178 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
     +   * Cell processors are an integral part of reading and writing with Super CSV;
     +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
     +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    super.beginWindow(windowId);
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    throw new UnsupportedOperationException("Not supported");
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    +      errorTupleCount++;
    +      return;
         }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException | IOException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
       }
     
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
    -  }
    -
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErrorTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    throw new UnsupportedOperationException("Not supported");
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    --- End diff --
    
    It should throw an exception.
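    
    One way to apply the suggestion (a sketch; the removed code used
    DTThrowable.rethrow(e), which would also work here):
    
        @Override
        public void teardown()
        {
          try {
            csvMapReader.close();
          } catch (IOException e) {
            // propagate so a failed close is not silently swallowed
            throw new RuntimeException("Error while closing csv map reader", e);
          }
        }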



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48803387
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -19,42 +19,49 @@
     package com.datatorrent.contrib.parser;
     
     import java.io.IOException;
    -import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
     
     import javax.validation.constraints.NotNull;
     
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import org.supercsv.cellprocessor.Optional;
    -import org.supercsv.cellprocessor.ParseBool;
    -import org.supercsv.cellprocessor.ParseChar;
    -import org.supercsv.cellprocessor.ParseDate;
    -import org.supercsv.cellprocessor.ParseDouble;
    -import org.supercsv.cellprocessor.ParseInt;
    -import org.supercsv.cellprocessor.ParseLong;
     import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.exception.SuperCsvException;
     import org.supercsv.io.CsvBeanReader;
    +import org.supercsv.io.CsvMapReader;
     import org.supercsv.prefs.CsvPreference;
    +
    +import org.apache.commons.lang.StringUtils;
     import org.apache.hadoop.classification.InterfaceStability;
     
    -import com.datatorrent.api.Context;
    +import com.datatorrent.api.AutoMetric;
     import com.datatorrent.api.Context.OperatorContext;
    +import com.datatorrent.api.DefaultOutputPort;
    +import com.datatorrent.contrib.parser.DelimitedSchema.Field;
     import com.datatorrent.lib.parser.Parser;
    +import com.datatorrent.lib.util.KeyValPair;
     import com.datatorrent.lib.util.ReusableStringReader;
    -import com.datatorrent.netlet.util.DTThrowable;
     
     /**
    - * Operator that converts CSV string to Pojo <br>
    + * Operator that parses a delimited tuple against a specified schema <br>
    + * Schema is specified in a json format as per {@link DelimitedSchema} that
    + * contains field information and constraints for each field <br>
      * Assumption is that each field in the delimited data should map to a simple
      * java type.<br>
      * <br>
      * <b>Properties</b> <br>
    - * <b>fieldInfo</b>:User need to specify fields and their types as a comma
    - * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
    - * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as
    - * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br>
    - * <b>fieldDelimiter</b>: Default is comma <br>
    - * <b>lineDelimiter</b>: Default is '\r\n'
    + * <b>schemaPath</b>:Complete path of schema file in HDFS <br>
    + * <b>clazz</b>:Pojo class <br>
    + * <b>Ports</b> <br>
    --- End diff --
    
    The port names mentioned below don't match the actual variable names. The port names are `in` for input, `out` for the POJO, `err` for errors, and `parsedOutput` for the map.
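    
    A sketch of how the javadoc port list could read, using the variable names
    listed above (hypothetical wording):
    
        * <b>Ports</b> <br>
        * <b>in</b>: input byte[] tuples <br>
        * <b>parsedOutput</b>: parsed tuples as Map&lt;String, Object&gt; <br>
        * <b>out</b>: parsed tuples as POJOs <br>
        * <b>err</b>: error tuples as KeyValPair&lt;String, String&gt; <br>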



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705255
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    --- End diff --
    
    The `err.isConnected()` check is not required when the only action is to emit.
    
    By default the port is connected to Sink.BLACKHOLE, whose `put(Object)` does nothing.
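
    So the null branch can simply emit unconditionally; a minimal sketch of the same logic without the guard:

        if (tuple == null) {
          // safe even when no sink is attached: the default sink is a blackhole
          err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
          errorTupleCount++;
          return;
        }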



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48803078
  
    --- Diff: contrib/pom.xml ---
    @@ -614,8 +614,8 @@
         <dependency>
           <!-- required by Csv parser and formatter -->
           <groupId>net.sf.supercsv</groupId>
    -      <artifactId>super-csv-joda</artifactId>
    -      <version>2.3.1</version>
    +      <artifactId>super-csv</artifactId>
    +      <version>2.4.0</version>
    --- End diff --
    
    Make this dependency optional.
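
    i.e. mark it optional in contrib/pom.xml so downstream users must pull it in explicitly; a sketch of the entry:

        <dependency>
          <!-- required by Csv parser and formatter; optional so it is not forced on every contrib user -->
          <groupId>net.sf.supercsv</groupId>
          <artifactId>super-csv</artifactId>
          <version>2.4.0</version>
          <optional>true</optional>
        </dependency>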



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48704882
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---
    @@ -0,0 +1,456 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.util.Map;
    +
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
    +import org.supercsv.cellprocessor.ift.LongCellProcessor;
    +import org.supercsv.util.CsvContext;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.contrib.parser.DelimitedSchema.FIELD_TYPE;
    +
    +/**
    + * Helper class with methods to generate CellProcessor objects. Cell processors
    + * are an integral part of reading and writing with Super CSV - they automate
    + * the data type conversions, and enforce constraints. They implement the chain
    + * of responsibility design pattern - each processor has a single, well-defined
    + * purpose and can be chained together with other processors to fully automate
    + * all of the required conversions and constraint validation for a single
    + * delimited record.
    + * 
    + */
    +public class CellProcessorBuilder
    +{
    +
    +  /**
    +   * Method to get cell processors for given field type and constraints
    +   * 
    +   * @param fieldType
    +   *          data type of the field
    +   * @param constraints
    +   *          a map of constraints
    +   * @return
    +   */
    +  public static CellProcessor getCellProcessor(FIELD_TYPE fieldType, Map<String, Object> constraints)
    +  {
    +    switch (fieldType) {
    +      case STRING:
    +        return getStringCellProcessor(constraints);
    +      case INTEGER:
    +        return getIntegerCellProcessor(constraints);
    +      case LONG:
    +        return getLongCellProcessor(constraints);
    +      case FLOAT:
    +      case DOUBLE:
    +        return getDoubleCellProcessor(constraints);
    +      case CHARACTER:
    +        return getCharCellProcessor(constraints);
    +      case BOOLEAN:
    +        return getBooleanCellProcessor(constraints);
    +      case DATE:
    +        return getDateCellProcessor(constraints);
    +      default:
    +        return null;
    +    }
    +  }
    +
    +  /**
    +   * Method to get cellprocessor for String with constraints. These constraints
    +   * are evaluated against the String field for which this cellprocessor is
    +   * defined.
    +   * 
    +   * @param constraints
    +   *          map of constraints applicable to String
    +   * @return CellProcessor
    +   */
    +  private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
    +  {
    +    Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
    +        .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
    +    Integer strLen = constraints.get(DelimitedSchema.LENGTH) == null ? null : Integer.parseInt((String)constraints
    +        .get(DelimitedSchema.LENGTH));
    +    Integer minLength = constraints.get(DelimitedSchema.MIN_LENGTH) == null ? null : Integer
    +        .parseInt((String)constraints.get(DelimitedSchema.MIN_LENGTH));
    +    Integer maxLength = constraints.get(DelimitedSchema.MAX_LENGTH) == null ? null : Integer
    +        .parseInt((String)constraints.get(DelimitedSchema.MAX_LENGTH));
    +    String equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : (String)constraints
    +        .get(DelimitedSchema.EQUALS);
    +    String pattern = constraints.get(DelimitedSchema.REGEX_PATTERN) == null ? null : (String)constraints
    +        .get(DelimitedSchema.REGEX_PATTERN);
    +
    +    CellProcessor cellProcessor = null;
    +    if (StringUtils.isNotBlank(equals)) {
    --- End diff --
    
    For the if/else block:
    Does this mean it is not allowed to combine multiple of these constraints on the same field?
    If yes, why?
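
    Super CSV cell processors can be chained, so several constraints could in principle apply to one field; a minimal sketch (hypothetical field with both a length range and a regex):

        import org.supercsv.cellprocessor.Optional;
        import org.supercsv.cellprocessor.constraint.StrMinMax;
        import org.supercsv.cellprocessor.constraint.StrRegEx;
        import org.supercsv.cellprocessor.ift.CellProcessor;

        // Optional skips null values, then the chain enforces length 2..10 and then the pattern
        CellProcessor chained = new Optional(new StrMinMax(2, 10, new StrRegEx("[A-Za-z]+")));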



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48795848
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---
    @@ -20,169 +20,425 @@
     
     import java.util.Date;
     
    -import org.joda.time.DateTime;
    +import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Rule;
    +import org.junit.Before;
     import org.junit.Test;
    -import org.junit.rules.TestWatcher;
    -import org.junit.runner.Description;
     
    +import com.datatorrent.lib.appdata.schemas.SchemaUtils;
     import com.datatorrent.lib.testbench.CollectorTestSink;
    -import com.datatorrent.lib.util.TestUtils;
     
     public class CsvPOJOParserTest
     {
     
    -  CsvParser operator;
    -  CollectorTestSink<Object> validDataSink;
    -  CollectorTestSink<String> invalidDataSink;
    +  private static final String filename = "schema.json";
    +  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
    +  CsvParser parser = new CsvParser();
     
    -  @Rule
    -  public Watcher watcher = new Watcher();
    +  /*
    +  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
    +  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
    +  * Constraints are defined in schema.json
    +  */
     
    -  public class Watcher extends TestWatcher
    +  @Before
    +  public void setup()
       {
    +    parser.err.setSink(error);
    +    parser.parsedOutput.setSink(objectPort);
    +    parser.out.setSink(pojoPort);
    +    parser.setClazz(Ad.class);
    +    parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
    +    parser.setup(null);
    +  }
     
    -    @Override
    -    protected void starting(Description description)
    -    {
    -      super.starting(description);
    -      operator = new CsvParser();
    -      operator.setClazz(EmployeeBean.class);
    -      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
    -      validDataSink = new CollectorTestSink<Object>();
    -      invalidDataSink = new CollectorTestSink<String>();
    -      TestUtils.setSink(operator.out, validDataSink);
    -      TestUtils.setSink(operator.err, invalidDataSink);
    -    }
    +  @After
    +  public void tearDown()
    +  {
    +    error.clear();
    +    objectPort.clear();
    +    pojoPort.clear();
    +  }
     
    -    @Override
    -    protected void finished(Description description)
    -    {
    -      super.finished(description);
    -      operator.teardown();
    -    }
    +  @Test
    +  public void TestParserValidInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
    +    Ad adPojo = (Ad)obj;
    +    Assert.assertNotNull(obj);
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(1234, adPojo.getAdId());
    +    Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
    +    Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
    +    Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
    +    Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
    +    Assert.assertEquals(12, adPojo.getSecurityCode());
    +    Assert.assertTrue("CAMP_AD".equals(adPojo.getParentCampaign()));
    +    Assert.assertTrue(adPojo.isActive());
    +    Assert.assertFalse(adPojo.isOptimized());
    +    Assert.assertTrue("yes".equals(adPojo.getValid()));
    +  }
     
    +  @Test
    +  public void TestParserValidInputPojoPortNotConnected()
    +  {
    +    parser.out.setSink(null);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDefault()
    +  public void TestParserValidInputClassNameNotProvided()
       {
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01/01/2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    -    Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    +    parser.setClazz(null);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDateFormat()
    +  public void TestParserInvalidAdIdInput()
       {
    -    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01-JAN-2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    +    String input = ",98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserNoCampaignIdInput()
    +  {
    +    String input = "1234,,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
         Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidCampaignIdInput()
    +  {
    +    String input = "1234,9833,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDateFormatMultiple()
    +  public void TestParserInvalidAdNameInput()
       {
    -    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    +    String input = "1234,98233,adxyz123,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidBidPriceInput()
    +  {
    +    String input = "1234,98233,adxyz,3.3,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidStartDateInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-30-08 02:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidSecurityCodeInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,85,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidisActiveInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,yo,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidParentCampaignInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserValidisOptimized()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
         Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfBirth()));
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInValidisOptimized()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInValidWeatherTargeting()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,NO,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserNullOrBlankInput()
    +  {
    +    parser.in.process(null);
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserHeaderAsInput()
    +  {
    +    String input = "adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserLessFields()
    +  {
    +    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION".getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserMoreFields()
    +  {
    +    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField"
    +        .getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
       }
     
    -  public static class EmployeeBean
    +  public static class Ad
    --- End diff --
    
    Could we not enhance EmployeeBean?



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705430
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
    --- End diff --
    
    Spelling of "Error"...



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705873
  
    --- Diff: library/src/main/java/com/datatorrent/lib/util/FileUtils.java ---
    @@ -0,0 +1,51 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.lib.util;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.StringWriter;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +/**
    + * This class holds utility methods related to files
    + *
    + */
    +public class FileUtils
    --- End diff --
    
    Can you please check whether a utility like this already exists in Malhar?



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705205
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
    --- End diff --
    
    If this is not invoked, please throw an UnsupportedOperationException rather than returning a value.
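
    Something along these lines (sketch):

        @Override
        public Object convert(byte[] tuple)
        {
          // never called: processTuple() overrides the parsing flow for this operator
          throw new UnsupportedOperationException("convert() is not used by CsvParser");
        }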



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48709009
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
    --- End diff --
    
    This is an enhancement to an existing operator in the library. I'm not sure whether I can rename the operator itself.



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705370
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    --- End diff --
    
    Why is the IOException case distinguished from SuperCsvException?
    
    In the case of SuperCsvException, we know for sure that parsing failed, hence the tuple is sent to the err port.
    In the case of IOException, do we know for sure that it should not also be sent to the err port?
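
    For reference, the pattern under discussion, as a minimal sketch with names as in the diff above: a SuperCsvException is a parse failure and goes to the err port, while an IOException is rethrown and fails the operator.

        try {
          csvStringReader.open(incomingString);
          parsedOutput.emit(csvMapReader.read(nameMapping, processors));
        } catch (SuperCsvException e) {
          // bad record: a type/constraint violation detected while parsing
          err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
          errorTupleCount++;
        } catch (IOException e) {
          // reader failure, not necessarily a bad record; treated as fatal
          DTThrowable.rethrow(e);
        }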


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48875936
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,178 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV;
    +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    super.beginWindow(windowId);
    --- End diff --
    
    Please add a test case for auto-metric verification.
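
    For illustration, a minimal sketch of such a test, assuming JUnit, CollectorTestSink, TestUtils.setSink as used in other Malhar tests, a schema setter on the operator, a placeholder SCHEMA string, and a test class in the same package so the metric field is visible:

        CsvParser parser = new CsvParser();
        parser.setSchema(SCHEMA);                        // JSON schema string, as illustrated earlier
        parser.setup(null);
        CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
        TestUtils.setSink(parser.parsedOutput, sink);
        parser.beginWindow(0);
        parser.in.process("1234,adxyz,0.2".getBytes());  // a record matching SCHEMA
        parser.endWindow();
        Assert.assertEquals(1, parser.parsedOutputCount);
        parser.beginWindow(1);                           // metric must reset at the window boundary
        Assert.assertEquals(0, parser.parsedOutputCount);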


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48796140
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,177 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV;
    +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
    --- End diff --
    
    It should call super.beginWindow() to reset the other auto-metrics every window.
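
    i.e., something like the sketch below; the later revision of the diff does exactly this.

        @Override
        public void beginWindow(long windowId)
        {
          super.beginWindow(windowId);  // resets the base class auto-metrics (errorTupleCount, emittedObjectCount)
          parsedOutputCount = 0;
        }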


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705810
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java ---
    @@ -0,0 +1,359 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jettison.json.JSONArray;
    +import org.codehaus.jettison.json.JSONException;
    +import org.codehaus.jettison.json.JSONObject;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * <p>
    + * This is a schema that defines fields and their constraints for delimited files.
    + * The operators use this information to validate the incoming tuples.
    + * Information from JSON schema is saved in this object and is used by the
    + * operators
    + * <p>
    + * <br>
    + * <br>
    + * Example schema <br>
    + * <br>
    + * {@code{ "separator": ",", "quoteChar":"\"", "fields": [ "name": "adId",
    + * "type": "Integer", "constraints": "required": "true" } , { "name": "adName",
    + * "type": "String", "constraints": { "required": "true", "pattern":
    + * "[a-z].*[a-z]$", "maxLength": "20" } }, { "name": "bidPrice", "type":
    + * "Double", "constraints": { "required": "true", "minValue": "0.1", "maxValue":
    + * "3.2" } }, { "name": "startDate", "type": "Date", "constraints": { "format":
    + * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
    + * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
    + * "Boolean", "constraints": { "required": "true" } } ] }}
    + */
    +public class DelimitedSchema
    +{
    +
    +  /**
    +   * JSON key string for separator
    +   */
    +  private static final String SEPARATOR = "separator";
    +  /**
    +   * JSON key string for quote character
    +   */
    +  private static final String QUOTE_CHAR = "quoteChar";
    +  /**
    +   * JSON key string for line delimiter
    +   */
    +  private static final String LINE_DELIMITER = "lineDelimiter";
    +  /**
    +   * JSON key string for fields array
    +   */
    +  private static final String FIELDS = "fields";
    +  /**
    +   * JSON key string for name of the field within fields array
    +   */
    +  private static final String NAME = "name";
    +  /**
    +   * JSON key string for type of the field within fields array
    +   */
    +  private static final String TYPE = "type";
    +  /**
    +   * JSON key string for constraints for each field
    +   */
    +  private static final String CONSTRAINTS = "constraints";
    +  /**
    +   * JSON key string for required constraint
    +   */
    +  public static final String REQUIRED = "required";
    +  /**
    +   * JSON key string for equals constraint
    +   */
    +  public static final String EQUALS = "equals";
    +  /**
    +   * JSON key string for length constraint
    +   */
    +  public static final String LENGTH = "length";
    +  /**
    +   * JSON key string for min length constraint
    +   */
    +  public static final String MIN_LENGTH = "minLength";
    +  /**
    +   * JSON key string for max length constraint
    +   */
    +  public static final String MAX_LENGTH = "maxLength";
    +  /**
    +   * JSON key string for min value constraint
    +   */
    +  public static final String MIN_VALUE = "minValue";
    +  /**
    +   * JSON key string for max value constraint
    +   */
    +  public static final String MAX_VALUE = "maxValue";
    +  /**
    +   * JSON key string for regex pattern constraint
    +   */
    +  public static final String REGEX_PATTERN = "pattern";
    +  /**
    +   * JSON key string for date format constraint
    +   */
    +  public static final String DATE_FORMAT = "format";
    +  /**
    +   * JSON key string for locale constraint
    +   */
    +  public static final String LOCALE = "locale";
    +  /**
    +   * JSON key string for true value constraint
    +   */
    +  public static final String TRUE_VALUE = "trueValue";
    +  /**
    +   * JSON key string for false value constraint
    +   */
    +  public static final String FALSE_VALUE = "falseValue";
    +  /**
    +   * delimiter character provided in schema. Default is ,
    +   */
    +  private int delimiterChar = ',';
    +  /**
    +   * quote character provided in schema. Default is "
    +   */
    +  private char quoteChar = '\"';
    +  /**
    +   * line delimiter character provided in schema. Default is new line character
    +   */
    +  private String lineDelimiter = "\r\n";
    +  /**
    +   * This holds the list of field names in the same order as in the schema
    +   */
    +  private List<String> fieldNames = new LinkedList<String>();
    +  /**
    +   * This holds list of {@link Field}
    +   */
    +  private List<Field> fields = new LinkedList<Field>();
    +
    +  /**
    +   * Supported data types
    +   */
    +  public enum FIELD_TYPE
    --- End diff --
    
    Can you please rename this to "FieldType"?
    All caps is usually used for parameterized classes OR static final constants, etc.
    
    I actually got confused in the CellProcessorBuilder class when I saw FIELD_TYPE as the input param type to the method.
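
    With the rename, the declaration (constants as in the diff) would read:

        public enum FieldType
        {
          BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
        }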


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48708952
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---
    @@ -0,0 +1,456 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.util.Map;
    +
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
    +import org.supercsv.cellprocessor.ift.LongCellProcessor;
    +import org.supercsv.util.CsvContext;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.contrib.parser.DelimitedSchema.FIELD_TYPE;
    +
    +/**
    + * Helper class with methods to generate CellProcessor objects. Cell processors
    + * are an integral part of reading and writing with Super CSV - they automate
    + * the data type conversions, and enforce constraints. They implement the chain
    + * of responsibility design pattern - each processor has a single, well-defined
    + * purpose and can be chained together with other processors to fully automate
    + * all of the required conversions and constraint validation for a single
    + * delimited record.
    + * 
    + */
    +public class CellProcessorBuilder
    +{
    +
    +  /**
    +   * Method to get cell processors for given field type and constraints
    +   * 
    +   * @param fieldType
    +   *          data type of the field
    +   * @param constraints
    +   *          a map of constraints
    +   * @return
    +   */
    +  public static CellProcessor getCellProcessor(FIELD_TYPE fieldType, Map<String, Object> constraints)
    +  {
    +    switch (fieldType) {
    +      case STRING:
    +        return getStringCellProcessor(constraints);
    +      case INTEGER:
    +        return getIntegerCellProcessor(constraints);
    +      case LONG:
    +        return getLongCellProcessor(constraints);
    +      case FLOAT:
    +      case DOUBLE:
    +        return getDoubleCellProcessor(constraints);
    +      case CHARACTER:
    +        return getCharCellProcessor(constraints);
    +      case BOOLEAN:
    +        return getBooleanCellProcessor(constraints);
    +      case DATE:
    +        return getDateCellProcessor(constraints);
    +      default:
    +        return null;
    +    }
    +  }
    +
    +  /**
    +   * Method to get cellprocessor for String with constraints. These constraints
    +   * are evaluated against the String field for which this cellprocessor is
    +   * defined.
    +   * 
    +   * @param constraints
    +   *          map of constraints applicable to String
    +   * @return CellProcessor
    +   */
    +  private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
    +  {
    +    Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
    +        .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
    +    Integer strLen = constraints.get(DelimitedSchema.LENGTH) == null ? null : Integer.parseInt((String)constraints
    +        .get(DelimitedSchema.LENGTH));
    +    Integer minLength = constraints.get(DelimitedSchema.MIN_LENGTH) == null ? null : Integer
    +        .parseInt((String)constraints.get(DelimitedSchema.MIN_LENGTH));
    +    Integer maxLength = constraints.get(DelimitedSchema.MAX_LENGTH) == null ? null : Integer
    +        .parseInt((String)constraints.get(DelimitedSchema.MAX_LENGTH));
    +    String equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : (String)constraints
    +        .get(DelimitedSchema.EQUALS);
    +    String pattern = constraints.get(DelimitedSchema.REGEX_PATTERN) == null ? null : (String)constraints
    +        .get(DelimitedSchema.REGEX_PATTERN);
    +
    +    CellProcessor cellProcessor = null;
    +    if (StringUtils.isNotBlank(equals)) {
    --- End diff --
    
    We can surely have multiple constraints. However, for the String datatype, the constraints we are supporting are equals, regex, string length, and min and max length. For these, specifying multiple wouldn't make much sense. So, even if the user provides multiple constraints, e.g. regex and equals OR equals and length, the if/else block adds the appropriate processor, as sketched below.
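
    Roughly this precedence, as an illustrative sketch (not the exact PR code; the Optional wrapping for non-required fields is applied on top of whichever processor wins):

        CellProcessor cellProcessor = null;
        if (StringUtils.isNotBlank(equals)) {
          cellProcessor = new Equals(equals);              // equals wins
        } else if (StringUtils.isNotBlank(pattern)) {
          cellProcessor = new StrRegEx(pattern);           // then regex
        } else if (strLen != null) {
          cellProcessor = new Strlen(strLen);              // then exact length
        } else if (minLength != null || maxLength != null) {
          cellProcessor = new StrMinMax(minLength == null ? 0L : minLength,
              maxLength == null ? Long.MAX_VALUE : maxLength);  // then length bounds
        }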


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48704818
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---
    @@ -0,0 +1,456 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.util.Map;
    +
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
    +import org.supercsv.cellprocessor.ift.LongCellProcessor;
    +import org.supercsv.util.CsvContext;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.contrib.parser.DelimitedSchema.FIELD_TYPE;
    +
    +/**
    + * Helper class with methods to generate CellProcessor objects. Cell processors
    + * are an integral part of reading and writing with Super CSV - they automate
    + * the data type conversions, and enforce constraints. They implement the chain
    + * of responsibility design pattern - each processor has a single, well-defined
    + * purpose and can be chained together with other processors to fully automate
    + * all of the required conversions and constraint validation for a single
    + * delimited record.
    + * 
    + */
    +public class CellProcessorBuilder
    +{
    +
    +  /**
    +   * Method to get cell processors for given field type and constraints
    +   * 
    +   * @param fieldType
    +   *          data type of the field
    +   * @param constraints
    +   *          a map of constraints
    +   * @return
    +   */
    +  public static CellProcessor getCellProcessor(FIELD_TYPE fieldType, Map<String, Object> constraints)
    +  {
    +    switch (fieldType) {
    +      case STRING:
    +        return getStringCellProcessor(constraints);
    +      case INTEGER:
    +        return getIntegerCellProcessor(constraints);
    +      case LONG:
    +        return getLongCellProcessor(constraints);
    +      case FLOAT:
    +      case DOUBLE:
    +        return getDoubleCellProcessor(constraints);
    +      case CHARACTER:
    +        return getCharCellProcessor(constraints);
    +      case BOOLEAN:
    +        return getBooleanCellProcessor(constraints);
    +      case DATE:
    +        return getDateCellProcessor(constraints);
    +      default:
    +        return null;
    +    }
    +  }
    +
    +  /**
    +   * Method to get cellprocessor for String with constraints. These constraints
    +   * are evaluated against the String field for which this cellprocessor is
    +   * defined.
    +   * 
    +   * @param constraints
    +   *          map of constraints applicable to String
    +   * @return CellProcessor
    +   */
    +  private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
    +  {
    +    Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
    +        .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
    --- End diff --
    
    Why should this class care about the conversion from strings to the respective types?
    
    Also, if everything is going to be a string, why not have a Map<String, String>?


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48877866
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,178 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV;
    +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
    --- End diff --
    
    Javadoc for this?
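
    e.g., something along these lines:

        /**
         * metric to keep count of the tuples emitted on the parsedOutput port
         * per window
         */
        @AutoMetric
        long parsedOutputCount;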


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48715692
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV;
    +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    +    }
    +    try {
    +      csvBeanReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv bean reader {}", e.getMessage());
    +    }
       }
     
       /**
    -   * Sets the delimiter which separates line in incoming data.
    +   * Complete hdfs path of schema
        * 
    -   * @param lineDelimiter
    +   * @return
        */
    -  public void setLineDelimiter(String lineDelimiter)
    +  public String getSchemaPath()
       {
    -    this.lineDelimiter = lineDelimiter;
    +    return schemaPath;
       }
     
       /**
    -   * Gets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    +   * Set path of schema
        * 
    -   * @return fieldInfo
    +   * @param schemaPath
    +   *          path of the schema file in hdfs
        */
    -  public String getFieldInfo()
    +  public void setSchemaPath(String schemaPath)
    --- End diff --
    
    Good suggestion. Will make necessary changes.


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705438
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV;
    +   * they automate the data type conversions and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header - this will be a delimiter-separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    //This method is not invoked for CSV parser
    +    return null;
    --- End diff --
    
    Please throw UnsupportedOperationException if this is not meant to be called.
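
    i.e., a minimal sketch:

        @Override
        public KeyValPair<String, String> processErorrTuple(byte[] input)
        {
          throw new UnsupportedOperationException("not meant to be invoked for CsvParser");
        }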


---

[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48718609
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java ---
    @@ -0,0 +1,359 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jettison.json.JSONArray;
    +import org.codehaus.jettison.json.JSONException;
    +import org.codehaus.jettison.json.JSONObject;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * <p>
    + * This is schema that defines fields and their constraints for delimited files
    + * The operators use this information to validate the incoming tuples.
    + * Information from JSON schema is saved in this object and is used by the
    + * operators
    + * <p>
    + * <br>
    + * <br>
    + * Example schema <br>
    + * <br>
    + * {@code{ "separator": ",", "quoteChar":"\"", "fields": [ "name": "adId",
    + * "type": "Integer", "constraints": "required": "true" } , { "name": "adName",
    + * "type": "String", "constraints": { "required": "true", "pattern":
    + * "[a-z].*[a-z]$", "maxLength": "20" } }, { "name": "bidPrice", "type":
    + * "Double", "constraints": { "required": "true", "minValue": "0.1", "maxValue":
    + * "3.2" } }, { "name": "startDate", "type": "Date", "constraints": { "format":
    + * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
    + * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
    + * "Boolean", "constraints": { "required": "true" } } ] }}
    + */
    +public class DelimitedSchema
    +{
    +
    +  /**
    +   * JSON key string for separator
    +   */
    +  private static final String SEPARATOR = "separator";
    +  /**
    +   * JSON key string for quote character
    +   */
    +  private static final String QUOTE_CHAR = "quoteChar";
    +  /**
    +   * JSON key string for line delimiter
    +   */
    +  private static final String LINE_DELIMITER = "lineDelimiter";
    +  /**
    +   * JSON key string for fields array
    +   */
    +  private static final String FIELDS = "fields";
    +  /**
    +   * JSON key string for name of the field within fields array
    +   */
    +  private static final String NAME = "name";
    +  /**
    +   * JSON key string for type of the field within fields array
    +   */
    +  private static final String TYPE = "type";
    +  /**
    +   * JSON key string for constraints for each field
    +   */
    +  private static final String CONSTRAINTS = "constraints";
    +  /**
    +   * JSON key string for required constraint
    +   */
    +  public static final String REQUIRED = "required";
    +  /**
    +   * JSON key string for equals constraint
    +   */
    +  public static final String EQUALS = "equals";
    +  /**
    +   * JSON key string for length constraint
    +   */
    +  public static final String LENGTH = "length";
    +  /**
    +   * JSON key string for min length constraint
    +   */
    +  public static final String MIN_LENGTH = "minLength";
    +  /**
    +   * JSON key string for max length constraint
    +   */
    +  public static final String MAX_LENGTH = "maxLength";
    +  /**
    +   * JSON key string for min value constraint
    +   */
    +  public static final String MIN_VALUE = "minValue";
    +  /**
    +   * JSON key string for max value constraint
    +   */
    +  public static final String MAX_VALUE = "maxValue";
    +  /**
    +   * JSON key string for regex pattern constraint
    +   */
    +  public static final String REGEX_PATTERN = "pattern";
    +  /**
    +   * JSON key string for date format constraint
    +   */
    +  public static final String DATE_FORMAT = "format";
    +  /**
    +   * JSON key string for locale constraint
    +   */
    +  public static final String LOCALE = "locale";
    +  /**
    +   * JSON key string for true value constraint
    +   */
    +  public static final String TRUE_VALUE = "trueValue";
    +  /**
    +   * JSON key string for false value constraint
    +   */
    +  public static final String FALSE_VALUE = "falseValue";
    +  /**
    +   * delimiter character provided in schema. Default is ,
    +   */
    +  private int delimiterChar = ',';
    +  /**
    +   * quote character provided in schema. Default is "
    +   */
    +  private char quoteChar = '\"';
    +  /**
    +   * line delimiter character provided in schema. Default is new line character
    +   */
    +  private String lineDelimiter = "\r\n";
    +  /**
    +   * This holds the list of field names in the same order as in the schema
    +   */
    +  private List<String> fieldNames = new LinkedList<String>();
    +  /**
    +   * This holds list of {@link Field}
    +   */
    +  private List<Field> fields = new LinkedList<Field>();
    +
    +  /**
    +   * Supported data types
    +   */
    +  public enum FIELD_TYPE
    +  {
    +    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    +  };
    +
    +  public DelimitedSchema(String json)
    +  {
    +    try {
    +      initialize(json);
    +    } catch (JSONException | IOException e) {
    +      logger.error("{}", e);
    +      throw new IllegalArgumentException(e);
    +    }
    +  }
    +
    +  /**
    +   * For a given json string, this method sets the field members
    +   * 
    +   * @param json
    +   * @throws JSONException
    +   * @throws IOException
    +   */
    +  private void initialize(String json) throws JSONException, IOException
    +  {
    +    JSONObject jo = new JSONObject(json);
    +    if (jo.has(SEPARATOR)) {
    +      delimiterChar = ((String)jo.getString(SEPARATOR)).charAt(0);
    +    }
    +    if (jo.has(QUOTE_CHAR)) {
    +      quoteChar = ((String)jo.getString(QUOTE_CHAR)).charAt(0);
    +    }
    +    if (jo.has(LINE_DELIMITER)) {
    +      lineDelimiter = (String)jo.getString(LINE_DELIMITER);
    +    }
    +
    +    JSONArray fieldArray = jo.getJSONArray(FIELDS);
    +    for (int i = 0; i < fieldArray.length(); i++) {
    +      JSONObject obj = fieldArray.getJSONObject(i);
    +      Field field = new Field(obj.getString(NAME), obj.getString(TYPE));
    +      fields.add(field);
    +      fieldNames.add(field.name);
    +      if (obj.has(CONSTRAINTS)) {
    +        JSONObject constraints = obj.getJSONObject(CONSTRAINTS);
    +        field.constraints = new ObjectMapper().readValue(constraints.toString(), HashMap.class);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Get the list of field names mentioned in schema
    +   * 
    +   * @return fieldNames
    +   */
    +  public List<String> getFieldNames()
    +  {
    +    return Collections.unmodifiableList(fieldNames);
    +  }
    +
    +  /**
    +   * Get the delimiter character
    +   * 
    +   * @return delimiterChar
    +   */
    +  public int getDelimiterChar()
    +  {
    +    return delimiterChar;
    +  }
    +
    +  /**
    +   * Get the quoteChar
    +   * 
    +   * @return quoteChar
    +   */
    +  public char getQuoteChar()
    +  {
    +    return quoteChar;
    +  }
    +
    +  /**
    +   * Get the line delimiter
    +   * 
    +   * @return lineDelimiter
    +   */
    +  public String getLineDelimiter()
    +  {
    +    return lineDelimiter;
    +  }
    +
    +  @Override
    +  public String toString()
    +  {
    +    return "DelimitedSchema [delimiterChar=" + delimiterChar + ", quoteChar=" + quoteChar + ", lineDelimiter="
    +        + lineDelimiter + ", fieldNames=" + fieldNames + ", fields=" + fields + "]";
    +  }
    +
    +  /**
    +   * Get the list of Fields.
    +   * 
    +   * @return fields
    +   */
    +  public List<Field> getFields()
    +  {
    +    return Collections.unmodifiableList(fields);
    +  }
    +
    +  /**
    +   * Objects of this class represents a particular field in the schema. Each
    +   * field has a name, type and a set of associated constraints.
    +   * 
    +   */
    +  public class Field
    +  {
    +    /**
    +     * name of the field
    +     */
    +    String name;
    +    /**
    +     * Data type of the field
    +     */
    +    FIELD_TYPE type;
    +    /**
    +     * constraints associated with the field
    +     */
    +    Map<String, Object> constraints = new HashMap<String, Object>();
    +
    +    public Field(String name, String type)
    +    {
    +      this.name = name;
    +      this.type = FIELD_TYPE.valueOf(type.toUpperCase());
    +    }
    +
    +    /**
    +     * Get the name of the field
    +     * 
    +     * @return name
    +     */
    +    public String getName()
    +    {
    +      return name;
    +    }
    +
    +    /**
    +     * Set the name of the field
    +     * 
    +     * @param name
    +     */
    +    public void setName(String name)
    +    {
    +      this.name = name;
    +    }
    +
    +    /**
    +     * Get {@link FIELD_TYPE}
    +     * 
    +     * @return type
    +     */
    +    public FIELD_TYPE getType()
    +    {
    +      return type;
    +    }
    +
    +    /**
    +     * Set {@link FIELD_TYPE}
    +     * 
    +     * @param type
    +     */
    +    public void setType(FIELD_TYPE type)
    +    {
    +      this.type = type;
    +    }
    +
    +    /**
    +     * Get the map of constraints associated with the field
    +     * 
    +     * @return constraints
    +     */
    +    public Map<String, Object> getConstraints()
    +    {
    +      return constraints;
    +    }
    +
    +    /**
    +     * Sets the map of constraints associated with the field
    +     * 
    +     * @param constraints
    +     */
    +    public void setConstraints(Map<String, Object> constraints)
    +    {
    +      this.constraints = constraints;
    +    }
    --- End diff --
    
    Currently I don't see the need. :)
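    For context, a minimal sketch of how DelimitedSchema is meant to be
    consumed, using only the constructor and getters from the diff above (the
    JSON literal here is illustrative; in the operator the schema string is
    read from HDFS):

        import com.datatorrent.contrib.parser.DelimitedSchema;

        public class SchemaSketch
        {
          public static void main(String[] args)
          {
            // Illustrative one-field schema string.
            String json = "{\"separator\": \",\", \"fields\": ["
                + "{\"name\": \"adId\", \"type\": \"Integer\","
                + " \"constraints\": {\"required\": \"true\"}}]}";
            DelimitedSchema schema = new DelimitedSchema(json);
            for (DelimitedSchema.Field field : schema.getFields()) {
              // Each field exposes a name, a FIELD_TYPE and a constraints map.
              System.out.println(field.getName() + " -> " + field.getType()
                  + " " + field.getConstraints());
            }
          }
        }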



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48875507
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---
    @@ -20,169 +20,425 @@
     
     import java.util.Date;
     
    -import org.joda.time.DateTime;
    +import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Rule;
    +import org.junit.Before;
     import org.junit.Test;
    -import org.junit.rules.TestWatcher;
    -import org.junit.runner.Description;
     
    +import com.datatorrent.lib.appdata.schemas.SchemaUtils;
     import com.datatorrent.lib.testbench.CollectorTestSink;
    -import com.datatorrent.lib.util.TestUtils;
     
     public class CsvPOJOParserTest
     {
     
    -  CsvParser operator;
    -  CollectorTestSink<Object> validDataSink;
    -  CollectorTestSink<String> invalidDataSink;
    +  private static final String filename = "schema.json";
    +  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
    +  CsvParser parser = new CsvParser();
     
    -  @Rule
    -  public Watcher watcher = new Watcher();
    +  /*
    +  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
    +  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
    +  * Constraints are defined in schema.json
    +  */
     
    -  public class Watcher extends TestWatcher
    +  @Before
    +  public void setup()
    --- End diff --
    
    If we can reuse the existing code, then I think we should.
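    For reference, the existing pattern in question is the JUnit TestWatcher
    rule from the previous version of this test (visible as removed lines
    further down in this thread); a trimmed, hypothetical adaptation to the
    new parser:

        @Rule
        public Watcher watcher = new Watcher();

        public class Watcher extends TestWatcher
        {
          @Override
          protected void starting(Description description)
          {
            // Per-test setup: build the parser and load the test schema.
            super.starting(description);
            parser = new CsvParser();
            parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
            parser.setup(null);
          }

          @Override
          protected void finished(Description description)
          {
            // Per-test cleanup mirrors the operator's teardown.
            super.finished(description);
            parser.teardown();
          }
        }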



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705868
  
    --- Diff: contrib/src/test/resources/schema.json ---
    @@ -0,0 +1,96 @@
    +{
    +    "separator": ",",
    +    "quoteChar":"\"",
    +    "fields": [
    +        {
    +            "name": "adId",
    +            "type": "Integer",
    +            "constraints": {
    +                "required": "true"
    +            }
    +        },
    +        {
    +            "name": "campaignId",
    +            "type": "Integer",
    +            "constraints": {
    +                "equals": "98233"
    +            }
    +        },
    +        {
    +            "name": "adName",
    +            "type": "String",
    +            "constraints": {
    +                "required": "true",
    +                "pattern": "[a-z].*[a-z]$",
    +                "maxLength": "10"
    +            }
    +        },
    +        {
    +            "name": "bidPrice",
    +            "type": "Double",
    +            "constraints": {
    +                "required": "true",
    +                "minValue": "0.1",
    +                "maxValue": "3.2"
    +            }
    +        },
    +        {
    +            "name": "startDate",
    +            "type": "Date",
    +            "constraints": {
    +                "format": "yyyy-MM-dd HH:mm:ss",
    +                "locale":"IN"
    +            }
    +        },
    +        {
    +            "name": "endDate",
    +            "type": "Date",
    +            "constraints": {
    +                "format": "dd/MM/yyyy"
    +            }
    +        },
    +        {
    +            "name": "securityCode",
    +            "type": "Long",
    +            "constraints": {
    +                "minValue": "10",
    +                "maxValue": "30"
    +            }
    +        },
    +        {
    +            "name": "active",
    +            "type": "Boolean",
    +            "constraints": {
    +                "required": "true"                
    +            }
    +        },
    +        {
    +            "name": "optimized",
    +            "type": "Boolean",
    +            "constraints": {                  
    +                "trueValue":"OPTIMIZE",
    +                "falseValue":"NO_OPTIMIZE"                
    +            }
    +        },
    +        {
    +            "name": "parentCampaign",
    +            "type": "String",
    +            "constraints": {
    +                "required": "true",
    +                "equals": "CAMP_AD"
    +            }
    +        },
    +        {
    +            "name": "weatherTargeted",
    +            "type": "Character",
    +            "constraints": {
    +                "required": "true",
    +                "equals": "Y"
    +            }
    +        },
    +        {
    +            "name": "valid",
    +            "type": "String"            
    +        }
    +    ]
    +}
    --- End diff --
    
    New line missing at the end of the file.



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48803433
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    +    }
    +    try {
    +      csvBeanReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv bean reader {}", e.getMessage());
    +    }
       }
     
       /**
    -   * Sets the delimiter which separates line in incoming data.
    +   * Complete hdfs path of schema
        * 
    -   * @param lineDelimiter
    +   * @return
        */
    -  public void setLineDelimiter(String lineDelimiter)
    +  public String getSchemaPath()
       {
    -    this.lineDelimiter = lineDelimiter;
    +    return schemaPath;
       }
     
       /**
    -   * Gets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    +   * Set path of schema
        * 
    -   * @return fieldInfo
    +   * @param schemaPath
    +   *          path of the schema file in hdfs
        */
    -  public String getFieldInfo()
    +  public void setSchemaPath(String schemaPath)
       {
    -    return fieldInfo;
    +    this.schemaPath = schemaPath;
    +    try {
    +      this.schema = FileUtils.readFromHDFS(schemaPath);
    +    } catch (IOException e) {
    +      DTThrowable.rethrow(e);
    +    }
       }
     
       /**
    -   * Sets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    -   * 
    -   * @param fieldInfo
    +   * output port to emit validate records as map
        */
    -  public void setFieldInfo(String fieldInfo)
    -  {
    -    this.fieldInfo = fieldInfo;
    -  }
    -
    -  private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
    +  public final transient DefaultOutputPort<Map<String, Object>> parsedOutput = new DefaultOutputPort<Map<String, Object>>();
    +  private static final Logger logger = LoggerFactory.getLogger(Parser.class);
    --- End diff --
    
    LoggerFactory.getLogger(CsvParser.class)



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48877848
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,178 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    --- End diff --
    
    Can you add an example for the schema?
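    For example, a trimmed version of the test resource schema.json quoted
    earlier in this thread could go into the javadoc:

        {
          "separator": ",",
          "quoteChar": "\"",
          "fields": [
            {
              "name": "adId",
              "type": "Integer",
              "constraints": { "required": "true" }
            },
            {
              "name": "bidPrice",
              "type": "Double",
              "constraints": { "minValue": "0.1", "maxValue": "3.2" }
            }
          ]
        }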



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48836388
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---
    @@ -20,169 +20,425 @@
     
     import java.util.Date;
     
    -import org.joda.time.DateTime;
    +import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Rule;
    +import org.junit.Before;
     import org.junit.Test;
    -import org.junit.rules.TestWatcher;
    -import org.junit.runner.Description;
     
    +import com.datatorrent.lib.appdata.schemas.SchemaUtils;
     import com.datatorrent.lib.testbench.CollectorTestSink;
    -import com.datatorrent.lib.util.TestUtils;
     
     public class CsvPOJOParserTest
     {
     
    -  CsvParser operator;
    -  CollectorTestSink<Object> validDataSink;
    -  CollectorTestSink<String> invalidDataSink;
    +  private static final String filename = "schema.json";
    +  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
    +  CsvParser parser = new CsvParser();
     
    -  @Rule
    -  public Watcher watcher = new Watcher();
    +  /*
    +  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
    +  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
    +  * Constraints are defined in schema.json
    +  */
     
    -  public class Watcher extends TestWatcher
    +  @Before
    +  public void setup()
    --- End diff --
    
    It can be. I used the same code that I had already written for the enhanced parser.



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/154



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48814776
  
    --- Diff: contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java ---
    @@ -20,169 +20,425 @@
     
     import java.util.Date;
     
    -import org.joda.time.DateTime;
    +import org.junit.After;
     import org.junit.Assert;
    -import org.junit.Rule;
    +import org.junit.Before;
     import org.junit.Test;
    -import org.junit.rules.TestWatcher;
    -import org.junit.runner.Description;
     
    +import com.datatorrent.lib.appdata.schemas.SchemaUtils;
     import com.datatorrent.lib.testbench.CollectorTestSink;
    -import com.datatorrent.lib.util.TestUtils;
     
     public class CsvPOJOParserTest
     {
     
    -  CsvParser operator;
    -  CollectorTestSink<Object> validDataSink;
    -  CollectorTestSink<String> invalidDataSink;
    +  private static final String filename = "schema.json";
    +  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
    +  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
    +  CsvParser parser = new CsvParser();
     
    -  @Rule
    -  public Watcher watcher = new Watcher();
    +  /*
    +  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
    +  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
    +  * Constraints are defined in schema.json
    +  */
     
    -  public class Watcher extends TestWatcher
    +  @Before
    +  public void setup()
       {
    +    parser.err.setSink(error);
    +    parser.parsedOutput.setSink(objectPort);
    +    parser.out.setSink(pojoPort);
    +    parser.setClazz(Ad.class);
    +    parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
    +    parser.setup(null);
    +  }
     
    -    @Override
    -    protected void starting(Description description)
    -    {
    -      super.starting(description);
    -      operator = new CsvParser();
    -      operator.setClazz(EmployeeBean.class);
    -      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
    -      validDataSink = new CollectorTestSink<Object>();
    -      invalidDataSink = new CollectorTestSink<String>();
    -      TestUtils.setSink(operator.out, validDataSink);
    -      TestUtils.setSink(operator.err, invalidDataSink);
    -    }
    +  @After
    +  public void tearDown()
    +  {
    +    error.clear();
    +    objectPort.clear();
    +    pojoPort.clear();
    +  }
     
    -    @Override
    -    protected void finished(Description description)
    -    {
    -      super.finished(description);
    -      operator.teardown();
    -    }
    +  @Test
    +  public void TestParserValidInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
    +    Ad adPojo = (Ad)obj;
    +    Assert.assertNotNull(obj);
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(1234, adPojo.getAdId());
    +    Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
    +    Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
    +    Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
    +    Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
    +    Assert.assertEquals(12, adPojo.getSecurityCode());
    +    Assert.assertTrue("CAMP_AD".equals(adPojo.getParentCampaign()));
    +    Assert.assertTrue(adPojo.isActive());
    +    Assert.assertFalse(adPojo.isOptimized());
    +    Assert.assertTrue("yes".equals(adPojo.getValid()));
    +  }
     
    +  @Test
    +  public void TestParserValidInputPojoPortNotConnected()
    +  {
    +    parser.out.setSink(null);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDefault()
    +  public void TestParserValidInputClassNameNotProvided()
       {
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01/01/2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    -    Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    +    parser.setClazz(null);
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(0, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDateFormat()
    +  public void TestParserInvalidAdIdInput()
       {
    -    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01-JAN-2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    +    String input = ",98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserNoCampaignIdInput()
    +  {
    +    String input = "1234,,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
         Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidCampaignIdInput()
    +  {
    +    String input = "1234,9833,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
       }
     
       @Test
    -  public void testCsvToPojoWriterDateFormatMultiple()
    +  public void TestParserInvalidAdNameInput()
       {
    -    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
    -    operator.setup(null);
    -    String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
    -    operator.in.process(tuple);
    -    Assert.assertEquals(1, validDataSink.collectedTuples.size());
    -    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
    -    Object obj = validDataSink.collectedTuples.get(0);
    +    String input = "1234,98233,adxyz123,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidBidPriceInput()
    +  {
    +    String input = "1234,98233,adxyz,3.3,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidStartDateInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-30-08 02:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidSecurityCodeInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,85,y,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidisActiveInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,yo,,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInvalidParentCampaignInput()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserValidisOptimized()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(1, objectPort.collectedTuples.size());
    +    Assert.assertEquals(1, pojoPort.collectedTuples.size());
    +    Object obj = pojoPort.collectedTuples.get(0);
         Assert.assertNotNull(obj);
    -    Assert.assertEquals(EmployeeBean.class, obj.getClass());
    -    EmployeeBean pojo = (EmployeeBean)obj;
    -    Assert.assertEquals("john", pojo.getName());
    -    Assert.assertEquals("cs", pojo.getDept());
    -    Assert.assertEquals(1, pojo.getEid());
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfJoining()));
    -    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
    -        pojo.getDateOfBirth()));
    +    Assert.assertEquals(Ad.class, obj.getClass());
    +    Assert.assertEquals(0, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInValidisOptimized()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP,Y,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserInValidWeatherTargeting()
    +  {
    +    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,NO,yes";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserNullOrBlankInput()
    +  {
    +    parser.in.process(null);
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserHeaderAsInput()
    +  {
    +    String input = "adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid";
    +    parser.in.process(input.getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserLessFields()
    +  {
    +    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION".getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
    +  }
    +
    +  @Test
    +  public void TestParserMoreFields()
    +  {
    +    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField"
    +        .getBytes());
    +    parser.teardown();
    +    Assert.assertEquals(0, objectPort.collectedTuples.size());
    +    Assert.assertEquals(0, pojoPort.collectedTuples.size());
    +    Assert.assertEquals(1, error.collectedTuples.size());
       }
     
    -  public static class EmployeeBean
    +  public static class Ad
    --- End diff --
    
    I could have; however, the schema and unit tests already existed for the Ad class, hence I used those.



[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48796308
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,177 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    throw new UnsupportedOperationException("Not supported");
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    +      errorTupleCount++;
    +      return;
         }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException | IOException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {} Stacktrace {}", e.getMessage(), e.getStackTrace());
    --- End diff --
    
    Simply do `logger.error("Tuple could not be parsed. Exception: ", e);` -- passing the throwable as the last argument makes SLF4J print the full stack trace.


[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705496
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    +    }
    +    try {
    +      csvBeanReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv bean reader {}", e.getMessage());
    +    }
       }
     
       /**
    -   * Sets the delimiter which separates line in incoming data.
    +   * Complete hdfs path of schema
        * 
    -   * @param lineDelimiter
    +   * @return
        */
    -  public void setLineDelimiter(String lineDelimiter)
    +  public String getSchemaPath()
       {
    -    this.lineDelimiter = lineDelimiter;
    +    return schemaPath;
       }
     
       /**
    -   * Gets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    +   * Set path of schema
        * 
    -   * @return fieldInfo
    +   * @param schemaPath
    +   *          path of the schema file in hdfs
        */
    -  public String getFieldInfo()
    +  public void setSchemaPath(String schemaPath)
       {
    -    return fieldInfo;
    +    this.schemaPath = schemaPath;
    +    try {
    +      this.schema = FileUtils.readFromHDFS(schemaPath);
    +    } catch (IOException e) {
    +      DTThrowable.rethrow(e);
    +    }
       }
     
       /**
    -   * Sets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    -   * 
    -   * @param fieldInfo
    +   * output port to emit validate records as map
        */
    -  public void setFieldInfo(String fieldInfo)
    -  {
    -    this.fieldInfo = fieldInfo;
    -  }
    -
    -  private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
    +  public final transient DefaultOutputPort<Map<String, Object>> parsedOutput = new DefaultOutputPort<Map<String, Object>>();
    +  private static final Logger logger = LoggerFactory.getLogger(Parser.class);
    --- End diff --
    
    Not sure if there is any convention here, but I have seen most files declare these variables at the top of the file.


[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by gauravgopi123 <gi...@git.apache.org>.
Github user gauravgopi123 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48877026
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +69,178 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    -
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
    +  /**
    +   * Contents of the schema
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    super.beginWindow(windowId);
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    throw new UnsupportedOperationException("Not supported");
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    +      errorTupleCount++;
    +      return;
         }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException | IOException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
       }
     
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
    -  }
    -
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErrorTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    throw new UnsupportedOperationException("Not supported");
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    +    }
    +    try {
    +      csvBeanReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv bean reader {}", e.getMessage());
    --- End diff --
    
    It should throw the exception instead of only logging it.
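    
    A minimal sketch of that change, reusing the `DTThrowable.rethrow` helper that the removed teardown above already used (a suggestion, not code from the PR):
    
    ```java
    @Override
    public void teardown()
    {
      try {
        csvMapReader.close();
        csvBeanReader.close();
      } catch (IOException e) {
        // Surface the failure to the platform instead of only logging it.
        DTThrowable.rethrow(e);
      }
    }
    ```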


[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by shubham-pathak22 <gi...@git.apache.org>.
Github user shubham-pathak22 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48715664
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
    --- End diff --
    
    Good catch. Will have to fix this in the parent class.
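    
    For reference, a minimal sketch of what the rename would look like in the parent class (the generic parameter names below are inferred from this subclass, not taken from the actual `Parser` source):
    
    ```java
    // In the Parser base class: correct the misspelled hook.
    public abstract ERRORTUPLE processErrorTuple(INPUT input);
    ```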


[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705421
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
    --- End diff --
    
    There is a lot of duplicated code here.
    
    Can you please create two methods, emitError and emitSuccess, as sketched below? Let these two methods emit on the respective ports and also handle the related bookkeeping, like incrementing the counters. That way the tasks stay tied together and are never missed in any future code change.
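    
    A minimal sketch of those helpers, using the ports and metrics from the diff above (the helper names are only the suggestion here, not an existing API):
    
    ```java
    // Emits the offending tuple and a reason on the error port,
    // and always increments the error metric.
    private void emitError(String incomingString, String reason)
    {
      if (err.isConnected()) {
        err.emit(new KeyValPair<String, String>(incomingString, reason));
      }
      errorTupleCount++;
    }
    
    // Emits a successfully parsed record as a map and increments the parsed-output metric.
    private void emitSuccess(Map<String, Object> map)
    {
      parsedOutput.emit(map);
      parsedOutputCount++;
    }
    ```
    
    `processTuple` then shrinks to calls like `emitError(null, "Blank/null tuple")`, so a port emission and its counter increment can never drift apart.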


[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48705719
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java ---
    @@ -62,247 +71,193 @@
      * @since 3.2.0
      */
     @InterfaceStability.Evolving
    -public class CsvParser extends Parser<String, String>
    +public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
     {
    +  /**
    +   * Map Reader to read delimited records
    +   */
    +  private transient CsvMapReader csvMapReader;
    +  /**
    +   * Bean Reader to read delimited records
    +   */
    +  private transient CsvBeanReader csvBeanReader;
    +  /**
    +   * Reader used by csvMapReader and csvBeanReader
    +   */
    +  private transient ReusableStringReader csvStringReader;
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    -
    +  /**
    +   * Contents of the schema
    +   */
    +  private String schema;
    +  /**
    +   * Complete path where schema resides.
    +   */
       @NotNull
    -  protected String fieldInfo;
    -
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  private transient CsvBeanReader csvReader;
    -
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  private transient String schemaPath;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Names of all the fields in the same order of incoming records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * header-this will be delimiter separated string of field names
    +   */
    +  private transient String header;
    +  /**
    +   * Reading preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    -  @NotNull
    -  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
    +  @AutoMetric
    +  long parsedOutputCount;
     
    -  public CsvParser()
    +  @Override
    +  public void beginWindow(long windowId)
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +    parsedOutputCount = 0;
       }
     
       @Override
       public void setup(OperatorContext context)
       {
    -    super.setup(context);
    -
    -    logger.info("field info {}", fieldInfo);
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -
    -    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    csvReader = new CsvBeanReader(csvStringReader, preference);
    -    int countKeyValue = getFields().size();
    -    logger.info("countKeyValue {}", countKeyValue);
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames().toArray(
    +        new String[delimitedParserSchema.getFieldNames().size()]);
    +    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
    +    processors = getProcessor(delimitedParserSchema.getFields());
    +    csvStringReader = new ReusableStringReader();
    +    csvMapReader = new CsvMapReader(csvStringReader, preference);
    +    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    -  {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DOUBLE) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.INTEGER) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.FLOAT) {
    -        processors[i] = new Optional(new ParseDouble());
    -      } else if (type == FIELD_TYPE.LONG) {
    -        processors[i] = new Optional(new ParseLong());
    -      } else if (type == FIELD_TYPE.SHORT) {
    -        processors[i] = new Optional(new ParseInt());
    -      } else if (type == FIELD_TYPE.STRING) {
    -        processors[i] = new Optional();
    -      } else if (type == FIELD_TYPE.CHARACTER) {
    -        processors[i] = new Optional(new ParseChar());
    -      } else if (type == FIELD_TYPE.BOOLEAN) {
    -        processors[i] = new Optional(new ParseBool());
    -      } else if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    -      }
    -    }
    -  }
       @Override
    -  public Object convert(String tuple)
    +  public Object convert(byte[] tuple)
       {
    -    try {
    -      csvStringReader.open(tuple);
    -      return csvReader.read(clazz, nameMapping, processors);
    -    } catch (IOException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    -      return null;
    -    }
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       @Override
    -  public void teardown()
    +  public void processTuple(byte[] tuple)
       {
    -    try {
    -      if (csvReader != null) {
    -        csvReader.close();
    +    if (tuple == null) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
           }
    -    } catch (IOException e) {
    -      DTThrowable.rethrow(e);
    -    }
    -  }
    -
    -  @Override
    -  public String processErorrTuple(String input)
    -  {
    -    return input;
    -  }
    -
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    +      errorTupleCount++;
    +      return;
         }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    +    String incomingString = new String(tuple);
    +    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
    +      }
    +      errorTupleCount++;
    +      return;
         }
    +    try {
    +      if (parsedOutput.isConnected()) {
    +        csvStringReader.open(incomingString);
    +        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
    +        parsedOutput.emit(map);
    +        parsedOutputCount++;
    +      }
     
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    +      if (out.isConnected() && clazz != null) {
    +        csvStringReader.open(incomingString);
    +        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
    +        out.emit(obj);
    +        emittedObjectCount++;
    +      }
     
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    +    } catch (SuperCsvException e) {
    +      if (err.isConnected()) {
    +        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
    +      }
    +      errorTupleCount++;
    +      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
    +    } catch (IOException e) {
    +      logger.error("Exception in process method {}", e.getMessage());
    +      DTThrowable.rethrow(e);
         }
    -
    -  }
    -
    -  /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @return An array list of Fields.
    -   */
    -  public ArrayList<Field> getFields()
    -  {
    -    return fields;
    -  }
    -
    -  /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    -   * 
    -   * @param fields
    -   *          An array list of Fields.
    -   */
    -  public void setFields(ArrayList<Field> fields)
    -  {
    -    this.fields = fields;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @Override
    +  public KeyValPair<String, String> processErorrTuple(byte[] input)
       {
    -    return fieldDelimiter;
    +    //This method is not invoked for CSV parser
    +    return null;
       }
     
       /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    +   * Returns array of cellprocessors, one for each field
        */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
    +    }
    +    return processor;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    +  @Override
    +  public void teardown()
       {
    -    return lineDelimiter;
    +    try {
    +      csvMapReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv map reader {}", e.getMessage());
    +    }
    +    try {
    +      csvBeanReader.close();
    +    } catch (IOException e) {
    +      logger.error("Error while closing csv bean reader {}", e.getMessage());
    +    }
       }
     
       /**
    -   * Sets the delimiter which separates line in incoming data.
    +   * Complete hdfs path of schema
        * 
    -   * @param lineDelimiter
    +   * @return
        */
    -  public void setLineDelimiter(String lineDelimiter)
    +  public String getSchemaPath()
       {
    -    this.lineDelimiter = lineDelimiter;
    +    return schemaPath;
       }
     
       /**
    -   * Gets the name of the fields with type and format ( for date ) as comma
    -   * separated string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    +   * Set path of schema
        * 
    -   * @return fieldInfo
    +   * @param schemaPath
    +   *          path of the schema file in hdfs
        */
    -  public String getFieldInfo()
    +  public void setSchemaPath(String schemaPath)
    --- End diff --
    
    Is it a requirement that this be retrieved from HDFS? The user might have the file on a local file system.
    
    I would suggest that reading config files from a file system is not the responsibility of any operator. The file should be read by the application, and its contents should be passed to the operator as a String/byte[].
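    
    A minimal sketch of that wiring, assuming the operator exposes a setSchema(String) property for the schema contents (the setter name is an assumption here, not something this patch confirms):
    
        // Application code (e.g. in populateDAG): read the schema wherever it
        // lives (a local path is shown; an HDFS stream works the same way)
        // and pass only its contents to the operator. Exception handling omitted.
        String schema = new String(
            java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("/path/to/schema.json")),
            java.nio.charset.StandardCharsets.UTF_8);
        CsvParser parser = dag.addOperator("csvParser", new CsvParser());
        parser.setSchema(schema); // assumed setter taking the schema as a String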


[GitHub] incubator-apex-malhar pull request: MLHR-1961 enhanced existing cs...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/154#discussion_r48724044
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java ---
    @@ -0,0 +1,456 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.parser;
    +
    +import java.util.Map;
    +
    +import org.supercsv.cellprocessor.Optional;
    +import org.supercsv.cellprocessor.ParseBool;
    +import org.supercsv.cellprocessor.ParseChar;
    +import org.supercsv.cellprocessor.ParseDate;
    +import org.supercsv.cellprocessor.ParseDouble;
    +import org.supercsv.cellprocessor.ParseInt;
    +import org.supercsv.cellprocessor.ParseLong;
    +import org.supercsv.cellprocessor.constraint.DMinMax;
    +import org.supercsv.cellprocessor.constraint.Equals;
    +import org.supercsv.cellprocessor.constraint.LMinMax;
    +import org.supercsv.cellprocessor.constraint.StrMinMax;
    +import org.supercsv.cellprocessor.constraint.StrRegEx;
    +import org.supercsv.cellprocessor.constraint.Strlen;
    +import org.supercsv.cellprocessor.ift.CellProcessor;
    +import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
    +import org.supercsv.cellprocessor.ift.LongCellProcessor;
    +import org.supercsv.util.CsvContext;
    +
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.contrib.parser.DelimitedSchema.FIELD_TYPE;
    +
    +/**
    + * Helper class with methods to generate CellProcessor objects. Cell processors
    + * are an integral part of reading and writing with Super CSV - they automate
    + * the data type conversions, and enforce constraints. They implement the chain
    + * of responsibility design pattern - each processor has a single, well-defined
    + * purpose and can be chained together with other processors to fully automate
    + * all of the required conversions and constraint validation for a single
    + * delimited record.
    + * 
    + */
    +public class CellProcessorBuilder
    +{
    +
    +  /**
    +   * Method to get cell processors for given field type and constraints
    +   * 
    +   * @param fieldType
    +   *          data type of the field
    +   * @param constraints
    +   *          a map of constraints
    +   * @return
    +   */
    +  public static CellProcessor getCellProcessor(FIELD_TYPE fieldType, Map<String, Object> constraints)
    +  {
    +    switch (fieldType) {
    +      case STRING:
    +        return getStringCellProcessor(constraints);
    +      case INTEGER:
    +        return getIntegerCellProcessor(constraints);
    +      case LONG:
    +        return getLongCellProcessor(constraints);
    +      case FLOAT:
    +      case DOUBLE:
    +        return getDoubleCellProcessor(constraints);
    +      case CHARACTER:
    +        return getCharCellProcessor(constraints);
    +      case BOOLEAN:
    +        return getBooleanCellProcessor(constraints);
    +      case DATE:
    +        return getDateCellProcessor(constraints);
    +      default:
    +        return null;
    +    }
    +  }
    +
    +  /**
    +   * Method to get cellprocessor for String with constraints. These constraints
    +   * are evaluated against the String field for which this cellprocessor is
    +   * defined.
    +   * 
    +   * @param constraints
    +   *          map of constraints applicable to String
    +   * @return CellProcessor
    +   */
    +  private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
    +  {
    +    Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
    +        .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
    --- End diff --
    
    I think the earlier comment was not addressed: since every value here is cast to String, shouldn't this take a Map<String, String>?
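    
    For illustration, a sketch of how this lookup would read if the signature took Map<String, String> (an assumption about the suggested change, not part of this patch):
    
        // Same lookup with constraints declared as Map<String, String>;
        // the (String) casts in the current patch become unnecessary.
        String requiredStr = constraints.get(DelimitedSchema.REQUIRED);
        Boolean required = requiredStr == null ? null : Boolean.parseBoolean(requiredStr);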

