You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:24 UTC

[07/50] [abbrv] incubator-apex-malhar git commit: Moving XML and JSON parsers & formatters to Malhar-lib and changing package names for parsers & formatters

Moving XML and JSON parsers & formatters to Malhar-lib and changing package names for parsers & formatters


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/23dabc41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/23dabc41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/23dabc41

Branch: refs/heads/master
Commit: 23dabc414b9df2d3b0739933406ad11c2388f0b6
Parents: f070c33
Author: ishark <is...@datatorrent.com>
Authored: Tue Dec 15 14:52:18 2015 -0800
Committer: ishark <is...@datatorrent.com>
Committed: Tue Dec 15 14:52:18 2015 -0800

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 -
 .../contrib/converter/Converter.java            |  44 ---
 .../contrib/formatter/CsvFormatter.java         | 286 +++++++++++++++++
 .../datatorrent/contrib/parser/CsvParser.java   | 315 +++++++++++++++++++
 .../contrib/schema/formatter/CsvFormatter.java  | 286 -----------------
 .../contrib/schema/formatter/Formatter.java     | 102 ------
 .../contrib/schema/formatter/JsonFormatter.java | 110 -------
 .../contrib/schema/formatter/XmlFormatter.java  | 173 ----------
 .../contrib/schema/parser/CsvParser.java        | 315 -------------------
 .../contrib/schema/parser/JsonParser.java       | 110 -------
 .../contrib/schema/parser/Parser.java           | 103 ------
 .../contrib/schema/parser/XmlParser.java        | 142 ---------
 .../contrib/formatter/CsvFormatterTest.java     | 165 ++++++++++
 .../contrib/parser/CsvPOJOParserTest.java       | 189 +++++++++++
 .../schema/formatter/CsvFormatterTest.java      | 165 ----------
 .../schema/formatter/JsonFormatterTest.java     | 204 ------------
 .../schema/formatter/XmlFormatterTest.java      | 244 --------------
 .../contrib/schema/parser/CsvParserTest.java    | 190 -----------
 .../contrib/schema/parser/JsonParserTest.java   | 230 --------------
 .../contrib/schema/parser/XmlParserTest.java    | 272 ----------------
 library/pom.xml                                 |  12 +
 .../datatorrent/lib/converter/Converter.java    |  44 +++
 .../datatorrent/lib/formatter/Formatter.java    | 102 ++++++
 .../lib/formatter/JsonFormatter.java            | 110 +++++++
 .../datatorrent/lib/formatter/XmlFormatter.java | 173 ++++++++++
 .../com/datatorrent/lib/parser/JsonParser.java  | 110 +++++++
 .../java/com/datatorrent/lib/parser/Parser.java | 103 ++++++
 .../com/datatorrent/lib/parser/XmlParser.java   | 142 +++++++++
 .../lib/formatter/JsonFormatterTest.java        | 204 ++++++++++++
 .../lib/formatter/XmlFormatterTest.java         | 243 ++++++++++++++
 .../datatorrent/lib/parser/JsonParserTest.java  | 230 ++++++++++++++
 .../datatorrent/lib/parser/XmlParserTest.java   | 272 ++++++++++++++++
 pom.xml                                         |   2 +
 33 files changed, 2702 insertions(+), 2696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index f1b6ceb..256b438 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -612,12 +612,6 @@
       <type>jar</type>
     </dependency>
     <dependency>
-      <!-- required by Xml parser and formatter -->
-      <groupId>com.thoughtworks.xstream</groupId>
-      <artifactId>xstream</artifactId>
-      <version>1.4.8</version>
-    </dependency>
-    <dependency>
       <!-- required by Csv parser and formatter -->
       <groupId>net.sf.supercsv</groupId>
       <artifactId>super-csv-joda</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
deleted file mode 100644
index 601268d..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.converter;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * Operators that are converting tuples from one format to another must
- * implement this interface. Eg. Parsers or formatters , that parse data of
- * certain format and convert them to another format.
- * 
- * @param <INPUT>
- * @param <OUTPUT>
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public interface Converter<INPUT, OUTPUT>
-{
-  /**
-   * Provide the implementation for converting tuples from one format to the
-   * other
-   * 
-   * @param INPUT
-   *          tuple of certain format
-   * @return OUTPUT tuple of converted format
-   */
-  public OUTPUT convert(INPUT tuple);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
new file mode 100644
index 0000000..e5bbd3c
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.formatter;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.FmtDate;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
+import org.supercsv.io.CsvBeanWriter;
+import org.supercsv.io.ICsvBeanWriter;
+import org.supercsv.prefs.CsvPreference;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.formatter.Formatter;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts POJO to CSV string <br>
+ * Assumption is that each field in the delimited data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>fieldInfo</b>: The user needs to specify fields and their types as a comma
+ * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
+ * the same order as incoming data. FORMAT refers to dates with dd/MM/yyyy as
+ * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+ * 
+ * @displayName CsvFormatter
+ * @category Formatter
+ * @tags pojo csv formatter
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class CsvFormatter extends Formatter<String>
+{
+
+  private ArrayList<Field> fields;
+  @NotNull
+  protected String classname;
+  @NotNull
+  protected int fieldDelimiter;
+  protected String lineDelimiter;
+
+  @NotNull
+  protected String fieldInfo;
+
+  public enum FIELD_TYPE
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  };
+
+  protected transient String[] nameMapping;
+  protected transient CellProcessor[] processors;
+  protected transient CsvPreference preference;
+
+  public CsvFormatter()
+  {
+    fields = new ArrayList<Field>();
+    fieldDelimiter = ',';
+    lineDelimiter = "\r\n";
+
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    //fieldInfo information
+    fields = new ArrayList<Field>();
+    String[] fieldInfoTuple = fieldInfo.split(",");
+    for (int i = 0; i < fieldInfoTuple.length; i++) {
+      String[] fieldTuple = fieldInfoTuple[i].split(":");
+      Field field = new Field();
+      field.setName(fieldTuple[0]);
+      String[] typeFormat = fieldTuple[1].split("\\|");
+      field.setType(typeFormat[0].toUpperCase());
+      if (typeFormat.length > 1) {
+        field.setFormat(typeFormat[1]);
+      }
+      getFields().add(field);
+    }
+    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
+    int countKeyValue = getFields().size();
+    nameMapping = new String[countKeyValue];
+    processors = new CellProcessor[countKeyValue];
+    initialise(nameMapping, processors);
+
+  }
+
+  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  {
+    for (int i = 0; i < getFields().size(); i++) {
+      FIELD_TYPE type = getFields().get(i).type;
+      nameMapping[i] = getFields().get(i).name;
+      if (type == FIELD_TYPE.DATE) {
+        String dateFormat = getFields().get(i).format;
+        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+      } else {
+        processors[i] = new Optional();
+      }
+    }
+
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      StringWriter stringWriter = new StringWriter();
+      ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
+      beanWriter.write(tuple, nameMapping, processors);
+      beanWriter.flush();
+      beanWriter.close();
+      return stringWriter.toString();
+    } catch (SuperCsvException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  public static class Field
+  {
+    String name;
+    String format;
+    FIELD_TYPE type;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public FIELD_TYPE getType()
+    {
+      return type;
+    }
+
+    public void setType(String type)
+    {
+      this.type = FIELD_TYPE.valueOf(type);
+    }
+
+    public String getFormat()
+    {
+      return format;
+    }
+
+    public void setFormat(String format)
+    {
+      this.format = format;
+    }
+  }
+
+  /**
+   * Gets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field.
+   * 
+   * @return An array list of Fields.
+   */
+  public ArrayList<Field> getFields()
+  {
+    return fields;
+  }
+
+  /**
+   * Sets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field.
+   * 
+   * @param fields
+   *          An array list of Fields.
+   */
+  public void setFields(ArrayList<Field> fields)
+  {
+    this.fields = fields;
+  }
+
+  /**
+   * Gets the delimiter which separates fields in the outgoing CSV data.
+   * 
+   * @return fieldDelimiter
+   */
+  public int getFieldDelimiter()
+  {
+    return fieldDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates fields in the outgoing CSV data.
+   * 
+   * @param fieldDelimiter
+   */
+  public void setFieldDelimiter(int fieldDelimiter)
+  {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  /**
+   * Gets the delimiter which separates lines in the outgoing CSV data.
+   * 
+   * @return lineDelimiter
+   */
+  public String getLineDelimiter()
+  {
+    return lineDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates lines in the outgoing CSV data.
+   * 
+   * @param lineDelimiter
+   */
+  public void setLineDelimiter(String lineDelimiter)
+  {
+    this.lineDelimiter = lineDelimiter;
+  }
+
+  /**
+   * Gets the name of the fields with type and format in data as comma separated
+   * string in same order as incoming data. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   * 
+   * @return fieldInfo
+   */
+  public String getFieldInfo()
+  {
+    return fieldInfo;
+  }
+
+  /**
+   * Sets the name of the fields with type and format in data as comma separated
+   * string in same order as incoming data. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   * 
+   * @param fieldInfo
+   */
+  public void setFieldInfo(String fieldInfo)
+  {
+    this.fieldInfo = fieldInfo;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
new file mode 100644
index 0000000..0c5f8d2
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.CsvBeanReader;
+import org.supercsv.prefs.CsvPreference;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.ReusableStringReader;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts CSV string to Pojo <br>
+ * Assumption is that each field in the delimited data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>fieldInfo</b>: The user needs to specify fields and their types as a comma
+ * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
+ * the same order as incoming data. FORMAT refers to dates with dd/MM/yyyy as
+ * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy <br>
+ * <b>fieldDelimiter</b>: Default is comma <br>
+ * <b>lineDelimiter</b>: Default is '\r\n'
+ * 
+ * @displayName CsvParser
+ * @category Parsers
+ * @tags csv pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class CsvParser extends Parser<String>
+{
+
+  private ArrayList<Field> fields;
+  @NotNull
+  protected int fieldDelimiter;
+  protected String lineDelimiter;
+
+  @NotNull
+  protected String fieldInfo;
+
+  protected transient String[] nameMapping;
+  protected transient CellProcessor[] processors;
+  private transient CsvBeanReader csvReader;
+
+  public enum FIELD_TYPE
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  };
+
+  @NotNull
+  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
+
+  public CsvParser()
+  {
+    fields = new ArrayList<Field>();
+    fieldDelimiter = ',';
+    lineDelimiter = "\r\n";
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+
+    logger.info("field info {}", fieldInfo);
+    fields = new ArrayList<Field>();
+    String[] fieldInfoTuple = fieldInfo.split(",");
+    for (int i = 0; i < fieldInfoTuple.length; i++) {
+      String[] fieldTuple = fieldInfoTuple[i].split(":");
+      Field field = new Field();
+      field.setName(fieldTuple[0]);
+      String[] typeFormat = fieldTuple[1].split("\\|");
+      field.setType(typeFormat[0].toUpperCase());
+      if (typeFormat.length > 1) {
+        field.setFormat(typeFormat[1]);
+      }
+      getFields().add(field);
+    }
+
+    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
+    csvReader = new CsvBeanReader(csvStringReader, preference);
+    int countKeyValue = getFields().size();
+    logger.info("countKeyValue {}", countKeyValue);
+    nameMapping = new String[countKeyValue];
+    processors = new CellProcessor[countKeyValue];
+    initialise(nameMapping, processors);
+  }
+
+  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  {
+    for (int i = 0; i < getFields().size(); i++) {
+      FIELD_TYPE type = getFields().get(i).type;
+      nameMapping[i] = getFields().get(i).name;
+      if (type == FIELD_TYPE.DOUBLE) {
+        processors[i] = new Optional(new ParseDouble());
+      } else if (type == FIELD_TYPE.INTEGER) {
+        processors[i] = new Optional(new ParseInt());
+      } else if (type == FIELD_TYPE.FLOAT) {
+        processors[i] = new Optional(new ParseDouble());
+      } else if (type == FIELD_TYPE.LONG) {
+        processors[i] = new Optional(new ParseLong());
+      } else if (type == FIELD_TYPE.SHORT) {
+        processors[i] = new Optional(new ParseInt());
+      } else if (type == FIELD_TYPE.STRING) {
+        processors[i] = new Optional();
+      } else if (type == FIELD_TYPE.CHARACTER) {
+        processors[i] = new Optional(new ParseChar());
+      } else if (type == FIELD_TYPE.BOOLEAN) {
+        processors[i] = new Optional(new ParseBool());
+      } else if (type == FIELD_TYPE.DATE) {
+        String dateFormat = getFields().get(i).format;
+        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+      }
+    }
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      csvStringReader.open(tuple);
+      return csvReader.read(clazz, nameMapping, processors);
+    } catch (IOException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    try {
+      if (csvReader != null) {
+        csvReader.close();
+      }
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  public static class Field
+  {
+    String name;
+    String format;
+    FIELD_TYPE type;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public FIELD_TYPE getType()
+    {
+      return type;
+    }
+
+    public void setType(String type)
+    {
+      this.type = FIELD_TYPE.valueOf(type);
+    }
+
+    public String getFormat()
+    {
+      return format;
+    }
+
+    public void setFormat(String format)
+    {
+      this.format = format;
+    }
+
+  }
+
+  /**
+   * Gets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field.
+   * 
+   * @return An array list of Fields.
+   */
+  public ArrayList<Field> getFields()
+  {
+    return fields;
+  }
+
+  /**
+   * Sets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field.
+   * 
+   * @param fields
+   *          An array list of Fields.
+   */
+  public void setFields(ArrayList<Field> fields)
+  {
+    this.fields = fields;
+  }
+
+  /**
+   * Gets the delimiter which separates fields in incoming data.
+   * 
+   * @return fieldDelimiter
+   */
+  public int getFieldDelimiter()
+  {
+    return fieldDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates fields in incoming data.
+   * 
+   * @param fieldDelimiter
+   */
+  public void setFieldDelimiter(int fieldDelimiter)
+  {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  /**
+   * Gets the delimiter which separates lines in incoming data.
+   * 
+   * @return lineDelimiter
+   */
+  public String getLineDelimiter()
+  {
+    return lineDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates lines in incoming data.
+   * 
+   * @param lineDelimiter
+   */
+  public void setLineDelimiter(String lineDelimiter)
+  {
+    this.lineDelimiter = lineDelimiter;
+  }
+
+  /**
+   * Gets the name of the fields with type and format ( for date ) as comma
+   * separated string in same order as incoming data. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   * 
+   * @return fieldInfo
+   */
+  public String getFieldInfo()
+  {
+    return fieldInfo;
+  }
+
+  /**
+   * Sets the name of the fields with type and format ( for date ) as comma
+   * separated string in same order as incoming data. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   * 
+   * @param fieldInfo
+   */
+  public void setFieldInfo(String fieldInfo)
+  {
+    this.fieldInfo = fieldInfo;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
deleted file mode 100644
index 490c4f2..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.supercsv.cellprocessor.FmtDate;
-import org.supercsv.cellprocessor.Optional;
-import org.supercsv.cellprocessor.ift.CellProcessor;
-import org.supercsv.exception.SuperCsvException;
-import org.supercsv.io.CsvBeanWriter;
-import org.supercsv.io.ICsvBeanWriter;
-import org.supercsv.prefs.CsvPreference;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Operator that converts POJO to CSV string <br>
- * Assumption is that each field in the delimited data should map to a simple
- * java type.<br>
- * <br>
- * <b>Properties</b> <br>
- * <b>fieldInfo</b>:User need to specify fields and their types as a comma
- * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
- * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as
- * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
- * 
- * @displayName CsvFormatter
- * @category Formatter
- * @tags pojo csv formatter
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class CsvFormatter extends Formatter<String>
-{
-
-  private ArrayList<Field> fields;
-  @NotNull
-  protected String classname;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
-
-  @NotNull
-  protected String fieldInfo;
-
-  public enum FIELD_TYPE
-  {
-    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
-
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  protected transient CsvPreference preference;
-
-  public CsvFormatter()
-  {
-    fields = new ArrayList<Field>();
-    fieldDelimiter = ',';
-    lineDelimiter = "\r\n";
-
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    super.setup(context);
-
-    //fieldInfo information
-    fields = new ArrayList<Field>();
-    String[] fieldInfoTuple = fieldInfo.split(",");
-    for (int i = 0; i < fieldInfoTuple.length; i++) {
-      String[] fieldTuple = fieldInfoTuple[i].split(":");
-      Field field = new Field();
-      field.setName(fieldTuple[0]);
-      String[] typeFormat = fieldTuple[1].split("\\|");
-      field.setType(typeFormat[0].toUpperCase());
-      if (typeFormat.length > 1) {
-        field.setFormat(typeFormat[1]);
-      }
-      getFields().add(field);
-    }
-    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
-    int countKeyValue = getFields().size();
-    nameMapping = new String[countKeyValue];
-    processors = new CellProcessor[countKeyValue];
-    initialise(nameMapping, processors);
-
-  }
-
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
-  {
-    for (int i = 0; i < getFields().size(); i++) {
-      FIELD_TYPE type = getFields().get(i).type;
-      nameMapping[i] = getFields().get(i).name;
-      if (type == FIELD_TYPE.DATE) {
-        String dateFormat = getFields().get(i).format;
-        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
-      } else {
-        processors[i] = new Optional();
-      }
-    }
-
-  }
-
-  @Override
-  public void activate(Context context)
-  {
-
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
-  public String convert(Object tuple)
-  {
-    try {
-      StringWriter stringWriter = new StringWriter();
-      ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference);
-      beanWriter.write(tuple, nameMapping, processors);
-      beanWriter.flush();
-      beanWriter.close();
-      return stringWriter.toString();
-    } catch (SuperCsvException e) {
-      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-    return null;
-  }
-
-  public static class Field
-  {
-    String name;
-    String format;
-    FIELD_TYPE type;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public FIELD_TYPE getType()
-    {
-      return type;
-    }
-
-    public void setType(String type)
-    {
-      this.type = FIELD_TYPE.valueOf(type);
-    }
-
-    public String getFormat()
-    {
-      return format;
-    }
-
-    public void setFormat(String format)
-    {
-      this.format = format;
-    }
-  }
-
-  /**
-   * Gets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
-   * 
-   * @return An array list of Fields.
-   */
-  public ArrayList<Field> getFields()
-  {
-    return fields;
-  }
-
-  /**
-   * Sets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
-   * 
-   * @param fields
-   *          An array list of Fields.
-   */
-  public void setFields(ArrayList<Field> fields)
-  {
-    this.fields = fields;
-  }
-
-  /**
-   * Gets the delimiter which separates fields in incoming data.
-   * 
-   * @return fieldDelimiter
-   */
-  public int getFieldDelimiter()
-  {
-    return fieldDelimiter;
-  }
-
-  /**
-   * Sets the delimiter which separates fields in incoming data.
-   * 
-   * @param fieldDelimiter
-   */
-  public void setFieldDelimiter(int fieldDelimiter)
-  {
-    this.fieldDelimiter = fieldDelimiter;
-  }
-
-  /**
-   * Gets the delimiter which separates lines in incoming data.
-   * 
-   * @return lineDelimiter
-   */
-  public String getLineDelimiter()
-  {
-    return lineDelimiter;
-  }
-
-  /**
-   * Sets the delimiter which separates line in incoming data.
-   * 
-   * @param lineDelimiter
-   */
-  public void setLineDelimiter(String lineDelimiter)
-  {
-    this.lineDelimiter = lineDelimiter;
-  }
-
-  /**
-   * Gets the name of the fields with type and format in data as comma separated
-   * string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
-   * 
-   * @return fieldInfo
-   */
-  public String getFieldInfo()
-  {
-    return fieldInfo;
-  }
-
-  /**
-   * Sets the name of the fields with type and format in data as comma separated
-   * string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
-   * 
-   * @param fieldInfo
-   */
-  public void setFieldInfo(String fieldInfo)
-  {
-    this.fieldInfo = fieldInfo;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
deleted file mode 100644
index 77fa630..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.annotation.InputPortFieldAnnotation;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.contrib.converter.Converter;
-
-/**
- * Abstract class that implements Converter interface. This is a schema enabled
- * Formatter <br>
- * Sub classes need to implement the convert method <br>
- * <b>Port Interface</b><br>
- * <b>in</b>: expects &lt;Object&gt; this is a schema enabled port<br>
- * <b>out</b>: emits &lt;OUTPUT&gt; <br>
- * <b>err</b>: emits &lt;Object&gt; error port that emits input tuple that could
- * not be converted<br>
- * <br>
- * 
- * @displayName Formatter
- * @tags formatter converter
- * @param <OUTPUT>
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>,
-    ActivationListener<Context>
-{
-  protected transient Class<?> clazz;
-
-  @OutputPortFieldAnnotation
-  public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>();
-
-  @OutputPortFieldAnnotation(optional = true)
-  public transient DefaultOutputPort<Object> err = new DefaultOutputPort<Object>();
-
-  @InputPortFieldAnnotation(schemaRequired = true)
-  public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
-  {
-    public void setup(PortContext context)
-    {
-      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
-    }
-
-    @Override
-    public void process(Object inputTuple)
-    {
-      OUTPUT tuple = convert(inputTuple);
-      if (tuple == null && err.isConnected()) {
-        err.emit(inputTuple);
-        return;
-      }
-      if (out.isConnected()) {
-        out.emit(tuple);
-      }
-    }
-  };
-
-  /**
-   * Get the class that needs to be formatted
-   * 
-   * @return Class<?>
-   */
-  public Class<?> getClazz()
-  {
-    return clazz;
-  }
-
-  /**
-   * Set the class of tuple that needs to be formatted
-   * 
-   * @param clazz
-   */
-  public void setClazz(Class<?> clazz)
-  {
-    this.clazz = clazz;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
deleted file mode 100644
index 5f7bce6..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Operator that converts POJO to JSON string <br>
- * <b>Properties</b> <br>
- * <b>dateFormat</b>: date format e.g dd/MM/yyyy
- * 
- * @displayName JsonFormatter
- * @category Formatter
- * @tags pojo json formatter
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class JsonFormatter extends Formatter<String>
-{
-  private transient ObjectWriter writer;
-  protected String dateFormat;
-
-  @Override
-  public void activate(Context context)
-  {
-    try {
-      ObjectMapper mapper = new ObjectMapper();
-      if (dateFormat != null) {
-        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
-      }
-      writer = mapper.writerWithType(clazz);
-      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
-      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true);
-    } catch (Throwable e) {
-      throw new RuntimeException("Unable find provided class");
-    }
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
-  public String convert(Object tuple)
-  {
-    try {
-      return writer.writeValueAsString(tuple);
-    } catch (JsonGenerationException | JsonMappingException e) {
-      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-    return null;
-  }
-
-  /**
-   * Get the date format
-   * 
-   * @return Date format string
-   */
-  public String getDateFormat()
-  {
-    return dateFormat;
-  }
-
-  /**
-   * Set the date format
-   * 
-   * @param dateFormat
-   */
-  public void setDateFormat(String dateFormat)
-  {
-    this.dateFormat = dateFormat;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
deleted file mode 100644
index 40fef69..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import java.io.Writer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.XStreamException;
-import com.thoughtworks.xstream.converters.basic.DateConverter;
-import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
-import com.thoughtworks.xstream.io.xml.CompactWriter;
-import com.thoughtworks.xstream.io.xml.XppDriver;
-
-/**
- * @displayName XmlFormatter
- * @category Formatter
- * @tags xml pojo formatter
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class XmlFormatter extends Formatter<String>
-{
-
-  private transient XStream xstream;
-
-  protected String alias;
-  protected String dateFormat;
-  protected boolean prettyPrint;
-
-  public XmlFormatter()
-  {
-    alias = null;
-    dateFormat = null;
-  }
-
-  @Override
-  public void activate(Context context)
-  {
-    if (prettyPrint) {
-      xstream = new XStream();
-    } else {
-      xstream = new XStream(new XppDriver()
-      {
-        @Override
-        public HierarchicalStreamWriter createWriter(Writer out)
-        {
-          return new CompactWriter(out, getNameCoder());
-        }
-      });
-    }
-    if (alias != null) {
-      try {
-        xstream.alias(alias, clazz);
-      } catch (Throwable e) {
-        throw new RuntimeException("Unable find provided class");
-      }
-    }
-    if (dateFormat != null) {
-      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
-    }
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
-  public String convert(Object tuple)
-  {
-    try {
-      return xstream.toXML(tuple);
-    } catch (XStreamException e) {
-      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
-      return null;
-    }
-  }
-
-  /**
-   * Gets the alias This is an optional step. Without it XStream would work
-   * fine, but the XML element names would contain the fully qualified name of
-   * each class (including package) which would bulk up the XML a bit.
-   * 
-   * @return alias.
-   */
-  public String getAlias()
-  {
-    return alias;
-  }
-
-  /**
-   * Sets the alias This is an optional step. Without it XStream would work
-   * fine, but the XML element names would contain the fully qualified name of
-   * each class (including package) which would bulk up the XML a bit.
-   * 
-   * @param alias
-   *          .
-   */
-  public void setAlias(String alias)
-  {
-    this.alias = alias;
-  }
-
-  /**
-   * Gets the date format e.g. dd/MM/yyyy - this is how a date will be
-   * formatted
-   * 
-   * @return dateFormat.
-   */
-  public String getDateFormat()
-  {
-    return dateFormat;
-  }
-
-  /**
-   * Sets the date format e.g. dd/MM/yyyy - this is how a date will be
-   * formatted
-   * 
-   * @param dateFormat
-   *          .
-   */
-  public void setDateFormat(String dateFormat)
-  {
-    this.dateFormat = dateFormat;
-  }
-
-  /**
-   * Returns true if pretty print is enabled.
-   * 
-   * @return prettyPrint
-   */
-  public boolean isPrettyPrint()
-  {
-    return prettyPrint;
-  }
-
-  /**
-   * Sets pretty print option.
-   * 
-   * @param prettyPrint
-   */
-  public void setPrettyPrint(boolean prettyPrint)
-  {
-    this.prettyPrint = prettyPrint;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(XmlFormatter.class);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
deleted file mode 100644
index 991f6eb..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.supercsv.cellprocessor.Optional;
-import org.supercsv.cellprocessor.ParseBool;
-import org.supercsv.cellprocessor.ParseChar;
-import org.supercsv.cellprocessor.ParseDate;
-import org.supercsv.cellprocessor.ParseDouble;
-import org.supercsv.cellprocessor.ParseInt;
-import org.supercsv.cellprocessor.ParseLong;
-import org.supercsv.cellprocessor.ift.CellProcessor;
-import org.supercsv.io.CsvBeanReader;
-import org.supercsv.prefs.CsvPreference;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.util.ReusableStringReader;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Operator that converts CSV string to Pojo <br>
- * Assumption is that each field in the delimited data should map to a simple
- * java type.<br>
- * <br>
- * <b>Properties</b> <br>
- * <b>fieldInfo</b>: User needs to specify fields and their types as a comma
- * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
- * the same order as incoming data. FORMAT applies to dates, with dd/MM/yyyy as
- * default e.g. name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy <br>
- * <b>fieldDelimiter</b>: Default is comma <br>
- * <b>lineDelimiter</b>: Default is '\r\n'
- * 
- * @displayName CsvParser
- * @category Parsers
- * @tags csv pojo parser
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class CsvParser extends Parser<String>
-{
-
-  private ArrayList<Field> fields;
-  @NotNull
-  protected int fieldDelimiter;
-  protected String lineDelimiter;
-
-  @NotNull
-  protected String fieldInfo;
-
-  protected transient String[] nameMapping;
-  protected transient CellProcessor[] processors;
-  private transient CsvBeanReader csvReader;
-
-  public enum FIELD_TYPE
-  {
-    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
-  };
-
-  @NotNull
-  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
-
-  public CsvParser()
-  {
-    fields = new ArrayList<Field>();
-    fieldDelimiter = ',';
-    lineDelimiter = "\r\n";
-  }
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    super.setup(context);
-
-    logger.info("field info {}", fieldInfo);
-    fields = new ArrayList<Field>();
-    String[] fieldInfoTuple = fieldInfo.split(",");
-    for (int i = 0; i < fieldInfoTuple.length; i++) {
-      String[] fieldTuple = fieldInfoTuple[i].split(":");
-      Field field = new Field();
-      field.setName(fieldTuple[0]);
-      String[] typeFormat = fieldTuple[1].split("\\|");
-      field.setType(typeFormat[0].toUpperCase());
-      if (typeFormat.length > 1) {
-        field.setFormat(typeFormat[1]);
-      }
-      getFields().add(field);
-    }
-
-    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
-    csvReader = new CsvBeanReader(csvStringReader, preference);
-    int countKeyValue = getFields().size();
-    logger.info("countKeyValue {}", countKeyValue);
-    nameMapping = new String[countKeyValue];
-    processors = new CellProcessor[countKeyValue];
-    initialise(nameMapping, processors);
-  }
-
-  private void initialise(String[] nameMapping, CellProcessor[] processors)
-  {
-    for (int i = 0; i < getFields().size(); i++) {
-      FIELD_TYPE type = getFields().get(i).type;
-      nameMapping[i] = getFields().get(i).name;
-      if (type == FIELD_TYPE.DOUBLE) {
-        processors[i] = new Optional(new ParseDouble());
-      } else if (type == FIELD_TYPE.INTEGER) {
-        processors[i] = new Optional(new ParseInt());
-      } else if (type == FIELD_TYPE.FLOAT) {
-        processors[i] = new Optional(new ParseDouble());
-      } else if (type == FIELD_TYPE.LONG) {
-        processors[i] = new Optional(new ParseLong());
-      } else if (type == FIELD_TYPE.SHORT) {
-        processors[i] = new Optional(new ParseInt());
-      } else if (type == FIELD_TYPE.STRING) {
-        processors[i] = new Optional();
-      } else if (type == FIELD_TYPE.CHARACTER) {
-        processors[i] = new Optional(new ParseChar());
-      } else if (type == FIELD_TYPE.BOOLEAN) {
-        processors[i] = new Optional(new ParseBool());
-      } else if (type == FIELD_TYPE.DATE) {
-        String dateFormat = getFields().get(i).format;
-        processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
-      }
-    }
-  }
-
-  @Override
-  public void activate(Context context)
-  {
-
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
-  public Object convert(String tuple)
-  {
-    try {
-      csvStringReader.open(tuple);
-      return csvReader.read(clazz, nameMapping, processors);
-    } catch (IOException e) {
-      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
-      return null;
-    }
-  }
-
-  @Override
-  public void teardown()
-  {
-    try {
-      if (csvReader != null) {
-        csvReader.close();
-      }
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-  }
-
-  public static class Field
-  {
-    String name;
-    String format;
-    FIELD_TYPE type;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public FIELD_TYPE getType()
-    {
-      return type;
-    }
-
-    public void setType(String type)
-    {
-      this.type = FIELD_TYPE.valueOf(type);
-    }
-
-    public String getFormat()
-    {
-      return format;
-    }
-
-    public void setFormat(String format)
-    {
-      this.format = format;
-    }
-
-  }
-
-  /**
-   * Gets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
-   * 
-   * @return An array list of Fields.
-   */
-  public ArrayList<Field> getFields()
-  {
-    return fields;
-  }
-
-  /**
-   * Sets the array list of the fields, a field being a POJO containing the name
-   * of the field and type of field.
-   * 
-   * @param fields
-   *          An array list of Fields.
-   */
-  public void setFields(ArrayList<Field> fields)
-  {
-    this.fields = fields;
-  }
-
-  /**
-   * Gets the delimiter which separates fields in incoming data.
-   * 
-   * @return fieldDelimiter
-   */
-  public int getFieldDelimiter()
-  {
-    return fieldDelimiter;
-  }
-
-  /**
-   * Sets the delimiter which separates fields in incoming data.
-   * 
-   * @param fieldDelimiter
-   */
-  public void setFieldDelimiter(int fieldDelimiter)
-  {
-    this.fieldDelimiter = fieldDelimiter;
-  }
-
-  /**
-   * Gets the delimiter which separates lines in incoming data.
-   * 
-   * @return lineDelimiter
-   */
-  public String getLineDelimiter()
-  {
-    return lineDelimiter;
-  }
-
-  /**
-   * Sets the delimiter which separates lines in incoming data.
-   * 
-   * @param lineDelimiter
-   */
-  public void setLineDelimiter(String lineDelimiter)
-  {
-    this.lineDelimiter = lineDelimiter;
-  }
-
-  /**
-   * Gets the name of the fields with type and format ( for date ) as comma
-   * separated string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
-   * 
-   * @return fieldInfo
-   */
-  public String getFieldInfo()
-  {
-    return fieldInfo;
-  }
-
-  /**
-   * Sets the name of the fields with type and format ( for date ) as comma
-   * separated string in same order as incoming data. e.g
-   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
-   * 
-   * @param fieldInfo
-   */
-  public void setFieldInfo(String fieldInfo)
-  {
-    this.fieldInfo = fieldInfo;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
deleted file mode 100644
index 513be15..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Operator that converts JSON string to Pojo <br>
- * <b>Properties</b> <br>
- * <b>dateFormat</b>: date format e.g dd/MM/yyyy
- * 
- * @displayName JsonParser
- * @category Parsers
- * @tags json pojo parser
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class JsonParser extends Parser<String>
-{
-
-  private transient ObjectReader reader;
-  protected String dateFormat;
-
-  @Override
-  public void activate(Context context)
-  {
-    try {
-      ObjectMapper mapper = new ObjectMapper();
-      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-      if (dateFormat != null) {
-        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
-      }
-      reader = mapper.reader(clazz);
-    } catch (Throwable e) {
-      throw new RuntimeException("Unable find provided class");
-    }
-  }
-
-  @Override
-  public void deactivate()
-  {
-  }
-
-  @Override
-  public Object convert(String tuple)
-  {
-    try {
-      if (!StringUtils.isEmpty(tuple)) {
-        return reader.readValue(tuple);
-      }
-    } catch (JsonProcessingException e) {
-      logger.debug("Error while converting tuple {} {}", tuple, e.getMessage());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-    return null;
-  }
-
-  /**
-   * Get the date format
-   * 
-   * @return Date format string
-   */
-  public String getDateFormat()
-  {
-    return dateFormat;
-  }
-
-  /**
-   * Set the date format
-   * 
-   * @param dateFormat
-   */
-  public void setDateFormat(String dateFormat)
-  {
-    this.dateFormat = dateFormat;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
deleted file mode 100644
index 3c1df8f..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.PortContext;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.contrib.converter.Converter;
-
-/**
- * Abstract class that implements Converter interface. This is a schema enabled
- * Parser <br>
- * Sub classes need to implement the convert method <br>
- * <br>
- * <b>Port Interface</b><br>
- * <b>in</b>: expects &lt;INPUT&gt;<br>
- * <b>out</b>: emits &lt;Object&gt; this is a schema enabled port<br>
- * <b>err</b>: emits &lt;INPUT&gt; error port that emits input tuple that could
- * not be converted<br>
- * <br>
- * 
- * @displayName Parser
- * @tags parser converter
- * @param <INPUT>
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
-    ActivationListener<Context>
-{
-  protected transient Class<?> clazz;
-
-  @OutputPortFieldAnnotation(schemaRequired = true)
-  public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>()
-  {
-    public void setup(PortContext context)
-    {
-      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
-    }
-  };
-
-  @OutputPortFieldAnnotation(optional = true)
-  public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
-
-  public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
-  {
-    @Override
-    public void process(INPUT inputTuple)
-    {
-      Object tuple = convert(inputTuple);
-      if (tuple == null && err.isConnected()) {
-        err.emit(inputTuple);
-        return;
-      }
-      if (out.isConnected()) {
-        out.emit(tuple);
-      }
-    }
-  };
-
-  /**
-   * Get the class of the tuples this parser produces
-   * 
-   * @return Class<?>
-   */
-  public Class<?> getClazz()
-  {
-    return clazz;
-  }
-
-  /**
-   * Set the class of the tuples this parser produces
-   * 
-   * @param clazz
-   */
-  public void setClazz(Class<?> clazz)
-  {
-    this.clazz = clazz;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
deleted file mode 100644
index 9e1c8be..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.XStreamException;
-import com.thoughtworks.xstream.converters.basic.DateConverter;
-
-import com.datatorrent.api.Context;
-
-/**
- * Operator that converts XML string to Pojo <br>
- * <b>Properties</b> <br>
- * <b>alias</b>:This maps to the root element of the XML string. If not
- * specified, parser would expect the root element to be fully qualified name of
- * the Pojo Class. <br>
- * <b>dateFormats</b>: Comma separated string of date formats e.g.
- * dd/MM/yyyy,dd-MMM-yyyy where the first one is used as the default
- * 
- * @displayName XmlParser
- * @category Parsers
- * @tags xml pojo parser
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class XmlParser extends Parser<String>
-{
-
-  private transient XStream xstream;
-  protected String alias;
-  protected String dateFormats;
-
-  public XmlParser()
-  {
-    alias = null;
-    dateFormats = null;
-  }
-
-  @Override
-  public void activate(Context context)
-  {
-    xstream = new XStream();
-    if (alias != null) {
-      try {
-        xstream.alias(alias, clazz);
-      } catch (Throwable e) {
-        throw new RuntimeException("Unable find provided class");
-      }
-    }
-    if (dateFormats != null) {
-      String[] dateFormat = dateFormats.split(",");
-      xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat));
-    }
-  }
-
-  @Override
-  public void deactivate()
-  {
-
-  }
-
-  @Override
-  public Object convert(String tuple)
-  {
-    try {
-      return xstream.fromXML(tuple);
-    } catch (XStreamException e) {
-      logger.debug("Error while converting tuple {} {}", tuple,e.getMessage());
-      return null;
-    }
-  }
-
-  /**
-   * Gets the alias
-   * 
-   * @return alias.
-   */
-  public String getAlias()
-  {
-    return alias;
-  }
-
-  /**
-   * Sets the alias This maps to the root element of the XML string. If not
-   * specified, parser would expect the root element to be fully qualified name
-   * of the Pojo Class.
-   * 
-   * @param alias
-   *          .
-   */
-  public void setAlias(String alias)
-  {
-    this.alias = alias;
-  }
-
-  /**
-   * Gets the comma separated string of date formats e.g. dd/MM/yyyy,dd-MMM-yyyy
-   * where the first one is used as the default
-   * 
-   * @return dateFormats.
-   */
-  public String getDateFormats()
-  {
-    return dateFormats;
-  }
-
-  /**
-   * Sets the comma separated string of date formats e.g. dd/MM/yyyy,dd-MMM-yyyy
-   * where the first one is used as the default
-   * 
-   * @param dateFormats
-   *          .
-   */
-  public void setDateFormats(String dateFormats)
-  {
-    this.dateFormats = dateFormats;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(XmlParser.class);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
new file mode 100644
index 0000000..13d9739
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/formatter/CsvFormatterTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.formatter;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class CsvFormatterTest
+{
+
+  CsvFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      operator = new CsvFormatter();
+      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
+      operator.setLineDelimiter("\r\n");
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      operator.teardown();
+    }
+
+  }
+
+  @Test
+  public void testPojoReaderToCsv()
+  {
+    operator.setup(null);
+    EmployeeBean emp = new EmployeeBean();
+    emp.setName("john");
+    emp.setDept("cs");
+    emp.setEid(1);
+    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
+    operator.in.process(emp);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String csvOp = (String)validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(csvOp);
+    Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp);
+  }
+
+  @Test
+  public void testPojoReaderToCsvMultipleDate()
+  {
+    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy");
+    operator.setup(null);
+    EmployeeBean emp = new EmployeeBean();
+    emp.setName("john");
+    emp.setDept("cs");
+    emp.setEid(1);
+    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
+    emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate());
+    operator.in.process(emp);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String csvOp = (String)validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(csvOp);
+    Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csvOp);
+  }
+
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Date dateOfBirth;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Date getDateOfBirth()
+    {
+      return dateOfBirth;
+    }
+
+    public void setDateOfBirth(Date dateOfBirth)
+    {
+      this.dateOfBirth = dateOfBirth;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
new file mode 100644
index 0000000..c9a4179
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class CsvPOJOParserTest
+{
+
+  CsvParser operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      operator = new CsvParser();
+      operator.setClazz(EmployeeBean.class);
+      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      operator.teardown();
+    }
+
+  }
+
+  @Test
+  public void testCsvToPojoWriterDefault()
+  {
+    operator.setup(null);
+    String tuple = "john,cs,1,01/01/2015";
+    operator.in.process(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(EmployeeBean.class, obj.getClass());
+    EmployeeBean pojo = (EmployeeBean)obj;
+    Assert.assertEquals("john", pojo.getName());
+    Assert.assertEquals("cs", pojo.getDept());
+    Assert.assertEquals(1, pojo.getEid());
+    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+        pojo.getDateOfJoining()));
+  }
+
+  @Test
+  public void testCsvToPojoWriterDateFormat()
+  {
+    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
+    operator.setup(null);
+    String tuple = "john,cs,1,01-JAN-2015";
+    operator.in.process(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(EmployeeBean.class, obj.getClass());
+    EmployeeBean pojo = (EmployeeBean)obj;
+    Assert.assertEquals("john", pojo.getName());
+    Assert.assertEquals("cs", pojo.getDept());
+    Assert.assertEquals(1, pojo.getEid());
+    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+        pojo.getDateOfJoining()));
+  }
+
+  @Test
+  public void testCsvToPojoWriterDateFormatMultiple()
+  {
+    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
+    operator.setup(null);
+    String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
+    operator.in.process(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(EmployeeBean.class, obj.getClass());
+    EmployeeBean pojo = (EmployeeBean)obj;
+    Assert.assertEquals("john", pojo.getName());
+    Assert.assertEquals("cs", pojo.getDept());
+    Assert.assertEquals(1, pojo.getEid());
+    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+        pojo.getDateOfJoining()));
+    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+        pojo.getDateOfBirth()));
+  }
+
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Date dateOfBirth;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Date getDateOfBirth()
+    {
+      return dateOfBirth;
+    }
+
+    public void setDateOfBirth(Date dateOfBirth)
+    {
+      this.dateOfBirth = dateOfBirth;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
deleted file mode 100644
index 8183381..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import java.util.Date;
-
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.contrib.schema.formatter.CsvFormatter;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
-public class CsvFormatterTest
-{
-
-  CsvFormatter operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  @Rule
-  public Watcher watcher = new Watcher();
-
-  public class Watcher extends TestWatcher
-  {
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      operator = new CsvFormatter();
-      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
-      operator.setLineDelimiter("\r\n");
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      super.finished(description);
-      operator.teardown();
-    }
-
-  }
-
-  @Test
-  public void testPojoReaderToCsv()
-  {
-    operator.setup(null);
-    EmployeeBean emp = new EmployeeBean();
-    emp.setName("john");
-    emp.setDept("cs");
-    emp.setEid(1);
-    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
-    operator.in.process(emp);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String csvOp = (String)validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(csvOp);
-    Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csvOp);
-  }
-
-  @Test
-  public void testPojoReaderToCsvMultipleDate()
-  {
-    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy");
-    operator.setup(null);
-    EmployeeBean emp = new EmployeeBean();
-    emp.setName("john");
-    emp.setDept("cs");
-    emp.setEid(1);
-    emp.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
-    emp.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate());
-    operator.in.process(emp);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String csvOp = (String)validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(csvOp);
-    Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csvOp);
-  }
-
-  public static class EmployeeBean
-  {
-
-    private String name;
-    private String dept;
-    private int eid;
-    private Date dateOfJoining;
-    private Date dateOfBirth;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public String getDept()
-    {
-      return dept;
-    }
-
-    public void setDept(String dept)
-    {
-      this.dept = dept;
-    }
-
-    public int getEid()
-    {
-      return eid;
-    }
-
-    public void setEid(int eid)
-    {
-      this.eid = eid;
-    }
-
-    public Date getDateOfJoining()
-    {
-      return dateOfJoining;
-    }
-
-    public void setDateOfJoining(Date dateOfJoining)
-    {
-      this.dateOfJoining = dateOfJoining;
-    }
-
-    public Date getDateOfBirth()
-    {
-      return dateOfBirth;
-    }
-
-    public void setDateOfBirth(Date dateOfBirth)
-    {
-      this.dateOfBirth = dateOfBirth;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
deleted file mode 100644
index d377b07..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.datatorrent.lib.util.TestUtils.TestInfo;
-import com.google.common.collect.Lists;
-
-public class JsonFormatterTest
-{
-  JsonFormatter operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
-
-  public JsonFormatterTest()
-  {
-    // So that the output is cleaner.
-    System.setErr(new PrintStream(myOut));
-  }
-
-  @Rule
-  public TestInfo testMeta = new FSTestWatcher()
-  {
-    private void deleteDirectory()
-    {
-      try {
-        FileUtils.deleteDirectory(new File(getDir()));
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-
-    @Override
-    protected void starting(Description descriptor)
-    {
-      super.starting(descriptor);
-      deleteDirectory();
-
-      operator = new JsonFormatter();
-
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-      operator.setup(null);
-      operator.activate(null);
-
-      operator.beginWindow(0);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      operator.endWindow();
-      operator.teardown();
-
-      deleteDirectory();
-      super.finished(description);
-    }
-  };
-
-  @Test
-  public void testJSONToPOJO()
-  {
-    Test1Pojo pojo = new Test1Pojo();
-    pojo.a = 123;
-    pojo.b = 234876274;
-    pojo.c = "HowAreYou?";
-    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
-
-    operator.in.put(pojo);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}";
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJODate()
-  {
-    Test1Pojo pojo = new Test1Pojo();
-    pojo.a = 123;
-    pojo.b = 234876274;
-    pojo.c = "HowAreYou?";
-    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
-    pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
-    operator.setDateFormat("dd-MM-yyyy");
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.put(pojo);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJONullFields()
-  {
-    Test1Pojo pojo = new Test1Pojo();
-    pojo.a = 123;
-    pojo.b = 234876274;
-    pojo.c = "HowAreYou?";
-    pojo.d = null;
-
-    operator.in.put(pojo);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}";
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOEmptyPOJO()
-  {
-    Test1Pojo pojo = new Test1Pojo();
-    operator.in.put(pojo);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
-    System.out.println(validDataSink.collectedTuples.get(0));
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJONullPOJO()
-  {
-    operator.in.put(null);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "null";
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJONoFieldPOJO()
-  {
-    operator.endWindow();
-    operator.teardown();
-    operator.setClazz(Test2Pojo.class);
-    operator.setup(null);
-    operator.beginWindow(1);
-
-    Test2Pojo o = new Test2Pojo();
-    operator.in.put(o);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0));
-  }
-
-  public static class Test1Pojo
-  {
-    public int a;
-    public long b;
-    public String c;
-    public List<String> d;
-    public Date date;
-
-    @Override
-    public String toString()
-    {
-      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
-    }
-  }
-
-  public static class Test2Pojo
-  {
-  }
-
-}