You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/01/10 12:02:29 UTC

apex-malhar git commit: APEXMALHAR-2259 Code changes to add fixed width parser

Repository: apex-malhar
Updated Branches:
  refs/heads/master 16b15c21b -> 0852c6594


APEXMALHAR-2259 Code changes to add fixed width parser


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0852c659
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0852c659
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0852c659

Branch: refs/heads/master
Commit: 0852c659400a2bd484a72d4539cdbaced1c5239f
Parents: 16b15c2
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Fri Sep 23 11:43:16 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Tue Jan 10 16:30:50 2017 +0530

----------------------------------------------------------------------
 .../contrib/formatter/CsvFormatter.java         |   2 +-
 .../contrib/parser/CellProcessorBuilder.java    |   2 +-
 .../contrib/parser/DelimitedSchema.java         | 131 +----
 .../contrib/parser/FixedWidthParser.java        | 493 +++++++++++++++++
 .../contrib/parser/FixedWidthSchema.java        | 379 +++++++++++++
 .../com/datatorrent/contrib/parser/Schema.java  | 176 +++++++
 .../contrib/parser/FixedWidthTest.java          | 526 +++++++++++++++++++
 .../src/test/resources/FixedWidthSchema.json    |  78 +++
 8 files changed, 1670 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
index 2979b44..2bd0e67 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -40,7 +40,7 @@ import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.contrib.parser.DelimitedSchema;
 import com.datatorrent.contrib.parser.DelimitedSchema.Field;
-import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
+import com.datatorrent.contrib.parser.Schema.FieldType;
 import com.datatorrent.lib.formatter.Formatter;
 import com.datatorrent.netlet.util.DTThrowable;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
index e7840aa..292031e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
@@ -40,7 +40,7 @@ import org.supercsv.util.CsvContext;
 
 import org.apache.commons.lang3.StringUtils;
 
-import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
+import com.datatorrent.contrib.parser.Schema.FieldType;
 
 /**
  * Helper class with methods to generate CellProcessor objects. Cell processors

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
index 29b2c92..a47e138 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
@@ -52,40 +52,9 @@ import org.slf4j.LoggerFactory;
  * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
  * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
  * "Boolean", "constraints": { "required": "true" } } ] }}
- *
- * @since 3.4.0
  */
-public class DelimitedSchema
+public class DelimitedSchema extends Schema
 {
-
-  /**
-   * JSON key string for separator
-   */
-  private static final String SEPARATOR = "separator";
-  /**
-   * JSON key string for quote character
-   */
-  private static final String QUOTE_CHAR = "quoteChar";
-  /**
-   * JSON key string for line delimiter
-   */
-  private static final String LINE_DELIMITER = "lineDelimiter";
-  /**
-   * JSON key string for fields array
-   */
-  private static final String FIELDS = "fields";
-  /**
-   * JSON key string for name of the field within fields array
-   */
-  private static final String NAME = "name";
-  /**
-   * JSON key string for type of the field within fields array
-   */
-  private static final String TYPE = "type";
-  /**
-   * JSON key string for constraints for each field
-   */
-  private static final String CONSTRAINTS = "constraints";
   /**
    * JSON key string for required constraint
    */
@@ -119,21 +88,26 @@ public class DelimitedSchema
    */
   public static final String REGEX_PATTERN = "pattern";
   /**
-   * JSON key string for date format constraint
-   */
-  public static final String DATE_FORMAT = "format";
-  /**
    * JSON key string for locale constraint
    */
   public static final String LOCALE = "locale";
   /**
-   * JSON key string for true value constraint
+   * JSON key string for separator
+   */
+  private static final String SEPARATOR = "separator";
+  /**
+   * JSON key string for quote character
+   */
+  private static final String QUOTE_CHAR = "quoteChar";
+  /**
+   * JSON key string for line delimiter
    */
-  public static final String TRUE_VALUE = "trueValue";
+  private static final String LINE_DELIMITER = "lineDelimiter";
   /**
-   * JSON key string for false value constraint
+   * JSON key string for constraints for each field
    */
-  public static final String FALSE_VALUE = "falseValue";
+  private static final String CONSTRAINTS = "constraints";
+  private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class);
   /**
    * delimiter character provided in schema. Default is ,
    */
@@ -147,22 +121,10 @@ public class DelimitedSchema
    */
   private String lineDelimiter = "\r\n";
   /**
-   * This holds the list of field names in the same order as in the schema
-   */
-  private List<String> fieldNames = new LinkedList<String>();
-  /**
    * This holds list of {@link Field}
    */
   private List<Field> fields = new LinkedList<Field>();
 
-  /**
-   * Supported data types
-   */
-  public enum FieldType
-  {
-    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
-
   public DelimitedSchema(String json)
   {
     try {
@@ -207,16 +169,6 @@ public class DelimitedSchema
   }
 
   /**
-   * Get the list of field names mentioned in schema
-   *
-   * @return fieldNames
-   */
-  public List<String> getFieldNames()
-  {
-    return Collections.unmodifiableList(fieldNames);
-  }
-
-  /**
    * Get the delimiter character
    *
    * @return delimiterChar
@@ -268,65 +220,16 @@ public class DelimitedSchema
    * field has a name, type and a set of associated constraints.
    *
    */
-  public class Field
+  public class Field extends Schema.Field
   {
     /**
-     * name of the field
-     */
-    String name;
-    /**
-     * Data type of the field
-     */
-    FieldType type;
-    /**
      * constraints associated with the field
      */
     Map<String, Object> constraints = new HashMap<String, Object>();
 
     public Field(String name, String type)
     {
-      this.name = name;
-      this.type = FieldType.valueOf(type.toUpperCase());
-    }
-
-    /**
-     * Get the name of the field
-     *
-     * @return name
-     */
-    public String getName()
-    {
-      return name;
-    }
-
-    /**
-     * Set the name of the field
-     *
-     * @param name
-     */
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    /**
-     * Get {@link FieldType}
-     *
-     * @return type
-     */
-    public FieldType getType()
-    {
-      return type;
-    }
-
-    /**
-     * Set {@link FieldType}
-     *
-     * @param type
-     */
-    public void setType(FieldType type)
-    {
-      this.type = type;
+      super(name, type);
     }
 
     /**
@@ -356,6 +259,4 @@ public class DelimitedSchema
     }
   }
 
-  private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class);
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java
new file mode 100644
index 0000000..9ee556e
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.lang.reflect.Field;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.ClassUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.univocity.parsers.fixed.FieldAlignment;
+import com.univocity.parsers.fixed.FixedWidthFields;
+import com.univocity.parsers.fixed.FixedWidthParserSettings;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a fixed width record against a specified schema <br>
+ * Schema is specified in a json format as per {@link FixedWidthSchema} that
+ * contains field information for each field.<br>
+ * Assumption is that each field in the data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>header</b>:optional header line; a tuple equal to it is treated as an error tuple<br>
+ * <b>clazz</b>:Pojo class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a byte array. Each tuple represents a record<br>
+ * <b>parsedOutput</b>:tuples that are validated against the schema are emitted
+ * as {@code HashMap<String, Object>} on this port<br>
+ * Key being the name of the field and Val being the value of the field.<br>
+ * <b>out</b>:tuples that are validated against the schema are emitted as pojo
+ * on this port<br>
+ * <b>err</b>:tuples that do not conform to the schema are emitted on this port as
+ * {@code KeyValPair<String, String>}<br>
+ * Key being the tuple and Val being the reason.
+ *
+ * @displayName FixedWidthParser
+ * @category Parsers
+ * @tags fixedwidth pojo parser
+ */
+public class FixedWidthParser extends Parser<byte[], KeyValPair<String, String>> implements Operator.ActivationListener<Context>
+{
+  private static final Logger logger = LoggerFactory.getLogger(FixedWidthParser.class);
+  /**
+   * Port on which each successfully parsed record is emitted as a map of field name to parsed value
+   */
+  public final transient DefaultOutputPort<HashMap<String, Object>> parsedOutput = new DefaultOutputPort<HashMap<String, Object>>();
+  /**
+   * Metric to keep count of number of tuples emitted on {@link #parsedOutput}
+   * port
+   */
+  @AutoMetric
+  private long parsedOutputCount;
+  /**
+   * Contents of the schema. Schema is specified in a json format as per
+   * {@link FixedWidthSchema}
+   */
+  @NotNull
+  private String jsonSchema;
+  /**
+   * Total length of the record, computed in {@link #setup} as the sum of all field lengths
+   */
+  private int recordLength;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient FixedWidthSchema schema;
+  /**
+   * List of setters to set the value in POJO to be emitted, in schema field order.
+   * Remains null when {@code clazz} is not set.
+   */
+  private transient List<FixedWidthParser.TypeInfo> setters;
+  /**
+   * header- This will be string of field names, padded with padding character (if required).
+   * Not transient because it is a configurable property and must be retained when the operator is serialized.
+   */
+  private String header;
+  /**
+   * Univocity Parser to parse the input tuples. Transient because it is rebuilt from the schema in
+   * {@link #setup}.
+   */
+  private transient com.univocity.parsers.fixed.FixedWidthParser univocityParser;
+
+  /**
+   * Resets the {@link #parsedOutputCount} metric after delegating to the base class.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    parsedOutputCount = 0;
+  }
+
+  /**
+   * Parses one fixed width record. Null, blank or header tuples, tuples whose length differs from the
+   * schema record length and tuples that fail validation are counted as errors and sent to the error
+   * port (if connected); otherwise the parsed values are emitted on the connected output ports.
+   *
+   * @param tuple raw record bytes, decoded with the platform default charset
+   */
+  @Override
+  public void processTuple(byte[] tuple)
+  {
+    if (tuple == null) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
+        logger.error("Tuple could not be parsed. Reason Blank/null tuple");
+      }
+      errorTupleCount++;
+      return;
+    }
+    String incomingString = new String(tuple);
+    if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, getHeader())) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<>(incomingString, "Blank/header tuple"));
+        logger.error("Tuple could not be parsed. Reason Blank/header tuple");
+      }
+      errorTupleCount++;
+      return;
+    }
+    if (incomingString.length() < recordLength) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<>(incomingString, "Record length mis-match/shorter tuple"));
+      }
+      logger.error("Tuple could not be parsed. Reason Record length mis-match/shorter tuple. " +
+        "Expected length " + recordLength + " Actual length " + incomingString.length());
+      errorTupleCount++;
+      return;
+    }
+    if (incomingString.length() > recordLength) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<>(incomingString, "Record length mis-match/longer tuple"));
+      }
+      logger.error("Tuple could not be parsed. Reason Record length mis-match/longer tuple. " +
+        "Expected length " + recordLength + " Actual length " + incomingString.length());
+      errorTupleCount++;
+      return;
+    }
+    try {
+      String[] values = univocityParser.parseLine(incomingString);
+      HashMap<String, Object> toEmit = new HashMap<>();
+      Object pojo = validateAndSet(values, toEmit);
+      if (parsedOutput.isConnected()) {
+        parsedOutput.emit(toEmit);
+        parsedOutputCount++;
+      }
+      if (out.isConnected() && clazz != null) {
+        out.emit(pojo);
+        emittedObjectCount++;
+      }
+    } catch (Exception e) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<>(incomingString, e.getMessage()));
+      }
+      errorTupleCount++;
+      logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
+    }
+  }
+
+  /**
+   * Not supported; error tuples are emitted directly from {@link #processTuple(byte[])}.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public KeyValPair<String, String> processErrorTuple(byte[] input)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Not supported; conversion happens inside {@link #processTuple(byte[])}.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Object convert(byte[] tuple)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Builds the {@link FixedWidthSchema} from {@link #jsonSchema}, computes the expected record length
+   * and creates the univocity parser.
+   *
+   * @param context operator context
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    try {
+      schema = new FixedWidthSchema(jsonSchema);
+      recordLength = 0;
+      for (FixedWidthSchema.Field field : schema.getFields()) {
+        recordLength += field.getFieldLength();
+      }
+      createUnivocityParser();
+    } catch (Exception e) {
+      logger.error("Cannot setup Parser Reason {}", e.getMessage());
+      throw e;
+    }
+  }
+
+  /**
+   * Activate the Parser. When a POJO class is set, resolves one setter per schema field (in schema
+   * order) so that parsed values can be copied into the emitted POJO.
+   *
+   * @param context activation context
+   */
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      if (clazz != null) {
+        setters = new ArrayList<>();
+        List<String> fieldNames = schema.getFieldNames();
+        if (fieldNames != null) {
+          for (String fieldName : fieldNames) {
+            addSetter(fieldName);
+          }
+        }
+      }
+    } catch (Exception e) {
+      logger.error("Cannot activate Parser Reason {}", e.getMessage());
+      throw e;
+    }
+  }
+
+  /**
+   * Creates the univocity parser with one fixed width column per schema field, using each field's
+   * length, alignment and padding character.
+   */
+  private void createUnivocityParser()
+  {
+    FixedWidthFields fixedWidthFields = new FixedWidthFields();
+    for (FixedWidthSchema.Field field : schema.getFields()) {
+      fixedWidthFields.addField(field.getName(), field.getFieldLength(), toFieldAlignment(field.getAlignment()),
+        field.getPadding());
+    }
+    FixedWidthParserSettings settings = new FixedWidthParserSettings(fixedWidthFields);
+    univocityParser = new com.univocity.parsers.fixed.FixedWidthParser(settings);
+  }
+
+  /**
+   * Maps a schema alignment string to a univocity {@link FieldAlignment}: "centre" or "center" map to
+   * CENTER and "left" to LEFT (case-insensitive); anything else, including null, maps to RIGHT.
+   *
+   * @param alignment alignment string from the schema
+   * @return the corresponding univocity alignment
+   */
+  private static FieldAlignment toFieldAlignment(String alignment)
+  {
+    if ("centre".equalsIgnoreCase(alignment) || "center".equalsIgnoreCase(alignment)) {
+      return FieldAlignment.CENTER;
+    } else if ("left".equalsIgnoreCase(alignment)) {
+      return FieldAlignment.LEFT;
+    }
+    return FieldAlignment.RIGHT;
+  }
+
+  /**
+   * No-op.
+   */
+  @Override
+  public void deactivate()
+  {
+  }
+
+  /**
+   * Function to add a setter for a field and add it
+   * to the List of setters. Only fields declared directly on the POJO class are
+   * found; inherited fields are not looked up.
+   *
+   * @param fieldName name of the field for which setter is to be added
+   * @throws RuntimeException if the field is missing or its setter cannot be created
+   */
+  private void addSetter(String fieldName)
+  {
+    try {
+      Field f = clazz.getDeclaredField(fieldName);
+      FixedWidthParser.TypeInfo t = new FixedWidthParser.TypeInfo(f.getName(),
+        ClassUtils.primitiveToWrapper(f.getType()));
+      t.setter = PojoUtils.createSetter(clazz, t.name, t.type);
+      setters.add(t);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Field " + fieldName + " not found in class " + clazz, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Exception while adding a setter for field " + fieldName + ": " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Function to validate individual parsed values and set the objects to be emitted
+   * @param values array of String containing individual parsed values, in schema field order
+   * @param toEmit the map to be emitted; receives one entry per schema field
+   * @return POJO the object to be returned (if the tuple class is set), otherwise null
+   * @throws RuntimeException if a value fails validation or the POJO cannot be created or populated
+   */
+  private Object validateAndSet(String[] values, HashMap<String, Object> toEmit)
+  {
+    Object pojoObject = null;
+    try {
+      List<FixedWidthSchema.Field> fields = schema.getFields();
+      try {
+        if (clazz != null) {
+          pojoObject = clazz.newInstance();
+        }
+      } catch (InstantiationException ie) {
+        throw new RuntimeException("Exception in instantiating", ie);
+      }
+      for (int i = 0; i < fields.size(); i++) {
+        FixedWidthSchema.Field currentField = fields.get(i);
+        // setters is only built in activate() when clazz is set; without a POJO class the values are
+        // still parsed into toEmit for the parsedOutput port.
+        FixedWidthParser.TypeInfo typeInfo = setters == null ? null : setters.get(i);
+        validateAndSetCurrentField(currentField,
+          values[i], typeInfo, pojoObject, toEmit);
+      }
+    } catch (StringIndexOutOfBoundsException e) {
+      throw new RuntimeException("Record length and tuple length mismatch ", e);
+    } catch (IllegalAccessException ie) {
+      throw new RuntimeException("Illegal Access ", ie);
+    } catch (Exception e) {
+      throw new RuntimeException("Exception in validation", e);
+    }
+    return pojoObject;
+  }
+
+  /**
+   * Function to validate and set the current field.
+   * Non-empty values are converted to the field's schema type, put into {@code toEmit} and, when both a
+   * setter and a POJO are available, set on the POJO. Null or empty values are put into {@code toEmit}
+   * unchanged and are not set on the POJO.
+   * @param currentField the field which is to be validated and set
+   * @param value the parsed value of the field
+   * @param typeInfo information about the field in POJO; may be null when no POJO class is set
+   * @param pojoObject POJO which is to be set; may be null when no POJO class is set
+   * @param toEmit the map to be emitted
+   * @throws RuntimeException if the value cannot be converted to the field type or set on the POJO
+   */
+  private void validateAndSetCurrentField(FixedWidthSchema.Field currentField,
+    String value, FixedWidthParser.TypeInfo typeInfo, Object pojoObject, HashMap<String, Object> toEmit)
+  {
+    try {
+      String fieldName = currentField.getName();
+      if (value != null && !value.isEmpty()) {
+        Object result;
+        switch (currentField.getType()) {
+          case INTEGER:
+            result = Integer.parseInt(value);
+            break;
+          case DOUBLE:
+            result = Double.parseDouble(value);
+            break;
+          case STRING:
+            result = value;
+            break;
+          case CHARACTER:
+            result = value.charAt(0);
+            break;
+          case FLOAT:
+            result = Float.parseFloat(value);
+            break;
+          case LONG:
+            result = Long.parseLong(value);
+            break;
+          case SHORT:
+            result = Short.parseShort(value);
+            break;
+          case BOOLEAN:
+            if (value.equalsIgnoreCase(currentField.getTrueValue())) {
+              result = Boolean.TRUE;
+            } else if (value.equalsIgnoreCase(currentField.getFalseValue())) {
+              result = Boolean.FALSE;
+            } else {
+              throw new NumberFormatException();
+            }
+            break;
+          case DATE:
+            DateFormat df = new SimpleDateFormat(currentField.getDateFormat());
+            df.setLenient(false);
+            result = df.parse(value);
+            break;
+          default:
+            throw new IllegalStateException("Invalid Type in Field " + currentField.getType());
+        }
+        toEmit.put(fieldName, result);
+        if (typeInfo != null && pojoObject != null) {
+          typeInfo.setter.set(pojoObject, result);
+        }
+      } else {
+        toEmit.put(fieldName, value);
+      }
+    } catch (NumberFormatException e) {
+      throw new RuntimeException("Error parsing " + value + " to " + currentField.getType() + " type", e);
+    } catch (ParseException e) {
+      throw new RuntimeException("Error parsing " + value, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Error setting " + value + " in the given class " + typeInfo, e);
+    }
+  }
+
+  /**
+   * Get the schema
+   *
+   * @return the Json schema
+   */
+  public String getJsonSchema()
+  {
+    return jsonSchema;
+  }
+
+  /**
+   * Set the jsonSchema
+   *
+   * @param jsonSchema schema to be set.
+   */
+  public void setJsonSchema(String jsonSchema)
+  {
+    this.jsonSchema = jsonSchema;
+  }
+
+  /**
+   * Get the header
+   *
+   * @return header- This will be string of field names, padded with padding character (if required)
+   */
+  public String getHeader()
+  {
+    return header;
+  }
+
+  /**
+   * Set the header. Incoming tuples equal to the header are not parsed and are counted as errors.
+   *
+   * @param header string of field names, padded with padding character (if required)
+   */
+  public void setHeader(String header)
+  {
+    this.header = header;
+  }
+
+  /**
+   * Get errorTupleCount
+   *
+   * @return errorTupleCount number of erroneous tuples.
+   */
+  @VisibleForTesting
+  public long getErrorTupleCount()
+  {
+    return errorTupleCount;
+  }
+
+  /**
+   * Get emittedObjectCount
+   *
+   * @return emittedObjectCount count of objects emitted.
+   */
+  @VisibleForTesting
+  public long getEmittedObjectCount()
+  {
+    return emittedObjectCount;
+  }
+
+  /**
+   * Get incomingTuplesCount
+   *
+   * @return incomingTuplesCount number of incoming tuples.
+   */
+  @VisibleForTesting
+  public long getIncomingTuplesCount()
+  {
+    return incomingTuplesCount;
+  }
+
+  /**
+   * Get parsedOutputCount
+   *
+   * @return parsedOutputCount count of well parsed tuples.
+   */
+  @VisibleForTesting
+  public long getParsedOutputCount()
+  {
+    return parsedOutputCount;
+  }
+
+  /**
+   * Objects of this class represent a particular data member of the POJO class to be emitted.
+   * Each data member has a name, a type and an accessor (setter) function associated with it.
+   */
+  static class TypeInfo
+  {
+    String name;
+    Class<?> type;
+    PojoUtils.Setter setter;
+
+    public TypeInfo(String name, Class<?> type)
+    {
+      this.name = name;
+      this.type = type;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "'name': " + name + " 'type': " + type;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java
new file mode 100644
index 0000000..e64125b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This is the schema that defines fields and their constraints for fixed width files.
+ * The operators use this information to validate the incoming tuples.
+ * Information from JSON schema is saved in this object and is used by the
+ * operators
+ * <p>
+ * <br>
+ * <br>
+ * Example schema <br>
+ * <br>
+ * {{ "padding": " ","alignment": "left","fields": [ {"name": "adId",
+ * "type": "Integer","padding":"0", "length": 3} , { "name": "adName",
+ * "type": "String", "alignment": "right","length": 20}, { "name": "bidPrice", "type":
+ * "Double", "length": 5}, { "name": "startDate", "type": "Date", "length": 10,
+ * "format":"dd/MM/yyyy" }, { "name": "securityCode", "type": "Long","length": 10 },
+ * { "name": "active", "type":"Boolean","length": 2} ] }}
+ */
+public class FixedWidthSchema extends Schema
+{
+  /**
+   * JSON key string for the length of an individual field
+   */
+  public static final String FIELD_LENGTH = "length";
+  /**
+   * JSON key string for Padding Character
+   */
+  public static final String FIELD_PADDING_CHARACTER = "padding";
+  /**
+   * Default Padding Character
+   */
+  public static final char DEFAULT_PADDING_CHARACTER = ' ';
+  /**
+   * Default Alignment
+   */
+  public static final String DEFAULT_ALIGNMENT= "left";
+  /**
+   * JSON key string for Field Alignment
+   */
+  public static final String FIELD_ALIGNMENT ="alignment";
+
+  public static final Logger logger = LoggerFactory.getLogger(FixedWidthSchema.class);
+  /**
+   * This holds list of {@link Field}
+   */
+  private List<Field> fields = new LinkedList<>();
+  /**
+   * This holds the padding character for the entire file
+   */
+  private char globalPadding;
+  /**
+   * This holds the global alignment
+   */
+  private String globalAlignment;
+
+  /**
+   * Constructor for FixedWidthSchema
+   *
+   * @param json schema as a JSON string
+   * @throws IllegalArgumentException if the JSON is malformed or a required key is missing
+   */
+  public FixedWidthSchema(String json)
+  {
+    try {
+      initialize(json);
+    } catch (JSONException | IOException e) {
+      // exception passed as the last argument without a placeholder so SLF4J logs its stack trace
+      logger.error("Unable to parse fixed width schema", e);
+      throw new IllegalArgumentException(e);
+    }
+  }
+
+  /**
+   * Returns the file-level padding character, which fields without their own "padding" key inherit.
+   *
+   * @return the global padding character
+   */
+  public char getGlobalPadding()
+  {
+    return this.globalPadding;
+  }
+
+  /**
+   * Replaces the file-level padding character. Fields that were already built keep their own copy.
+   *
+   * @param padding the new global padding character
+   */
+  public void setGlobalPadding(char padding)
+  {
+    globalPadding = padding;
+  }
+
+  /**
+   * Returns the file-level alignment, which fields without their own "alignment" key inherit.
+   *
+   * @return the global alignment string
+   */
+  public String getGlobalAlignment()
+  {
+    return this.globalAlignment;
+  }
+
+  /**
+   * Replaces the file-level alignment. Fields that were already built keep their own value.
+   *
+   * @param globalAlignment the new global alignment
+   */
+  public void setGlobalAlignment(String globalAlignment)
+  {
+    this.globalAlignment = globalAlignment;
+  }
+
+  /**
+   * For a given json string, this method sets the field members.
+   * Global padding and alignment fall back to {@link #DEFAULT_PADDING_CHARACTER} and
+   * {@link #DEFAULT_ALIGNMENT}; each field inherits the global values unless it declares its own.
+   * Date fields default to DEFAULT_DATE_FORMAT and boolean fields to DEFAULT_TRUE_VALUE /
+   * DEFAULT_FALSE_VALUE when the corresponding keys are absent.
+   *
+   * @param json schema as provided by the user.
+   * @throws JSONException if the JSON is malformed or a required key (fields, name, type, length) is missing
+   * @throws IllegalArgumentException if a padding value is empty or a field length is not positive
+   */
+  private void initialize(String json) throws JSONException, IOException
+  {
+    JSONObject jo = new JSONObject(json);
+    JSONArray fieldArray = jo.getJSONArray(FIELDS);
+    globalPadding = jo.has(FIELD_PADDING_CHARACTER) ? readPadding(jo) : DEFAULT_PADDING_CHARACTER;
+    globalAlignment = jo.has(FIELD_ALIGNMENT) ? jo.getString(FIELD_ALIGNMENT) : DEFAULT_ALIGNMENT;
+
+    for (int i = 0; i < fieldArray.length(); i++) {
+      JSONObject obj = fieldArray.getJSONObject(i);
+      String name = obj.getString(NAME);
+      int length = obj.getInt(FIELD_LENGTH);
+      if (length <= 0) {
+        throw new IllegalArgumentException("Field " + name + " must have a positive length, found " + length);
+      }
+      // Locale.ROOT: under a Turkish default locale "i" upper-cases to a dotted capital I, breaking type names
+      Field field = new Field(name, obj.getString(TYPE).toUpperCase(Locale.ROOT), length);
+      field.setPadding(obj.has(FIELD_PADDING_CHARACTER) ? readPadding(obj) : globalPadding);
+      field.setAlignment(obj.has(FIELD_ALIGNMENT) ? obj.getString(FIELD_ALIGNMENT) : globalAlignment);
+      //Get the format if the given data type is Date
+      if (field.getType() == FieldType.DATE) {
+        field.setDateFormat(obj.has(DATE_FORMAT) ? obj.getString(DATE_FORMAT) : DEFAULT_DATE_FORMAT);
+      }
+      //Get the trueValue and falseValue if the data type is Boolean
+      if (field.getType() == FieldType.BOOLEAN) {
+        field.setTrueValue(obj.has(TRUE_VALUE) ? obj.getString(TRUE_VALUE) : DEFAULT_TRUE_VALUE);
+        field.setFalseValue(obj.has(FALSE_VALUE) ? obj.getString(FALSE_VALUE) : DEFAULT_FALSE_VALUE);
+      }
+      fields.add(field);
+      fieldNames.add(field.name);
+    }
+  }
+
+  /**
+   * Reads the single padding character from the given JSON object.
+   *
+   * @param obj JSON object that contains the padding key
+   * @return the first character of the padding value
+   * @throws JSONException if the padding value cannot be read
+   * @throws IllegalArgumentException if the padding value is empty
+   */
+  private static char readPadding(JSONObject obj) throws JSONException
+  {
+    String padding = obj.getString(FIELD_PADDING_CHARACTER);
+    if (padding.isEmpty()) {
+      throw new IllegalArgumentException("Padding character must not be empty");
+    }
+    return padding.charAt(0);
+  }
+
+  /**
+   * Returns the parsed schema fields, in the order they appear in the JSON "fields" array.
+   *
+   * @return an unmodifiable view of the {@link Field} list
+   */
+  public List<Field> getFields()
+  {
+    final List<Field> readOnly = Collections.unmodifiableList(fields);
+    return readOnly;
+  }
+
+  /**
+   * Objects of this class represent a particular field in a fixed-width schema.
+   * Besides the name and type inherited from {@link Schema.Field}, each field
+   * has a length plus padding and alignment settings. Date fields additionally
+   * carry a date format, and Boolean fields carry the strings that represent
+   * true and false.
+   */
+  public class Field extends Schema.Field
+  {
+    /**
+     * Length of the field
+     */
+    private int fieldLength;
+    /**
+     * Parameter to specify format of date
+     */
+    private String dateFormat;
+    /**
+     * Parameter to specify true value of Boolean
+     */
+    private String trueValue;
+    /**
+     * Parameter to specify false value of Boolean
+     */
+    private String falseValue;
+    /**
+     * Parameter to specify padding
+     */
+    private char padding;
+    /**
+     * Parameter to specify alignment
+     */
+    private String alignment;
+
+    /**
+     * Constructor for Field. Date format, boolean values, padding and alignment
+     * start out at the schema defaults and may be overridden via the setters.
+     *
+     * @param name - name of the field.
+     * @param type - type of the field (case-insensitive {@link FieldType} name).
+     * @param fieldLength - length of the field; must not be null since it is unboxed.
+     */
+    public Field(String name, String type, Integer fieldLength)
+    {
+      super(name, type);
+      this.fieldLength = fieldLength;
+      this.dateFormat = DEFAULT_DATE_FORMAT;
+      this.trueValue = DEFAULT_TRUE_VALUE;
+      this.falseValue = DEFAULT_FALSE_VALUE;
+      // Same default the schema applies when no global padding is configured,
+      // so a Field built directly agrees with one built from JSON.
+      this.padding = DEFAULT_PADDING_CHARACTER;
+      this.alignment = DEFAULT_ALIGNMENT;
+    }
+
+    /**
+     * Get the Length of the Field
+     *
+     * @return fieldLength length of the field.
+     */
+    public int getFieldLength()
+    {
+      return fieldLength;
+    }
+
+    /**
+     * Set the length of the field
+     *
+     * @param fieldLength length of the field; must not be null since it is unboxed.
+     */
+    public void setFieldLength(Integer fieldLength)
+    {
+      this.fieldLength = fieldLength;
+    }
+
+    /**
+     * Get the dateFormat of the field
+     *
+     * @return dateFormat format of date given.
+     */
+    public String getDateFormat()
+    {
+      return dateFormat;
+    }
+
+    /**
+     * Set the dateFormat of the field
+     *
+     * @param dateFormat sets the format of date.
+     */
+    public void setDateFormat(String dateFormat)
+    {
+      this.dateFormat = dateFormat;
+    }
+
+    /**
+     * Get the trueValue of the Boolean field
+     *
+     * @return trueValue gets the equivalent true value.
+     */
+    public String getTrueValue()
+    {
+      return trueValue;
+    }
+
+    /**
+     * Set the trueValue of the Boolean field
+     *
+     * @param trueValue sets the equivalent true value.
+     */
+    public void setTrueValue(String trueValue)
+    {
+      this.trueValue = trueValue;
+    }
+
+    /**
+     * Get the falseValue of the Boolean field
+     *
+     * @return falseValue gets the equivalent false value.
+     */
+    public String getFalseValue()
+    {
+      return falseValue;
+    }
+
+    /**
+     * Set the falseValue of the Boolean field
+     *
+     * @param falseValue sets the equivalent false value.
+     */
+    public void setFalseValue(String falseValue)
+    {
+      this.falseValue = falseValue;
+    }
+
+    /**
+     * Get the field padding
+     *
+     * @return padding gets the padding for the individual field.
+     */
+    public char getPadding()
+    {
+      return padding;
+    }
+
+    /**
+     * Set the field padding
+     *
+     * @param padding sets the padding for the individual field.
+     */
+    public void setPadding(char padding)
+    {
+      this.padding = padding;
+    }
+
+    /**
+     * Get the field alignment
+     *
+     * @return alignment gets the alignment for the individual field.
+     */
+    public String getAlignment()
+    {
+      return alignment;
+    }
+
+    /**
+     * Set the field alignment
+     *
+     * @param alignment sets the alignment for the individual field.
+     */
+    public void setAlignment(String alignment)
+    {
+      this.alignment = alignment;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Fields [name=" + name + ", type=" + type + ", fieldLength=" + fieldLength + "]";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java
new file mode 100644
index 0000000..c09ff92
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * <p>
+ * This is the schema that defines fields and their constraints for delimited and
+ * fixed-width files. The operators use this information to validate the incoming
+ * tuples. Information from the JSON schema is saved in this object and is used by
+ * the operators.
+ */
+public class Schema
+{
+  /**
+   * JSON key string for fields array
+   */
+  public static final String FIELDS = "fields";
+  /**
+   * JSON key string for name of the field within fields array
+   */
+  public static final String NAME = "name";
+  /**
+   * JSON key string for type of the field within fields array
+   */
+  public static final String TYPE = "type";
+  /**
+   * JSON key string for date format constraint
+   */
+  public static final String DATE_FORMAT = "format";
+  /**
+   * JSON key string for true value constraint
+   */
+  public static final String TRUE_VALUE = "trueValue";
+  /**
+   * JSON key string for false value constraint
+   */
+  public static final String FALSE_VALUE = "falseValue";
+  /**
+   * Default string representation of the boolean value true
+   */
+  public static final String DEFAULT_TRUE_VALUE = "true";
+  /**
+   * Default string representation of the boolean value false
+   */
+  public static final String DEFAULT_FALSE_VALUE = "false";
+  /**
+   * Default date format for Date fields that do not specify one. Pattern letters
+   * follow java.text.SimpleDateFormat conventions: "MM" is the month, whereas
+   * "mm" would be minutes.
+   */
+  public static final String DEFAULT_DATE_FORMAT = "dd/MM/yy";
+  /**
+   * This holds the list of field names in the same order as in the schema
+   */
+  protected List<String> fieldNames = new LinkedList<String>();
+
+
+  /**
+   * Get the list of field names mentioned in schema
+   *
+   * @return read-only view of fieldNames
+   */
+  public List<String> getFieldNames()
+  {
+    return Collections.unmodifiableList(fieldNames);
+  }
+
+  /**
+   * Supported data types
+   */
+  public enum FieldType
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  }
+
+  /**
+   * Objects of this class represent a particular field in the schema. Each
+   * field has a name, type and a set of associated constraints.
+   *
+   */
+  public class Field
+  {
+    /**
+     * name of the field
+     */
+    String name;
+    /**
+     * Data type of the field
+     */
+    FieldType type;
+
+    /**
+     * Parameterized Constructor
+     *
+     * @param name name of the field.
+     * @param type data type of the field; matched case-insensitively against {@link FieldType}.
+     * @throws IllegalArgumentException if type does not name a {@link FieldType}
+     */
+    public Field(String name, String type)
+    {
+      this.name = name;
+      // Locale.ROOT keeps the conversion locale-independent: under a Turkish default
+      // locale, "string".toUpperCase() yields "STRİNG", which is not a FieldType.
+      this.type = FieldType.valueOf(type.toUpperCase(Locale.ROOT));
+    }
+
+    /**
+     * Default Constructor
+     */
+    public Field()
+    {
+    }
+
+    /**
+     * Get the name of the field
+     *
+     * @return name
+     */
+    public String getName()
+    {
+      return name;
+    }
+
+    /**
+     * Set the name of the field
+     *
+     * @param name name of the field
+     */
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    /**
+     * Get {@link FieldType}
+     *
+     * @return type
+     */
+    public FieldType getType()
+    {
+      return type;
+    }
+
+    /**
+     * Set {@link FieldType}
+     *
+     * @param type data type of the field
+     */
+    public void setType(FieldType type)
+    {
+      this.type = type;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Fields [name=" + name + ", type=" + type + "]";
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
new file mode 100644
index 0000000..bc72efd
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit tests for {@link FixedWidthParser} driven by the schema in
+ * {@code FixedWidthSchema.json}. Each test feeds records to the parser and checks
+ * what reached the parsedOutput, out (POJO) and err ports, or the parser counters.
+ */
+public class FixedWidthTest
+{
+
+  private static final String SCHEMA_FILE = "FixedWidthSchema.json";
+  @Rule
+  public Watcher watcher = new Watcher();
+  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+  FixedWidthParser parser = new FixedWidthParser();
+
+  /**
+   * Feeds {@code input} to the parser as a single tuple inside window 0.
+   *
+   * @param input record to process
+   */
+  private void processInWindow(String input)
+  {
+    parser.beginWindow(0);
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+  }
+
+  /**
+   * Asserts that {@code input} was rejected: nothing reached the parsedOutput or
+   * POJO ports, and exactly one error tuple whose key is the raw input was emitted.
+   *
+   * @param input the record that was fed to the parser
+   */
+  private void assertRejected(String input)
+  {
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    @SuppressWarnings("unchecked")
+    KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(input, errorTuple.getKey());
+  }
+
+  /**
+   * Asserts the current values of the parser's tuple counters.
+   */
+  private void assertMetrics(long parsedOutput, long incoming, long errors, long emittedObjects)
+  {
+    Assert.assertEquals(parsedOutput, parser.getParsedOutputCount());
+    Assert.assertEquals(incoming, parser.getIncomingTuplesCount());
+    Assert.assertEquals(errors, parser.getErrorTupleCount());
+    Assert.assertEquals(emittedObjects, parser.getEmittedObjectCount());
+  }
+
+  /*
+  * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
+  * e.g. in csv 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
+  * 120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes
+  * Field lengths, padding and alignment are defined in FixedWidthSchema.json
+  */
+  @Test
+  public void TestParserValidInput()
+  {
+    String input = "120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    processInWindow(input);
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Ad.class, obj.getClass());
+    Ad adPojo = (Ad)obj;
+
+    // adId is "120" with padding "0" and left alignment in the schema; the expected
+    // value 12 assumes the trailing '0' is stripped as padding.
+    Assert.assertEquals(12, adPojo.getAdId());
+    Assert.assertEquals("adxyz", adPojo.getAdName());
+    Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
+    Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
+    Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
+    Assert.assertEquals(12, adPojo.getSecurityCode());
+    Assert.assertEquals("CAMP_AD123", adPojo.getParentCampaign());
+    Assert.assertEquals("yes", adPojo.getValid());
+    Assert.assertTrue(adPojo.isActive());
+    Assert.assertFalse(adPojo.isOptimized());
+  }
+
+  @Test
+  public void TestParserValidInputPojoPortNotConnected()
+  {
+    parser.out.setSink(null);
+    processInWindow("123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ");
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserValidInputClassNameNotProvided()
+  {
+    parser.setClazz(null);
+    processInWindow("120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ");
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidAdIdInput()
+  {
+    String input = "1c2982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserNoCampaignIdInput()
+  {
+    processInWindow("123   _____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ");
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Ad.class, obj.getClass());
+    Assert.assertEquals(0, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserInvalidCampaignIdInput()
+  {
+    String input = "1239c2_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserInvalidBidPriceInput()
+  {
+    String input = "123982_____adxyz0..2015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserInvalidStartDateInput()
+  {
+    String input = "123982_____adxyz0.22015-31-13 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserInvalidSecurityCodeInput()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212b__y_CAMP_AD123Yyes";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserInvalidisActiveInput()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___p_CAMP_AD123Yyes";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  /**
+   * Only the null case is exercised here; a null tuple is expected on the error port.
+   */
+  @Test
+  public void TestParserNullOrBlankInput()
+  {
+    parser.beginWindow(0);
+    parser.in.process(null);
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void TestParserHeaderAsInput()
+  {
+    String input = "aIdcIdadName    bidstartDate          endDate   sCodeBBparentCampWvalid     ";
+    parser.setHeader(input);
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserShorterRecord()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes    ";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserShorterRecordOnlyPOJOPortConnected()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes    ";
+    parser.parsedOutput.setSink(null);
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserLongerRecord()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       Extra_stuff";
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserLongerRecordOnlyPOJOPortConnected()
+  {
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       Extra_stuff";
+    parser.parsedOutput.setSink(null);
+    processInWindow(input);
+    assertRejected(input);
+  }
+
+  @Test
+  public void TestParserValidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    assertMetrics(0, 0, 0, 0);
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    assertMetrics(1, 1, 0, 1);
+  }
+
+  @Test
+  public void TestParserInvalidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    assertMetrics(0, 0, 0, 0);
+    parser.in.process("123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       Extra_stuff"
+      .getBytes());
+    parser.endWindow();
+    assertMetrics(0, 1, 1, 0);
+  }
+
+  @Test
+  public void TestParserValidInputMetricResetCheck()
+  {
+    parser.beginWindow(0);
+    assertMetrics(0, 0, 0, 0);
+    String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes       ";
+    parser.in.process(input.getBytes());
+    parser.endWindow();
+    assertMetrics(1, 1, 0, 1);
+    // The counters are expected to start from zero again in the next window.
+    parser.beginWindow(1);
+    assertMetrics(0, 0, 0, 0);
+    parser.in.process(input.getBytes());
+    assertMetrics(1, 1, 0, 1);
+    parser.endWindow();
+  }
+
+  /**
+   * POJO target for the parser; property names match the field names in
+   * FixedWidthSchema.json.
+   */
+  public static class Ad
+  {
+
+    private int adId;
+    private int campaignId;
+    private String adName;
+    private double bidPrice;
+    private Date startDate;
+    private Date endDate;
+    private long securityCode;
+    private boolean active;
+    private boolean optimized;
+    private String parentCampaign;
+    private Character weatherTargeted;
+    private String valid;
+
+    public Ad()
+    {
+
+    }
+
+    public int getAdId()
+    {
+      return adId;
+    }
+
+    public void setAdId(int adId)
+    {
+      this.adId = adId;
+    }
+
+    public int getCampaignId()
+    {
+      return campaignId;
+    }
+
+    public void setCampaignId(int campaignId)
+    {
+      this.campaignId = campaignId;
+    }
+
+    public String getAdName()
+    {
+      return adName;
+    }
+
+    public void setAdName(String adName)
+    {
+      this.adName = adName;
+    }
+
+    public double getBidPrice()
+    {
+      return bidPrice;
+    }
+
+    public void setBidPrice(double bidPrice)
+    {
+      this.bidPrice = bidPrice;
+    }
+
+    public Date getStartDate()
+    {
+      return startDate;
+    }
+
+    public void setStartDate(Date startDate)
+    {
+      this.startDate = startDate;
+    }
+
+    public Date getEndDate()
+    {
+      return endDate;
+    }
+
+    public void setEndDate(Date endDate)
+    {
+      this.endDate = endDate;
+    }
+
+    public long getSecurityCode()
+    {
+      return securityCode;
+    }
+
+    public void setSecurityCode(long securityCode)
+    {
+      this.securityCode = securityCode;
+    }
+
+    public boolean isActive()
+    {
+      return active;
+    }
+
+    public void setActive(boolean active)
+    {
+      this.active = active;
+    }
+
+    public boolean isOptimized()
+    {
+      return optimized;
+    }
+
+    public void setOptimized(boolean optimized)
+    {
+      this.optimized = optimized;
+    }
+
+    public String getParentCampaign()
+    {
+      return parentCampaign;
+    }
+
+    public void setParentCampaign(String parentCampaign)
+    {
+      this.parentCampaign = parentCampaign;
+    }
+
+    public Character getWeatherTargeted()
+    {
+      return weatherTargeted;
+    }
+
+    public void setWeatherTargeted(Character weatherTargeted)
+    {
+      this.weatherTargeted = weatherTargeted;
+    }
+
+    public String getValid()
+    {
+      return valid;
+    }
+
+    public void setValid(String valid)
+    {
+      this.valid = valid;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Ad [adId=" + adId + ", campaignId=" + campaignId + ", adName=" + adName + ", bidPrice=" + bidPrice
+        + ", startDate=" + startDate + ", endDate=" + endDate + ", securityCode=" + securityCode + ", active="
+        + active + ", optimized=" + optimized + ", parentCampaign=" + parentCampaign + ", weatherTargeted="
+        + weatherTargeted + ", valid=" + valid + "]";
+    }
+  }
+
+  /**
+   * Configures and activates a fresh parser with all three sinks attached before
+   * each test, and clears the sinks and tears the parser down afterwards.
+   */
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      parser.setClazz(Ad.class);
+      parser.setJsonSchema(SchemaUtils.jarResourceFileToString(SCHEMA_FILE));
+      parser.setup(null);
+      parser.activate(null);
+      parser.err.setSink(error);
+      parser.parsedOutput.setSink(objectPort);
+      parser.out.setSink(pojoPort);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      error.clear();
+      objectPort.clear();
+      pojoPort.clear();
+      parser.teardown();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/test/resources/FixedWidthSchema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/FixedWidthSchema.json b/contrib/src/test/resources/FixedWidthSchema.json
new file mode 100644
index 0000000..38d7cc7
--- /dev/null
+++ b/contrib/src/test/resources/FixedWidthSchema.json
@@ -0,0 +1,78 @@
+{
+    "padding": "_",
+    "alignment": "left",
+    "fields": [
+        {
+            "name": "adId",
+            "type": "Integer",
+            "length": "3",
+            "padding": "0"
+        },
+        {
+            "name": "campaignId",
+            "type": "Integer",
+            "length": "3",
+            "padding": " "
+
+        },
+        {
+            "name": "adName",
+            "type": "String",
+            "length": "10",
+            "alignment":"right"
+        },
+        {
+            "name": "bidPrice",
+            "type": "Double",
+            "length": "3"
+        },
+        {
+            "name": "startDate",
+            "type": "Date",
+            "format": "yyyy-MM-dd HH:mm:ss",
+            "length": "19"
+
+        },
+        {
+            "name": "endDate",
+            "type": "Date",
+            "format": "dd/MM/yyyy",
+            "length": "10"
+        },
+        {
+            "name": "securityCode",
+            "type": "Long",
+            "length": "5"
+        },
+        {
+            "name": "active",
+            "type": "Boolean",
+            "length": "1",
+            "trueValue": "y",
+            "falseValue": "n"
+        },
+        {
+            "name": "optimized",
+            "type": "Boolean",
+            "length": "1",
+            "trueValue": "y",
+            "falseValue": "n"
+        },
+        {
+            "name": "parentCampaign",
+            "type": "String",
+            "length": "10"
+        },
+        {
+            "name": "weatherTargeted",
+            "type": "Character",
+            "length": "1"
+        },
+        {
+            "name": "valid",
+            "type": "String",
+            "length": "10",
+            "padding":" "
+        }
+    ]
+}