You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:13:05 UTC

[48/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1961 #comment enhanced existing csv parser to take in schema for validations

APEXMALHAR-1961 #comment enhanced existing csv parser to take in schema for validations


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9e77ef7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9e77ef7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9e77ef7b

Branch: refs/heads/master
Commit: 9e77ef7bd194b7c79c72f301a850ce5cac229f2c
Parents: 49b8556
Author: shubham <sh...@github.com>
Authored: Wed Dec 30 18:02:13 2015 +0530
Committer: Gaurav Gupta <ga...@apache.org>
Committed: Tue Jan 19 09:27:30 2016 -0800

----------------------------------------------------------------------
 contrib/pom.xml                                 |  10 +-
 .../contrib/parser/CellProcessorBuilder.java    | 456 ++++++++++++++++
 .../datatorrent/contrib/parser/CsvParser.java   | 362 +++++++------
 .../contrib/parser/DelimitedSchema.java         | 359 +++++++++++++
 .../contrib/parser/CsvPOJOParserTest.java       | 519 +++++++++++++++----
 contrib/src/test/resources/schema.json          |  96 ++++
 .../com/datatorrent/lib/parser/JsonParser.java  |   2 +-
 .../java/com/datatorrent/lib/parser/Parser.java |   4 +-
 .../com/datatorrent/lib/parser/XmlParser.java   |   2 +-
 9 files changed, 1522 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 7331940..16e28c8 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -477,7 +477,7 @@
     <dependency>
       <groupId>net.sf.supercsv</groupId>
       <artifactId>super-csv</artifactId>
-      <version>2.2.0</version>
+      <version>2.4.0</version>
       <optional>true</optional>
     </dependency>
     <dependency>
@@ -610,12 +610,6 @@
       <artifactId>apex-common</artifactId>
       <version>${apex.core.version}</version>
       <type>jar</type>
-    </dependency>
-    <dependency>
-      <!-- required by Csv parser and formatter -->
-      <groupId>net.sf.supercsv</groupId>
-      <artifactId>super-csv-joda</artifactId>
-      <version>2.3.1</version>
-    </dependency>
+    </dependency>  
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
new file mode 100644
index 0000000..1993d94
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Map;
+
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
+import org.supercsv.cellprocessor.ift.LongCellProcessor;
+import org.supercsv.util.CsvContext;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
+
+/**
+ * Helper class with methods to generate CellProcessor objects. Cell processors
+ * are an integral part of reading and writing with Super CSV - they automate
+ * the data type conversions, and enforce constraints. They implement the chain
+ * of responsibility design pattern - each processor has a single, well-defined
+ * purpose and can be chained together with other processors to fully automate
+ * all of the required conversions and constraint validation for a single
+ * delimited record.
+ * 
+ */
+public class CellProcessorBuilder
+{
+
+  /**
+   * Builds the cell processor for a single schema field by dispatching on the
+   * field type to the matching type-specific builder. FLOAT and DOUBLE share
+   * the double builder.
+   *
+   * @param fieldType
+   *          data type of the field
+   * @param constraints
+   *          map of constraint name to constraint value for the field
+   * @return the processor for the field; may be null, e.g. for a field type
+   *         without a dedicated builder
+   */
+  public static CellProcessor getCellProcessor(FieldType fieldType, Map<String, Object> constraints)
+  {
+    final CellProcessor processor;
+    switch (fieldType) {
+      case STRING:
+        processor = getStringCellProcessor(constraints);
+        break;
+      case INTEGER:
+        processor = getIntegerCellProcessor(constraints);
+        break;
+      case LONG:
+        processor = getLongCellProcessor(constraints);
+        break;
+      case FLOAT:
+      case DOUBLE:
+        processor = getDoubleCellProcessor(constraints);
+        break;
+      case CHARACTER:
+        processor = getCharCellProcessor(constraints);
+        break;
+      case BOOLEAN:
+        processor = getBooleanCellProcessor(constraints);
+        break;
+      case DATE:
+        processor = getDateCellProcessor(constraints);
+        break;
+      default:
+        processor = null;
+        break;
+    }
+    return processor;
+  }
+
+  /**
+   * Builds the cell processor for a String field. At most one value constraint
+   * is applied, chosen in this order of precedence: equals, regex pattern,
+   * exact length, min/max length. The field is treated as optional unless the
+   * "required" constraint is present and parses to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to String; values may be Strings or
+   *          any object whose toString() yields the textual constraint value
+   * @return CellProcessor enforcing the selected constraint
+   * @throws NumberFormatException
+   *           if a length constraint is not a valid integer
+   */
+  private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object lengthValue = constraints.get(DelimitedSchema.LENGTH);
+    Object minLengthValue = constraints.get(DelimitedSchema.MIN_LENGTH);
+    Object maxLengthValue = constraints.get(DelimitedSchema.MAX_LENGTH);
+    Object equalsValue = constraints.get(DelimitedSchema.EQUALS);
+    Object patternValue = constraints.get(DelimitedSchema.REGEX_PATTERN);
+
+    // toString() rather than a (String) cast: the map is typed <String, Object>,
+    // so a schema that supplies a native boolean/number must not fail with a
+    // ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    Integer strLen = lengthValue == null ? null : Integer.valueOf(lengthValue.toString());
+    Integer minLength = minLengthValue == null ? null : Integer.valueOf(minLengthValue.toString());
+    Integer maxLength = maxLengthValue == null ? null : Integer.valueOf(maxLengthValue.toString());
+    String equals = equalsValue == null ? null : equalsValue.toString();
+    String pattern = patternValue == null ? null : patternValue.toString();
+
+    CellProcessor cellProcessor = null;
+    if (StringUtils.isNotBlank(equals)) {
+      cellProcessor = new Equals(equals);
+    } else if (StringUtils.isNotBlank(pattern)) {
+      cellProcessor = new StrRegEx(pattern);
+    } else if (strLen != null) {
+      cellProcessor = new Strlen(strLen);
+    } else if (maxLength != null || minLength != null) {
+      // An absent bound defaults to the widest possible range on that side.
+      Long min = minLength == null ? 0L : minLength;
+      Long max = maxLength == null ? LMinMax.MAX_LONG : maxLength;
+      cellProcessor = new StrMinMax(min, max);
+    }
+    if (!required) {
+      return addOptional(cellProcessor);
+    }
+    if (cellProcessor == null) {
+      // A required field with no other constraint previously produced no
+      // processor at all, so "required" was never enforced. NotNull makes it so.
+      cellProcessor = new org.supercsv.cellprocessor.constraint.NotNull();
+    }
+    return cellProcessor;
+  }
+
+  /**
+   * Builds the cell processor for an Integer field. An "equals" constraint
+   * takes precedence over min/max bounds; with neither, the value is only
+   * parsed as an integer. The field is treated as optional unless the
+   * "required" constraint is present and parses to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to Integer; values may be Strings
+   *          or any object whose toString() yields the textual constraint value
+   * @return CellProcessor
+   * @throws NumberFormatException
+   *           if a numeric constraint is not a valid integer
+   */
+  private static CellProcessor getIntegerCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object equalsValue = constraints.get(DelimitedSchema.EQUALS);
+    Object minRaw = constraints.get(DelimitedSchema.MIN_VALUE);
+    Object maxRaw = constraints.get(DelimitedSchema.MAX_VALUE);
+
+    // toString() rather than a (String) cast so that native JSON booleans and
+    // numbers in the constraint map do not cause a ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    Integer equals = equalsValue == null ? null : Integer.valueOf(equalsValue.toString());
+    Integer minValue = minRaw == null ? null : Integer.valueOf(minRaw.toString());
+    Integer maxValue = maxRaw == null ? null : Integer.valueOf(maxRaw.toString());
+
+    CellProcessor cellProcessor;
+    if (equals != null) {
+      // Parse first, then compare the parsed Integer against the constant.
+      cellProcessor = addParseInt(new Equals(equals));
+    } else if (minValue != null || maxValue != null) {
+      cellProcessor = addIntMinMax(minValue, maxValue);
+    } else {
+      cellProcessor = addParseInt(null);
+    }
+    return required ? cellProcessor : addOptional(cellProcessor);
+  }
+
+  /**
+   * Builds the cell processor for a Long field. An "equals" constraint takes
+   * precedence over min/max bounds; with neither, the value is only parsed as
+   * a long. The field is treated as optional unless the "required" constraint
+   * is present and parses to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to Long; values may be Strings or
+   *          any object whose toString() yields the textual constraint value
+   * @return CellProcessor
+   * @throws NumberFormatException
+   *           if a numeric constraint is not a valid long
+   */
+  private static CellProcessor getLongCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object equalsValue = constraints.get(DelimitedSchema.EQUALS);
+    Object minRaw = constraints.get(DelimitedSchema.MIN_VALUE);
+    Object maxRaw = constraints.get(DelimitedSchema.MAX_VALUE);
+
+    // toString() rather than a (String) cast so that native JSON booleans and
+    // numbers in the constraint map do not cause a ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    Long equals = equalsValue == null ? null : Long.valueOf(equalsValue.toString());
+    Long minValue = minRaw == null ? null : Long.valueOf(minRaw.toString());
+    Long maxValue = maxRaw == null ? null : Long.valueOf(maxRaw.toString());
+
+    CellProcessor cellProcessor;
+    if (equals != null) {
+      // Parse first, then compare the parsed Long against the constant.
+      cellProcessor = addParseLong(new Equals(equals));
+    } else if (minValue != null || maxValue != null) {
+      cellProcessor = addLongMinMax(minValue, maxValue);
+    } else {
+      cellProcessor = addParseLong(null);
+    }
+    return required ? cellProcessor : addOptional(cellProcessor);
+  }
+
+  /**
+   * Builds the cell processor for a Float/Double field. An "equals" constraint
+   * takes precedence over min/max bounds; with neither, the value is only
+   * parsed as a double. The field is treated as optional unless the "required"
+   * constraint is present and parses to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to Float/Double; values may be
+   *          Strings or any object whose toString() yields the textual
+   *          constraint value
+   * @return CellProcessor
+   * @throws NumberFormatException
+   *           if a numeric constraint is not a valid double
+   */
+  private static CellProcessor getDoubleCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object equalsValue = constraints.get(DelimitedSchema.EQUALS);
+    Object minRaw = constraints.get(DelimitedSchema.MIN_VALUE);
+    Object maxRaw = constraints.get(DelimitedSchema.MAX_VALUE);
+
+    // toString() rather than a (String) cast so that native JSON booleans and
+    // numbers in the constraint map do not cause a ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    Double equals = equalsValue == null ? null : Double.valueOf(equalsValue.toString());
+    Double minValue = minRaw == null ? null : Double.valueOf(minRaw.toString());
+    Double maxValue = maxRaw == null ? null : Double.valueOf(maxRaw.toString());
+
+    CellProcessor cellProcessor;
+    if (equals != null) {
+      // Parse first, then compare the parsed Double against the constant.
+      cellProcessor = addParseDouble(new Equals(equals));
+    } else if (minValue != null || maxValue != null) {
+      cellProcessor = addDoubleMinMax(minValue, maxValue);
+    } else {
+      cellProcessor = addParseDouble(null);
+    }
+    return required ? cellProcessor : addOptional(cellProcessor);
+  }
+
+  /**
+   * Builds the cell processor for a Boolean field. Custom true/false tokens
+   * are used only when both are supplied and non-blank; otherwise the default
+   * ParseBool is used. The field is treated as optional unless the "required"
+   * constraint is present and parses to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to Boolean; values may be Strings
+   *          or any object whose toString() yields the textual constraint value
+   * @return CellProcessor
+   */
+  private static CellProcessor getBooleanCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object trueRaw = constraints.get(DelimitedSchema.TRUE_VALUE);
+    Object falseRaw = constraints.get(DelimitedSchema.FALSE_VALUE);
+
+    // toString() rather than a (String) cast so that native JSON booleans and
+    // numbers in the constraint map do not cause a ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    String trueValue = trueRaw == null ? null : trueRaw.toString();
+    String falseValue = falseRaw == null ? null : falseRaw.toString();
+
+    CellProcessor cellProcessor;
+    if (StringUtils.isNotBlank(trueValue) && StringUtils.isNotBlank(falseValue)) {
+      cellProcessor = new ParseBool(trueValue, falseValue);
+    } else {
+      cellProcessor = new ParseBool();
+    }
+    return required ? cellProcessor : addOptional(cellProcessor);
+  }
+
+  /**
+   * Builds the cell processor for a Date field. The date format constraint is
+   * used when present and non-blank, otherwise "dd/MM/yyyy". The field is
+   * treated as optional unless the "required" constraint is present and parses
+   * to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to Date; values may be Strings or
+   *          any object whose toString() yields the textual constraint value
+   * @return CellProcessor
+   */
+  private static CellProcessor getDateCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object formatRaw = constraints.get(DelimitedSchema.DATE_FORMAT);
+
+    // toString() rather than a (String) cast so that a non-String value in the
+    // constraint map does not cause a ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    String format = formatRaw == null ? null : formatRaw.toString();
+
+    String fmt = StringUtils.isNotBlank(format) ? format : "dd/MM/yyyy";
+    // NOTE(review): the second argument is presumably ParseDate's "lenient"
+    // flag (false = strict parsing) - confirm against the Super CSV API.
+    CellProcessor cellProcessor = new ParseDate(fmt, false);
+    return required ? cellProcessor : addOptional(cellProcessor);
+  }
+
+  /**
+   * Builds the cell processor for a Character field. When an "equals"
+   * constraint is supplied, its first character is the value the parsed
+   * character must match; an empty "equals" value is ignored. The field is
+   * treated as optional unless the "required" constraint is present and parses
+   * to true.
+   *
+   * @param constraints
+   *          map of constraints applicable to Char; values may be Strings or
+   *          any object whose toString() yields the textual constraint value
+   * @return CellProcessor
+   */
+  private static CellProcessor getCharCellProcessor(Map<String, Object> constraints)
+  {
+    Object requiredValue = constraints.get(DelimitedSchema.REQUIRED);
+    Object equalsRaw = constraints.get(DelimitedSchema.EQUALS);
+
+    // toString() rather than a (String) cast so that a non-String value in the
+    // constraint map does not cause a ClassCastException.
+    boolean required = requiredValue != null && Boolean.parseBoolean(requiredValue.toString());
+    String equalsText = equalsRaw == null ? "" : equalsRaw.toString();
+
+    CellProcessor cellProcessor = null;
+    // Guard against an empty constraint: charAt(0) on "" would throw
+    // StringIndexOutOfBoundsException while the operator is being set up.
+    if (!equalsText.isEmpty()) {
+      cellProcessor = new Equals(Character.valueOf(equalsText.charAt(0)));
+    }
+    cellProcessor = addParseChar(cellProcessor);
+    return required ? cellProcessor : addOptional(cellProcessor);
+  }
+
+  /**
+   * Get a Double Min Max cellprocessor. A missing bound leaves that side of
+   * the range unbounded over finite doubles.
+   *
+   * @param minValue
+   *          minimum value, or null for no lower bound.
+   * @param maxValue
+   *          maximum value, or null for no upper bound.
+   * @return CellProcessor
+   */
+  private static CellProcessor addDoubleMinMax(Double minValue, Double maxValue)
+  {
+    // DMinMax.MIN_DOUBLE is Double.MIN_VALUE, which is the smallest POSITIVE
+    // double, not the most negative one. Using it as the default lower bound
+    // rejected zero and every negative value whenever only maxValue was set.
+    // -Double.MAX_VALUE is the true lowest finite double.
+    double min = minValue == null ? -Double.MAX_VALUE : minValue;
+    double max = maxValue == null ? DMinMax.MAX_DOUBLE : maxValue;
+    return new DMinMax(min, max);
+  }
+
+  /**
+   * Creates a range-checking cellprocessor for Long values. A null bound is
+   * replaced by the corresponding LMinMax extreme (MIN_LONG / MAX_LONG).
+   *
+   * @param minValue
+   *          minimum value, may be null.
+   * @param maxValue
+   *          maximum value, may be null.
+   * @return CellProcessor
+   */
+  private static CellProcessor addLongMinMax(Long minValue, Long maxValue)
+  {
+    long lowerBound = LMinMax.MIN_LONG;
+    if (minValue != null) {
+      lowerBound = minValue;
+    }
+    long upperBound = LMinMax.MAX_LONG;
+    if (maxValue != null) {
+      upperBound = maxValue;
+    }
+    return new LMinMax(lowerBound, upperBound);
+  }
+
+  /**
+   * Creates a range-checking cellprocessor for Integer values. A null bound
+   * is replaced by Integer.MIN_VALUE / Integer.MAX_VALUE respectively.
+   *
+   * @param minValue
+   *          minimum value, may be null.
+   * @param maxValue
+   *          maximum value, may be null.
+   * @return CellProcessor
+   */
+  private static CellProcessor addIntMinMax(Integer minValue, Integer maxValue)
+  {
+    int lowerBound = Integer.MIN_VALUE;
+    if (minValue != null) {
+      lowerBound = minValue;
+    }
+    int upperBound = Integer.MAX_VALUE;
+    if (maxValue != null) {
+      upperBound = maxValue;
+    }
+    return new IntMinMax(lowerBound, upperBound);
+  }
+
+  /**
+   * Wraps the given processor in an Optional cellprocessor, marking the field
+   * as not mandatory. With no processor to wrap, a bare Optional is returned.
+   *
+   * @param cellProcessor
+   *          next processor in the chain, may be null.
+   * @return CellProcessor
+   */
+  private static CellProcessor addOptional(CellProcessor cellProcessor)
+  {
+    return cellProcessor == null ? new Optional() : new Optional(cellProcessor);
+  }
+
+  /**
+   * Creates a cellprocessor that parses a String as an Integer, optionally
+   * handing the parsed value on to the next processor.
+   *
+   * @param cellProcessor
+   *          next processor in the chain, may be null; when non-null it must
+   *          be a LongCellProcessor.
+   * @return CellProcessor
+   */
+  private static CellProcessor addParseInt(CellProcessor cellProcessor)
+  {
+    if (cellProcessor != null) {
+      return new ParseInt((LongCellProcessor)cellProcessor);
+    }
+    return new ParseInt();
+  }
+
+  /**
+   * Creates a cellprocessor that parses a String as a Long, optionally
+   * handing the parsed value on to the next processor.
+   *
+   * @param cellProcessor
+   *          next processor in the chain, may be null; when non-null it must
+   *          be a LongCellProcessor.
+   * @return CellProcessor
+   */
+  private static CellProcessor addParseLong(CellProcessor cellProcessor)
+  {
+    if (cellProcessor != null) {
+      return new ParseLong((LongCellProcessor)cellProcessor);
+    }
+    return new ParseLong();
+  }
+
+  /**
+   * Creates a cellprocessor that parses a String as a Double, optionally
+   * handing the parsed value on to the next processor.
+   *
+   * @param cellProcessor
+   *          next processor in the chain, may be null; when non-null it must
+   *          be a DoubleCellProcessor.
+   * @return CellProcessor
+   */
+  private static CellProcessor addParseDouble(CellProcessor cellProcessor)
+  {
+    if (cellProcessor != null) {
+      return new ParseDouble((DoubleCellProcessor)cellProcessor);
+    }
+    return new ParseDouble();
+  }
+
+  /**
+   * Creates a cellprocessor that parses a String as a Character, optionally
+   * handing the parsed value on to the next processor.
+   *
+   * @param cellProcessor
+   *          next processor in the chain, may be null; when non-null it must
+   *          be a DoubleCellProcessor.
+   * @return CellProcessor
+   */
+  private static CellProcessor addParseChar(CellProcessor cellProcessor)
+  {
+    if (cellProcessor != null) {
+      // NOTE(review): the DoubleCellProcessor cast looks odd for a char parser;
+      // presumably it mirrors the parameter type of ParseChar's chaining
+      // constructor in Super CSV - confirm against the library version in use.
+      return new ParseChar((DoubleCellProcessor)cellProcessor);
+    }
+    return new ParseChar();
+  }
+
+  /**
+   * Custom cell processor applying min/max constraints to Integer fields. The
+   * range check is delegated to LMinMax; the Long it returns is narrowed to an
+   * Integer before being handed back.
+   */
+  private static class IntMinMax extends LMinMax
+  {
+    public IntMinMax(int min, int max)
+    {
+      super(min, max);
+    }
+
+    @Override
+    public Object execute(Object value, CsvContext context)
+    {
+      Long checked = (Long)super.execute(value, context);
+      int narrowed = checked.intValue();
+      return Integer.valueOf(narrowed);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
index af1eebe..bb72e82 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
@@ -19,287 +19,271 @@
 package com.datatorrent.contrib.parser;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.supercsv.cellprocessor.Optional;
-import org.supercsv.cellprocessor.ParseBool;
-import org.supercsv.cellprocessor.ParseChar;
-import org.supercsv.cellprocessor.ParseDate;
-import org.supercsv.cellprocessor.ParseDouble;
-import org.supercsv.cellprocessor.ParseInt;
-import org.supercsv.cellprocessor.ParseLong;
 import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
 import org.supercsv.io.CsvBeanReader;
+import org.supercsv.io.CsvMapReader;
 import org.supercsv.prefs.CsvPreference;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.contrib.parser.DelimitedSchema.Field;
 import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.lib.util.ReusableStringReader;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
- * Operator that converts CSV string to Pojo <br>
+ * Operator that parses a delimited tuple against a specified schema <br>
+ * Schema is specified in a json format as per {@link DelimitedSchema} that
+ * contains field information and constraints for each field.<br>
  * Assumption is that each field in the delimited data should map to a simple
  * java type.<br>
  * <br>
  * <b>Properties</b> <br>
- * <b>fieldInfo</b>:User need to specify fields and their types as a comma
- * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
- * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as
- * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br>
- * <b>fieldDelimiter</b>: Default is comma <br>
- * <b>lineDelimiter</b>: Default is '\r\n'
+ * <b>schema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a byte array. Each tuple represents a record<br>
+ * <b>parsedOutput</b>:tuples that are validated against the schema are emitted
+ * as Map<String,Object> on this port<br>
+ * <b>out</b>:tuples that are validated against the schema are emitted as pojo
+ * on this port<br>
+ * <b>err</b>:tuples that do not conform to schema are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
  * 
  * @displayName CsvParser
  * @category Parsers
  * @tags csv pojo parser
  * @since 3.2.0
  */
-public class CsvParser extends Parser<String, String>
+@InterfaceStability.Evolving
+public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
 {
-
-  private ArrayList<Field> fields;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
-
+  /**
+   * Map Reader to read delimited records
+   */
+  private transient CsvMapReader csvMapReader;
+  /**
+   * Bean Reader to read delimited records
+   */
+  private transient CsvBeanReader csvBeanReader;
+  /**
+   * Reader used by csvMapReader and csvBeanReader
+   */
+  private transient ReusableStringReader csvStringReader;
+  /**
+   * Contents of the schema.Schema is specified in a json format as per
+   * {@link DelimitedSchema}
+   */
   @NotNull
-  protected String fieldInfo;
-
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  private transient CsvBeanReader csvReader;
-
-  public enum FIELD_TYPE
-  {
-    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV
+   * they automate the data type conversions, and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Names of all the fields in the same order of incoming records
+   */
+  private transient String[] nameMapping;
+  /**
+   * header-this will be delimiter separated string of field names
+   */
+  private transient String header;
+  /**
+   * Reading preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
 
-  @NotNull
-  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
+  /**
+   * metric to keep count of number of tuples emitted on {@link #parsedOutput}
+   * port
+   */
+  @AutoMetric
+  long parsedOutputCount;
 
-  public CsvParser()
+  @Override
+  public void beginWindow(long windowId)
   {
-    fields = new ArrayList<Field>();
-    fieldDelimiter = ',';
-    lineDelimiter = "\r\n";
+    super.beginWindow(windowId);
+    parsedOutputCount = 0;
   }
 
   @Override
   public void setup(OperatorContext context)
   {
-    super.setup(context);
-
-    logger.info("field info {}", fieldInfo);
-    fields = new ArrayList<Field>();
-    String[] fieldInfoTuple = fieldInfo.split(",");
-    for (int i = 0; i < fieldInfoTuple.length; i++) {
-      String[] fieldTuple = fieldInfoTuple[i].split(":");
-      Field field = new Field();
-      field.setName(fieldTuple[0]);
-      String[] typeFormat = fieldTuple[1].split("\\|");
-      field.setType(typeFormat[0].toUpperCase());
-      if (typeFormat.length > 1) {
-        field.setFormat(typeFormat[1]);
-      }
-      getFields().add(field);
-    }
-
-    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
-    csvReader = new CsvBeanReader(csvStringReader, preference);
-    int countKeyValue = getFields().size();
-    logger.info("countKeyValue {}", countKeyValue);
-    nameMapping = new String[countKeyValue];
-    processors = new CellProcessor[countKeyValue];
-    initialise(nameMapping, processors);
+    delimitedParserSchema = new DelimitedSchema(schema);
+    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
+    nameMapping = delimitedParserSchema.getFieldNames().toArray(
+        new String[delimitedParserSchema.getFieldNames().size()]);
+    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
+    processors = getProcessor(delimitedParserSchema.getFields());
+    csvStringReader = new ReusableStringReader();
+    csvMapReader = new CsvMapReader(csvStringReader, preference);
+    csvBeanReader = new CsvBeanReader(csvStringReader, preference);
   }
 
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
-  {
-    for (int i = 0; i < getFields().size(); i++) {
-      FIELD_TYPE type = getFields().get(i).type;
-      nameMapping[i] = getFields().get(i).name;
-      if (type == FIELD_TYPE.DOUBLE) {
-        processors[i] = new Optional(new ParseDouble());
-      } else if (type == FIELD_TYPE.INTEGER) {
-        processors[i] = new Optional(new ParseInt());
-      } else if (type == FIELD_TYPE.FLOAT) {
-        processors[i] = new Optional(new ParseDouble());
-      } else if (type == FIELD_TYPE.LONG) {
-        processors[i] = new Optional(new ParseLong());
-      } else if (type == FIELD_TYPE.SHORT) {
-        processors[i] = new Optional(new ParseInt());
-      } else if (type == FIELD_TYPE.STRING) {
-        processors[i] = new Optional();
-      } else if (type == FIELD_TYPE.CHARACTER) {
-        processors[i] = new Optional(new ParseChar());
-      } else if (type == FIELD_TYPE.BOOLEAN) {
-        processors[i] = new Optional(new ParseBool());
-      } else if (type == FIELD_TYPE.DATE) {
-        String dateFormat = getFields().get(i).format;
-        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
-      }
-    }
-  }
   @Override
-  public Object convert(String tuple)
+  public Object convert(byte[] tuple)
   {
-    try {
-      csvStringReader.open(tuple);
-      return csvReader.read(clazz, nameMapping, processors);
-    } catch (IOException e) {
-      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
-      return null;
-    }
+    throw new UnsupportedOperationException("Not supported");
   }
 
   @Override
-  public void teardown()
+  public void processTuple(byte[] tuple)
   {
-    try {
-      if (csvReader != null) {
-        csvReader.close();
+    if (tuple == null) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
       }
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-  }
-
-  @Override
-  public String processErorrTuple(String input)
-  {
-    return input;
-  }
-
-  public static class Field
-  {
-    String name;
-    String format;
-    FIELD_TYPE type;
-
-    public String getName()
-    {
-      return name;
+      errorTupleCount++;
+      return;
     }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public FIELD_TYPE getType()
-    {
-      return type;
-    }
-
-    public void setType(String type)
-    {
-      this.type = FIELD_TYPE.valueOf(type);
+    String incomingString = new String(tuple);
+    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
+      }
+      errorTupleCount++;
+      return;
     }
+    try {
+      if (parsedOutput.isConnected()) {
+        csvStringReader.open(incomingString);
+        Map<String, Object> map = csvMapReader.read(nameMapping, processors);
+        parsedOutput.emit(map);
+        parsedOutputCount++;
+      }
 
-    public String getFormat()
-    {
-      return format;
-    }
+      if (out.isConnected() && clazz != null) {
+        csvStringReader.open(incomingString);
+        Object obj = csvBeanReader.read(clazz, nameMapping, processors);
+        out.emit(obj);
+        emittedObjectCount++;
+      }
 
-    public void setFormat(String format)
-    {
-      this.format = format;
+    } catch (SuperCsvException | IOException e) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
+      }
+      errorTupleCount++;
+      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
     }
-
   }
 
-  /**
-   * Gets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
-   * 
-   * @return An array list of Fields.
-   */
-  public ArrayList<Field> getFields()
+  @Override
+  public KeyValPair<String, String> processErrorTuple(byte[] input)
   {
-    return fields;
+    throw new UnsupportedOperationException("Not supported");
   }
 
   /**
-   * Sets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
-   * 
-   * @param fields
-   *          An array list of Fields.
+   * Returns array of cellprocessors, one for each field
    */
-  public void setFields(ArrayList<Field> fields)
+  private CellProcessor[] getProcessor(List<Field> fields)
   {
-    this.fields = fields;
+    CellProcessor[] processor = new CellProcessor[fields.size()];
+    int fieldCount = 0;
+    for (Field field : fields) {
+      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
+    }
+    return processor;
   }
 
-  /**
-   * Gets the delimiter which separates fields in incoming data.
-   * 
-   * @return fieldDelimiter
-   */
-  public int getFieldDelimiter()
+  @Override
+  public void teardown()
   {
-    return fieldDelimiter;
+    try {
+      csvMapReader.close();
+    } catch (IOException e) {
+      logger.error("Error while closing csv map reader {}", e.getMessage());
+      DTThrowable.wrapIfChecked(e);
+    }
+    try {
+      csvBeanReader.close();
+    } catch (IOException e) {
+      logger.error("Error while closing csv bean reader {}", e.getMessage());
+      DTThrowable.wrapIfChecked(e);
+    }
   }
 
   /**
-   * Sets the delimiter which separates fields in incoming data.
+   * Get the schema
    * 
-   * @param fieldDelimiter
+   * @return
    */
-  public void setFieldDelimiter(int fieldDelimiter)
+  public String getSchema()
   {
-    this.fieldDelimiter = fieldDelimiter;
+    return schema;
   }
 
   /**
-   * Gets the delimiter which separates lines in incoming data.
+   * Set the schema
    * 
-   * @return lineDelimiter
+   * @param schema
    */
-  public String getLineDelimiter()
+  public void setSchema(String schema)
   {
-    return lineDelimiter;
+    this.schema = schema;
   }
 
   /**
-   * Sets the delimiter which separates line in incoming data.
+   * Get errorTupleCount
    * 
-   * @param lineDelimiter
+   * @return errorTupleCount
    */
-  public void setLineDelimiter(String lineDelimiter)
+  @VisibleForTesting
+  public long getErrorTupleCount()
   {
-    this.lineDelimiter = lineDelimiter;
+    return errorTupleCount;
   }
 
   /**
-   * Gets the name of the fields with type and format ( for date ) as comma
-   * separated string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
+   * Get emittedObjectCount
    * 
-   * @return fieldInfo
+   * @return emittedObjectCount
    */
-  public String getFieldInfo()
+  @VisibleForTesting
+  public long getEmittedObjectCount()
   {
-    return fieldInfo;
+    return emittedObjectCount;
   }
 
   /**
-   * Sets the name of the fields with type and format ( for date ) as comma
-   * separated string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
+   * Get incomingTuplesCount
    * 
-   * @param fieldInfo
+   * @return incomingTuplesCount
    */
-  public void setFieldInfo(String fieldInfo)
+  @VisibleForTesting
+  public long getIncomingTuplesCount()
   {
-    this.fieldInfo = fieldInfo;
+    return incomingTuplesCount;
   }
 
+  /**
+   * output port to emit validated records as map
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> parsedOutput = new DefaultOutputPort<Map<String, Object>>();
   private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
new file mode 100644
index 0000000..19f6a4b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This is the schema that defines fields and their constraints for delimited
+ * files. The operators use this information to validate the incoming tuples.
+ * Information from the JSON schema is saved in this object and is used by the
+ * operators.
+ * </p>
+ * <br>
+ * <br>
+ * Example schema <br>
+ * <br>
+ * {@code { "separator": ",", "quoteChar":"\"", "fields": [ { "name": "adId",
+ * "type": "Integer", "constraints": { "required": "true" } }, { "name": "adName",
+ * "type": "String", "constraints": { "required": "true", "pattern":
+ * "[a-z].*[a-z]$", "maxLength": "20" } }, { "name": "bidPrice", "type":
+ * "Double", "constraints": { "required": "true", "minValue": "0.1", "maxValue":
+ * "3.2" } }, { "name": "startDate", "type": "Date", "constraints": { "format":
+ * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
+ * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
+ * "Boolean", "constraints": { "required": "true" } } ] }}
+ */
+public class DelimitedSchema
+{
+
+  /**
+   * JSON key string for separator
+   */
+  private static final String SEPARATOR = "separator";
+  /**
+   * JSON key string for quote character
+   */
+  private static final String QUOTE_CHAR = "quoteChar";
+  /**
+   * JSON key string for line delimiter
+   */
+  private static final String LINE_DELIMITER = "lineDelimiter";
+  /**
+   * JSON key string for fields array
+   */
+  private static final String FIELDS = "fields";
+  /**
+   * JSON key string for name of the field within fields array
+   */
+  private static final String NAME = "name";
+  /**
+   * JSON key string for type of the field within fields array
+   */
+  private static final String TYPE = "type";
+  /**
+   * JSON key string for constraints for each field
+   */
+  private static final String CONSTRAINTS = "constraints";
+  /**
+   * JSON key string for required constraint
+   */
+  public static final String REQUIRED = "required";
+  /**
+   * JSON key string for equals constraint
+   */
+  public static final String EQUALS = "equals";
+  /**
+   * JSON key string for length constraint
+   */
+  public static final String LENGTH = "length";
+  /**
+   * JSON key string for min length constraint
+   */
+  public static final String MIN_LENGTH = "minLength";
+  /**
+   * JSON key string for max length constraint
+   */
+  public static final String MAX_LENGTH = "maxLength";
+  /**
+   * JSON key string for min value constraint
+   */
+  public static final String MIN_VALUE = "minValue";
+  /**
+   * JSON key string for max value constraint
+   */
+  public static final String MAX_VALUE = "maxValue";
+  /**
+   * JSON key string for regex pattern constraint
+   */
+  public static final String REGEX_PATTERN = "pattern";
+  /**
+   * JSON key string for date format constraint
+   */
+  public static final String DATE_FORMAT = "format";
+  /**
+   * JSON key string for locale constraint
+   */
+  public static final String LOCALE = "locale";
+  /**
+   * JSON key string for true value constraint
+   */
+  public static final String TRUE_VALUE = "trueValue";
+  /**
+   * JSON key string for false value constraint
+   */
+  public static final String FALSE_VALUE = "falseValue";
+  /**
+   * delimiter character provided in schema. Default is ,
+   */
+  private int delimiterChar = ',';
+  /**
+   * quote character provided in schema. Default is "
+   */
+  private char quoteChar = '\"';
+  /**
+   * line delimiter string provided in schema. Default is "\r\n" (carriage return followed by line feed)
+   */
+  private String lineDelimiter = "\r\n";
+  /**
+   * This holds the list of field names in the same order as in the schema
+   */
+  private List<String> fieldNames = new LinkedList<String>();
+  /**
+   * This holds list of {@link Field}
+   */
+  private List<Field> fields = new LinkedList<Field>();
+
+  /**
+   * Supported data types
+   */
+  public enum FieldType
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  };
+
+  public DelimitedSchema(String json)
+  {
+    try {
+      initialize(json);
+    } catch (JSONException | IOException e) {
+      logger.error("{}", e);
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * For a given json string, this method sets the field members
+   * 
+   * @param json
+   * @throws JSONException
+   * @throws IOException
+   */
+  private void initialize(String json) throws JSONException, IOException
+  {
+    JSONObject jo = new JSONObject(json);
+    if (jo.has(SEPARATOR)) {
+      delimiterChar = ((String)jo.getString(SEPARATOR)).charAt(0);
+    }
+    if (jo.has(QUOTE_CHAR)) {
+      quoteChar = ((String)jo.getString(QUOTE_CHAR)).charAt(0);
+    }
+    if (jo.has(LINE_DELIMITER)) {
+      lineDelimiter = (String)jo.getString(LINE_DELIMITER);
+    }
+
+    JSONArray fieldArray = jo.getJSONArray(FIELDS);
+    for (int i = 0; i < fieldArray.length(); i++) {
+      JSONObject obj = fieldArray.getJSONObject(i);
+      Field field = new Field(obj.getString(NAME), obj.getString(TYPE));
+      fields.add(field);
+      fieldNames.add(field.name);
+      if (obj.has(CONSTRAINTS)) {
+        JSONObject constraints = obj.getJSONObject(CONSTRAINTS);
+        field.constraints = new ObjectMapper().readValue(constraints.toString(), HashMap.class);
+      }
+    }
+  }
+
+  /**
+   * Get the list of field names mentioned in schema
+   * 
+   * @return fieldNames
+   */
+  public List<String> getFieldNames()
+  {
+    return Collections.unmodifiableList(fieldNames);
+  }
+
+  /**
+   * Get the delimiter character
+   * 
+   * @return delimiterChar
+   */
+  public int getDelimiterChar()
+  {
+    return delimiterChar;
+  }
+
+  /**
+   * Get the quoteChar
+   * 
+   * @return quoteChar
+   */
+  public char getQuoteChar()
+  {
+    return quoteChar;
+  }
+
+  /**
+   * Get the line delimiter
+   * 
+   * @return lineDelimiter
+   */
+  public String getLineDelimiter()
+  {
+    return lineDelimiter;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DelimitedSchema [delimiterChar=" + delimiterChar + ", quoteChar=" + quoteChar + ", lineDelimiter="
+        + lineDelimiter + ", fieldNames=" + fieldNames + ", fields=" + fields + "]";
+  }
+
+  /**
+   * Get the list of Fields.
+   * 
+   * @return fields
+   */
+  public List<Field> getFields()
+  {
+    return Collections.unmodifiableList(fields);
+  }
+
+  /**
+   * Objects of this class represent a particular field in the schema. Each
+   * field has a name, a type and a set of associated constraints.
+   * 
+   */
+  public class Field
+  {
+    /**
+     * name of the field
+     */
+    String name;
+    /**
+     * Data type of the field
+     */
+    FieldType type;
+    /**
+     * constraints associated with the field
+     */
+    Map<String, Object> constraints = new HashMap<String, Object>();
+
+    public Field(String name, String type)
+    {
+      this.name = name;
+      this.type = FieldType.valueOf(type.toUpperCase());
+    }
+
+    /**
+     * Get the name of the field
+     * 
+     * @return name
+     */
+    public String getName()
+    {
+      return name;
+    }
+
+    /**
+     * Set the name of the field
+     * 
+     * @param name
+     */
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    /**
+     * Get {@link FieldType}
+     * 
+     * @return type
+     */
+    public FieldType getType()
+    {
+      return type;
+    }
+
+    /**
+     * Set {@link FieldType}
+     * 
+     * @param type
+     */
+    public void setType(FieldType type)
+    {
+      this.type = type;
+    }
+
+    /**
+     * Get the map of constraints associated with the field
+     * 
+     * @return constraints
+     */
+    public Map<String, Object> getConstraints()
+    {
+      return constraints;
+    }
+
+    /**
+     * Sets the map of constraints associated with the field
+     * 
+     * @param constraints
+     */
+    public void setConstraints(Map<String, Object> constraints)
+    {
+      this.constraints = constraints;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Fields [name=" + name + ", type=" + type + ", constraints=" + constraints + "]";
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
index c9a4179..dc77aa8 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
@@ -20,169 +20,514 @@ package com.datatorrent.contrib.parser;
 
 import java.util.Date;
 
-import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
 import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
 
 public class CsvPOJOParserTest
 {
 
-  CsvParser operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
+  private static final String filename = "schema.json";
+  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+  CsvParser parser = new CsvParser();
 
   @Rule
   public Watcher watcher = new Watcher();
 
   public class Watcher extends TestWatcher
   {
-
     @Override
     protected void starting(Description description)
     {
       super.starting(description);
-      operator = new CsvParser();
-      operator.setClazz(EmployeeBean.class);
-      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
+      parser.setClazz(Ad.class);
+      parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
+      parser.setup(null);
+      parser.err.setSink(error);
+      parser.parsedOutput.setSink(objectPort);
+      parser.out.setSink(pojoPort);
     }
 
     @Override
     protected void finished(Description description)
     {
       super.finished(description);
-      operator.teardown();
+      error.clear();
+      objectPort.clear();
+      pojoPort.clear();
+      parser.teardown();
     }
-
   }
 
+  /*
+  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid
+  * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
+  * Constraints are defined in schema.json
+  */
   @Test
-  public void testCsvToPojoWriterDefault()
+  public void TestParserValidInput()
   {
-    operator.setup(null);
-    String tuple = "john,cs,1,01/01/2015";
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Ad adPojo = (Ad)obj;
     Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfJoining()));
+    Assert.assertEquals(Ad.class, obj.getClass());
+    Assert.assertEquals(1234, adPojo.getAdId());
+    Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
+    Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
+    Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
+    Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
+    Assert.assertEquals(12, adPojo.getSecurityCode());
+    Assert.assertTrue("CAMP_AD".equals(adPojo.getParentCampaign()));
+    Assert.assertTrue(adPojo.isActive());
+    Assert.assertFalse(adPojo.isOptimized());
+    Assert.assertTrue("yes".equals(adPojo.getValid()));
   }
 
   @Test
-  public void testCsvToPojoWriterDateFormat()
+  public void TestParserValidInputPojoPortNotConnected()
   {
-    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
-    operator.setup(null);
-    String tuple = "john,cs,1,01-JAN-2015";
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
+    parser.out.setSink(null);
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserValidInputClassNameNotProvided()
+  {
+    parser.setClazz(null);
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidAdIdInput()
+  {
+    String input = ",98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserNoCampaignIdInput()
+  {
+    String input = "1234,,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
     Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfJoining()));
+    Assert.assertEquals(Ad.class, obj.getClass());
+    Assert.assertEquals(0, error.collectedTuples.size());
   }
 
   @Test
-  public void testCsvToPojoWriterDateFormatMultiple()
+  public void TestParserInvalidCampaignIdInput()
   {
-    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
-    operator.setup(null);
-    String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
+    String input = "1234,9833,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidAdNameInput()
+  {
+    String input = "1234,98233,adxyz123,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidBidPriceInput()
+  {
+    String input = "1234,98233,adxyz,3.3,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidStartDateInput()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-30-08 02:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidSecurityCodeInput()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,85,y,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidisActiveInput()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,yo,,CAMP_AD,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidParentCampaignInput()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserValidisOptimized()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes";
+    parser.in.process(input.getBytes());
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
     Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfJoining()));
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfBirth()));
+    Assert.assertEquals(Ad.class, obj.getClass());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInValidisOptimized()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP,Y,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInValidWeatherTargeting()
+  {
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,NO,yes";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserNullOrBlankInput()
+  {
+    parser.beginWindow(0);
+    parser.in.process(null);
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserHeaderAsInput()
+  {
+    String input = "adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid";
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserLessFields()
+  {
+    parser.beginWindow(0);
+    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION".getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
   }
 
-  public static class EmployeeBean
+  @Test
+  public void TestParserMoreFields()
   {
 
-    private String name;
-    private String dept;
-    private int eid;
-    private Date dateOfJoining;
-    private Date dateOfBirth;
+    parser.beginWindow(0);
+    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField"
+        .getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
 
-    public String getName()
+  @Test
+  public void TestParserValidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+  }
+
+  @Test
+  public void TestParserInvalidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField"
+        .getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(1, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+  }
+
+  @Test
+  public void TestParserValidInputMetricResetCheck()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+    parser.beginWindow(1);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    parser.in.process(input.getBytes());
+    Assert.assertEquals(1, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+    parser.endWindow();
+  }
+
+  public static class Ad
+  {
+
+    private int adId;
+    private int campaignId;
+    private String adName;
+    private double bidPrice;
+    private Date startDate;
+    private Date endDate;
+    private long securityCode;
+    private boolean active;
+    private boolean optimized;
+    private String parentCampaign;
+    private Character weatherTargeted;
+    private String valid;
+
+    public Ad()
+    {
+
+    }
+
+    public int getAdId()
+    {
+      return adId;
+    }
+
+    public void setAdId(int adId)
+    {
+      this.adId = adId;
+    }
+
+    public int getCampaignId()
+    {
+      return campaignId;
+    }
+
+    public void setCampaignId(int campaignId)
+    {
+      this.campaignId = campaignId;
+    }
+
+    public String getAdName()
+    {
+      return adName;
+    }
+
+    public void setAdName(String adName)
+    {
+      this.adName = adName;
+    }
+
+    public double getBidPrice()
+    {
+      return bidPrice;
+    }
+
+    public void setBidPrice(double bidPrice)
+    {
+      this.bidPrice = bidPrice;
+    }
+
+    public Date getStartDate()
+    {
+      return startDate;
+    }
+
+    public void setStartDate(Date startDate)
+    {
+      this.startDate = startDate;
+    }
+
+    public Date getEndDate()
     {
-      return name;
+      return endDate;
     }
 
-    public void setName(String name)
+    public void setEndDate(Date endDate)
     {
-      this.name = name;
+      this.endDate = endDate;
     }
 
-    public String getDept()
+    public long getSecurityCode()
     {
-      return dept;
+      return securityCode;
     }
 
-    public void setDept(String dept)
+    public void setSecurityCode(long securityCode)
     {
-      this.dept = dept;
+      this.securityCode = securityCode;
     }
 
-    public int getEid()
+    public boolean isActive()
     {
-      return eid;
+      return active;
     }
 
-    public void setEid(int eid)
+    public void setActive(boolean active)
     {
-      this.eid = eid;
+      this.active = active;
     }
 
-    public Date getDateOfJoining()
+    public boolean isOptimized()
     {
-      return dateOfJoining;
+      return optimized;
     }
 
-    public void setDateOfJoining(Date dateOfJoining)
+    public void setOptimized(boolean optimized)
     {
-      this.dateOfJoining = dateOfJoining;
+      this.optimized = optimized;
     }
 
-    public Date getDateOfBirth()
+    public String getParentCampaign()
     {
-      return dateOfBirth;
+      return parentCampaign;
     }
 
-    public void setDateOfBirth(Date dateOfBirth)
+    public void setParentCampaign(String parentCampaign)
+    {
+      this.parentCampaign = parentCampaign;
+    }
+
+    public Character getWeatherTargeted()
+    {
+      return weatherTargeted;
+    }
+
+    public void setWeatherTargeted(Character weatherTargeted)
+    {
+      this.weatherTargeted = weatherTargeted;
+    }
+
+    public String getValid()
+    {
+      return valid;
+    }
+
+    public void setValid(String valid)
+    {
+      this.valid = valid;
+    }
+
+    @Override
+    public String toString()
     {
-      this.dateOfBirth = dateOfBirth;
+      return "Ad [adId=" + adId + ", campaignId=" + campaignId + ", adName=" + adName + ", bidPrice=" + bidPrice
+          + ", startDate=" + startDate + ", endDate=" + endDate + ", securityCode=" + securityCode + ", active="
+          + active + ", optimized=" + optimized + ", parentCampaign=" + parentCampaign + ", weatherTargeted="
+          + weatherTargeted + ", valid=" + valid + "]";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/test/resources/schema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/schema.json b/contrib/src/test/resources/schema.json
new file mode 100644
index 0000000..13e2789
--- /dev/null
+++ b/contrib/src/test/resources/schema.json
@@ -0,0 +1,96 @@
+{
+    "separator": ",",
+    "quoteChar":"\"",
+    "fields": [
+        {
+            "name": "adId",
+            "type": "Integer",
+            "constraints": {
+                "required": "true"
+            }
+        },
+        {
+            "name": "campaignId",
+            "type": "Integer",
+            "constraints": {
+                "equals": "98233"
+            }
+        },
+        {
+            "name": "adName",
+            "type": "String",
+            "constraints": {
+                "required": "true",
+                "pattern": "[a-z].*[a-z]$",
+                "maxLength": "10"
+            }
+        },
+        {
+            "name": "bidPrice",
+            "type": "Double",
+            "constraints": {
+                "required": "true",
+                "minValue": "0.1",
+                "maxValue": "3.2"
+            }
+        },
+        {
+            "name": "startDate",
+            "type": "Date",
+            "constraints": {
+                "format": "yyyy-MM-dd HH:mm:ss",
+                "locale":"IN"
+            }
+        },
+        {
+            "name": "endDate",
+            "type": "Date",
+            "constraints": {
+                "format": "dd/MM/yyyy"
+            }
+        },
+        {
+            "name": "securityCode",
+            "type": "Long",
+            "constraints": {
+                "minValue": "10",
+                "maxValue": "30"
+            }
+        },
+        {
+            "name": "active",
+            "type": "Boolean",
+            "constraints": {
+                "required": "true"                
+            }
+        },
+        {
+            "name": "optimized",
+            "type": "Boolean",
+            "constraints": {                  
+                "trueValue":"OPTIMIZE",
+                "falseValue":"NO_OPTIMIZE"                
+            }
+        },
+        {
+            "name": "parentCampaign",
+            "type": "String",
+            "constraints": {
+                "required": "true",
+                "equals": "CAMP_AD"
+            }
+        },
+        {
+            "name": "weatherTargeted",
+            "type": "Character",
+            "constraints": {
+                "required": "true",
+                "equals": "Y"
+            }
+        },
+        {
+            "name": "valid",
+            "type": "String"            
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
index 9e8ee0f..4e9800a 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
@@ -81,7 +81,7 @@ public class JsonParser extends Parser<String, String>
   }
 
   @Override
-  public String processErorrTuple(String input)
+  public String processErrorTuple(String input)
   {
     return input;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
index 5bcc1c5..4f591f1 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/Parser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
@@ -82,7 +82,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co
     Object tuple = convert(inputTuple);
     if (tuple == null && err.isConnected()) {
       errorTupleCount++;
-      err.emit(processErorrTuple(inputTuple));
+      err.emit(processErrorTuple(inputTuple));
       return;
     }
     if (out.isConnected()) {
@@ -91,7 +91,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co
     }
   }
 
-  public abstract ERROROUT processErorrTuple(INPUT input);
+  public abstract ERROROUT processErrorTuple(INPUT input);
 
   @Override
   public void beginWindow(long windowId)

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
index 3d416e1..c8eeacc 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -123,7 +123,7 @@ public class XmlParser extends Parser<String, String>
   }
 
   @Override
-  public String processErorrTuple(String input)
+  public String processErrorTuple(String input)
   {
     return input;
   }