Posted to dev@apex.apache.org by shubham-pathak22 <gi...@git.apache.org> on 2016/06/01 13:22:28 UTC

[GitHub] incubator-apex-malhar pull request: APEXMALHAR-2105 enhancing CSV formatter ...

GitHub user shubham-pathak22 opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306

    APEXMALHAR-2105 enhancing CSV formatter to read field info from schema

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shubham-pathak22/incubator-apex-malhar APEXMALHAR-2105

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #306
    
----
commit a2a863746613c9b4c594949b5b188b138b145395
Author: shubham <sh...@github.com>
Date:   2016-06-01T13:17:36Z

    APEXMALHAR-2105 enhancing CSV formatter to read field info from schema

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411669
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema.Schema is specified in a json format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
     
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  protected transient CsvPreference preference;
    +  /**
    +   * metric to keep count of number of tuples emitted on input port
    +   */
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       public CsvFormatter()
    --- End diff --
    
    You can remove the constructor, since it is empty.
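
For readers following the thread, here is a minimal sketch of why an empty no-argument constructor can be dropped: when a Java class declares no constructor at all, the compiler supplies a default no-argument one. `SchemaHolder` and `DefaultConstructorDemo` are hypothetical stand-ins written for this illustration and are not part of the pull request.

```java
// Hypothetical stand-in for an operator class. It declares no constructor,
// so the compiler generates a default no-argument constructor for it.
class SchemaHolder
{
  private String schema;

  public String getSchema()
  {
    return schema;
  }

  public void setSchema(String schema)
  {
    this.schema = schema;
  }
}

// Hypothetical helper that inspects and uses the generated constructor.
class DefaultConstructorDemo
{
  // Counts the constructors the class ends up with after compilation.
  static int declaredConstructorCount()
  {
    return SchemaHolder.class.getDeclaredConstructors().length;
  }

  // Instantiates the class through the implicit constructor and
  // round-trips a value through its setter and getter.
  static String roundTrip(String schema)
  {
    SchemaHolder holder = new SchemaHolder();
    holder.setSchema(schema);
    return holder.getSchema();
  }
}
```

The class can still be instantiated with `new SchemaHolder()` even though no constructor appears in the source, which is the behaviour the review comment relies on.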



[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411560
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema.Schema is specified in a json format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
    --- End diff --
    
    Can you please make this private?



[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412409
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema.Schema is specified in a json format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
     
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  protected transient CsvPreference preference;
    +  /**
    +   * metric to keep count of number of tuples emitted on input port
    +   */
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       public CsvFormatter()
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +  }
     
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    incomingTuplesCount = 0;
       }
     
       @Override
       public void setup(Context.OperatorContext context)
       {
         super.setup(context);
    -
    -    //fieldInfo information
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    int countKeyValue = getFields().size();
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    -
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames()
    +        .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
    +    processors = getProcessor(delimitedParserSchema.getFields());
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    +  /**
    +   * Returns array of cellprocessors, one for each field
    +   */
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      if (field.getType() == FieldType.DATE) {
    +        String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null
    +            : (String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT);
    +        processor[fieldCount++] = new Optional(new FmtDate(format == null ? "dd/MM/yyyy" : format));
           } else {
    -        processors[i] = new Optional();
    +        processor[fieldCount++] = new Optional();
           }
         }
    -
    +    return processor;
       }
     
       @Override
       public String convert(Object tuple)
       {
    +    incomingTuplesCount++;
    +    if (tuple == null) {
    +      errorTupleCount++;
    +      logger.error(" Null tuple", tuple);
    +      return null;
    +    }
         try {
           StringWriter stringWriter = new StringWriter();
           ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
           beanWriter.write(tuple, nameMapping, processors);
           beanWriter.flush();
           beanWriter.close();
    +      emittedObjectCount++;
           return stringWriter.toString();
         } catch (SuperCsvException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    +      logger.error("Error while converting tuple {} {}", tuple, e.getMessage());
    +      errorTupleCount++;
         } catch (IOException e) {
           DTThrowable.rethrow(e);
         }
         return null;
       }
     
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    -    }
    -
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    -
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    -    }
    -  }
    -
       /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Get the schema
        * 
    -   * @return An array list of Fields.
    +   * @return schema
        */
    -  public ArrayList<Field> getFields()
    +  public String getSchema()
       {
    -    return fields;
    +    return schema;
       }
     
       /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Set the schema
        * 
    -   * @param fields
    -   *          An array list of Fields.
    +   * @param schema
        */
    -  public void setFields(ArrayList<Field> fields)
    +  public void setSchema(String schema)
       {
    -    this.fields = fields;
    +    this.schema = schema;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @VisibleForTesting
    +  public long getErrorTupleCount()
       {
    -    return fieldDelimiter;
    +    return errorTupleCount;
       }
     
    -  /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    -   */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  @VisibleForTesting
    +  public long getEmittedObjectCount()
       {
    -    this.fieldDelimiter = fieldDelimiter;
    +    return emittedObjectCount;
       }
     
    -  /**
    -   * Gets the delimiter which separates lines in incoming data.
    -   * 
    -   * @return lineDelimiter
    -   */
    -  public String getLineDelimiter()
    -  {
    -    return lineDelimiter;
    -  }
    -
    -  /**
    -   * Sets the delimiter which separates line in incoming data.
    -   * 
    -   * @param lineDelimiter
    -   */
    -  public void setLineDelimiter(String lineDelimiter)
    -  {
    -    this.lineDelimiter = lineDelimiter;
    -  }
    -
    -  /**
    -   * Gets the name of the fields with type and format in data as comma separated
    -   * string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    -   * 
    -   * @return fieldInfo
    -   */
    -  public String getFieldInfo()
    -  {
    -    return fieldInfo;
    -  }
    -
    -  /**
    -   * Sets the name of the fields with type and format in data as comma separated
    -   * string in same order as incoming data. e.g
    -   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
    -   * 
    -   * @param fieldInfo
    -   */
    -  public void setFieldInfo(String fieldInfo)
    +  @VisibleForTesting
    +  public long getIncomingTuplesCount()
    --- End diff --
    
    Please make this protected... No need for this to be private.



[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65412319
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema.Schema is specified in a json format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
     
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  protected transient CsvPreference preference;
    +  /**
    +   * metric to keep count of number of tuples emitted on input port
    +   */
    +  @AutoMetric
    +  protected long incomingTuplesCount;
     
       public CsvFormatter()
       {
    -    fields = new ArrayList<Field>();
    -    fieldDelimiter = ',';
    -    lineDelimiter = "\r\n";
    +  }
     
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    errorTupleCount = 0;
    +    emittedObjectCount = 0;
    +    incomingTuplesCount = 0;
       }
     
       @Override
       public void setup(Context.OperatorContext context)
       {
         super.setup(context);
    -
    -    //fieldInfo information
    -    fields = new ArrayList<Field>();
    -    String[] fieldInfoTuple = fieldInfo.split(",");
    -    for (int i = 0; i < fieldInfoTuple.length; i++) {
    -      String[] fieldTuple = fieldInfoTuple[i].split(":");
    -      Field field = new Field();
    -      field.setName(fieldTuple[0]);
    -      String[] typeFormat = fieldTuple[1].split("\\|");
    -      field.setType(typeFormat[0].toUpperCase());
    -      if (typeFormat.length > 1) {
    -        field.setFormat(typeFormat[1]);
    -      }
    -      getFields().add(field);
    -    }
    -    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
    -    int countKeyValue = getFields().size();
    -    nameMapping = new String[countKeyValue];
    -    processors = new CellProcessor[countKeyValue];
    -    initialise(nameMapping, processors);
    -
    +    delimitedParserSchema = new DelimitedSchema(schema);
    +    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
    +        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
    +    nameMapping = delimitedParserSchema.getFieldNames()
    +        .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
    +    processors = getProcessor(delimitedParserSchema.getFields());
       }
     
    -  private void initialise(String[] nameMapping, CellProcessor[] processors)
    +  /**
    +   * Returns array of cellprocessors, one for each field
    +   */
    +  private CellProcessor[] getProcessor(List<Field> fields)
       {
    -    for (int i = 0; i < getFields().size(); i++) {
    -      FIELD_TYPE type = getFields().get(i).type;
    -      nameMapping[i] = getFields().get(i).name;
    -      if (type == FIELD_TYPE.DATE) {
    -        String dateFormat = getFields().get(i).format;
    -        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
    +    CellProcessor[] processor = new CellProcessor[fields.size()];
    +    int fieldCount = 0;
    +    for (Field field : fields) {
    +      if (field.getType() == FieldType.DATE) {
    +        String format = field.getConstraints().get(DelimitedSchema.DATE_FORMAT) == null ? null
    +            : (String)field.getConstraints().get(DelimitedSchema.DATE_FORMAT);
    +        processor[fieldCount++] = new Optional(new FmtDate(format == null ? "dd/MM/yyyy" : format));
           } else {
    -        processors[i] = new Optional();
    +        processor[fieldCount++] = new Optional();
           }
         }
    -
    +    return processor;
       }
     
       @Override
       public String convert(Object tuple)
       {
    +    incomingTuplesCount++;
    +    if (tuple == null) {
    +      errorTupleCount++;
    +      logger.error(" Null tuple", tuple);
    +      return null;
    +    }
         try {
           StringWriter stringWriter = new StringWriter();
           ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
           beanWriter.write(tuple, nameMapping, processors);
           beanWriter.flush();
           beanWriter.close();
    +      emittedObjectCount++;
           return stringWriter.toString();
         } catch (SuperCsvException e) {
    -      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
    +      logger.error("Error while converting tuple {} {}", tuple, e.getMessage());
    +      errorTupleCount++;
         } catch (IOException e) {
           DTThrowable.rethrow(e);
         }
         return null;
       }
     
    -  public static class Field
    -  {
    -    String name;
    -    String format;
    -    FIELD_TYPE type;
    -
    -    public String getName()
    -    {
    -      return name;
    -    }
    -
    -    public void setName(String name)
    -    {
    -      this.name = name;
    -    }
    -
    -    public FIELD_TYPE getType()
    -    {
    -      return type;
    -    }
    -
    -    public void setType(String type)
    -    {
    -      this.type = FIELD_TYPE.valueOf(type);
    -    }
    -
    -    public String getFormat()
    -    {
    -      return format;
    -    }
    -
    -    public void setFormat(String format)
    -    {
    -      this.format = format;
    -    }
    -  }
    -
       /**
    -   * Gets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Get the schema
        * 
    -   * @return An array list of Fields.
    +   * @return schema
        */
    -  public ArrayList<Field> getFields()
    +  public String getSchema()
       {
    -    return fields;
    +    return schema;
       }
     
       /**
    -   * Sets the array list of the fields, a field being a POJO containing the name
    -   * of the field and type of field.
    +   * Set the schema
        * 
    -   * @param fields
    -   *          An array list of Fields.
    +   * @param schema
        */
    -  public void setFields(ArrayList<Field> fields)
    +  public void setSchema(String schema)
       {
    -    this.fields = fields;
    +    this.schema = schema;
       }
     
    -  /**
    -   * Gets the delimiter which separates fields in incoming data.
    -   * 
    -   * @return fieldDelimiter
    -   */
    -  public int getFieldDelimiter()
    +  @VisibleForTesting
    +  public long getErrorTupleCount()
       {
    -    return fieldDelimiter;
    +    return errorTupleCount;
       }
     
    -  /**
    -   * Sets the delimiter which separates fields in incoming data.
    -   * 
    -   * @param fieldDelimiter
    -   */
    -  public void setFieldDelimiter(int fieldDelimiter)
    +  @VisibleForTesting
    +  public long getEmittedObjectCount()
    --- End diff --
    
    Can you make this protected? Since the test case will be in the same package, it will be accessible via protected as well.
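
The access rule behind this comment can be sketched as follows: in Java, a `protected` member is visible to subclasses and also to every class in the same package, so a test that shares the package can call the getter without it being `public`. `CountingFormatter` and `CountingFormatterCheck` are hypothetical names used only for this illustration, and the counter logic is a simplified assumption rather than the operator's real behaviour.

```java
// Hypothetical stand-in for the operator; only the counter matters here.
class CountingFormatter
{
  // Kept private, as requested elsewhere in the review.
  private long emittedObjectCount;

  // Simplified conversion: every non-null tuple counts as emitted.
  String convert(Object tuple)
  {
    if (tuple == null) {
      return null;
    }
    emittedObjectCount++;
    return tuple.toString();
  }

  // Protected instead of public: still reachable from the same package.
  protected long getEmittedObjectCount()
  {
    return emittedObjectCount;
  }
}

// Plays the role of a test class. It is not a subclass of CountingFormatter,
// but it is declared in the same package.
class CountingFormatterCheck
{
  static long emittedAfter(int tupleCount)
  {
    CountingFormatter formatter = new CountingFormatter();
    for (int i = 0; i < tupleCount; i++) {
      formatter.convert("tuple-" + i);
    }
    // Compiles because protected access includes package access.
    return formatter.getEmittedObjectCount();
  }
}
```

A class in a different package that does not extend `CountingFormatter` would get a compile error on the same call, which is the narrowing of visibility the reviewer is asking for.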



[GitHub] incubator-apex-malhar pull request #306: APEXMALHAR-2105 enhancing CSV forma...

Posted by chinmaykolhatkar <gi...@git.apache.org>.
Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/306#discussion_r65411586
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java ---
    @@ -58,215 +68,156 @@
     public class CsvFormatter extends Formatter<String>
     {
     
    -  private ArrayList<Field> fields;
    -  @NotNull
    -  protected String classname;
    -  @NotNull
    -  protected int fieldDelimiter;
    -  protected String lineDelimiter;
    +  /**
    +   * Names of all the fields in the same order that would appear in output
    +   * records
    +   */
    +  private transient String[] nameMapping;
    +  /**
    +   * Cell processors are an integral part of reading and writing with Super CSV
    +   * they automate the data type conversions, and enforce constraints.
    +   */
    +  private transient CellProcessor[] processors;
    +  /**
    +   * Writing preferences that are passed through schema
    +   */
    +  private transient CsvPreference preference;
     
    +  /**
    +   * Contents of the schema.Schema is specified in a json format as per
    +   * {@link DelimitedSchema}
    +   */
       @NotNull
    -  protected String fieldInfo;
    +  private String schema;
    +  /**
    +   * Schema is read into this object to access fields
    +   */
    +  private transient DelimitedSchema delimitedParserSchema;
     
    -  public enum FIELD_TYPE
    -  {
    -    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
    -  };
    +  /**
    +   * metric to keep count of number of tuples emitted on error port port
    +   */
    +  @AutoMetric
    +  protected long errorTupleCount;
    +
    +  /**
    +   * metric to keep count of number of tuples emitted on out port
    +   */
    +  @AutoMetric
    +  protected long emittedObjectCount;
     
    -  protected transient String[] nameMapping;
    -  protected transient CellProcessor[] processors;
    -  protected transient CsvPreference preference;
    +  /**
    +   * metric to keep count of number of tuples emitted on input port
    +   */
    +  @AutoMetric
    +  protected long incomingTuplesCount;
    --- End diff --
    
    Can you make this private?

