You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:23 UTC

[06/50] [abbrv] incubator-apex-malhar git commit: Moving XML and JSON parsers & formatters to Malhar-lib and changing package names for parsers & formatters

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
deleted file mode 100644
index 969da1e..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.formatter;
-
-import java.util.Date;
-
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.contrib.schema.formatter.XmlFormatter;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
-public class XmlFormatterTest
-{
-
-  XmlFormatter operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  @Rule
-  public Watcher watcher = new Watcher();
-
-  public class Watcher extends TestWatcher
-  {
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      operator = new XmlFormatter();
-      operator.setClazz(EmployeeBean.class);
-      operator.setDateFormat("yyyy-MM-dd");
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      super.finished(description);
-      operator.teardown();
-    }
-
-  }
-
-  @Test
-  public void testPojoToXmlWithoutAlias()
-  {
-    EmployeeBean e = new EmployeeBean();
-    e.setName("john");
-    e.setEid(1);
-    e.setDept("cs");
-    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
-
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(e);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
-        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
-        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
-    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testXmlToPojoWithAlias()
-  {
-    EmployeeBean e = new EmployeeBean();
-    e.setName("john");
-    e.setEid(1);
-    e.setDept("cs");
-    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
-
-    operator.setAlias("EmployeeBean");
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(e);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
-        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
-    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testXmlToPojoWithPrettyPrint()
-  {
-    EmployeeBean e = new EmployeeBean();
-    e.setName("john");
-    e.setEid(1);
-    e.setDept("cs");
-    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
-
-    operator.setAlias("EmployeeBean");
-    operator.setPrettyPrint(true);
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(e);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<EmployeeBean>\n" + "  <name>john</name>\n" + "  <dept>cs</dept>\n" + "  <eid>1</eid>\n"
-        + "  <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>";
-    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testPojoToXmlWithoutAliasHeirarchical()
-  {
-    EmployeeBean e = new EmployeeBean();
-    e.setName("john");
-    e.setEid(1);
-    e.setDept("cs");
-    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
-    Address address = new Address();
-    address.setCity("new york");
-    address.setCountry("US");
-    e.setAddress(address);
-
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(e);
-    System.out.println(validDataSink.collectedTuples.get(0));
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
-        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
-        + "<city>new york</city>" + "<country>US</country>" + "</address>"
-        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
-    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
-  }
-
-  public static class EmployeeBean
-  {
-
-    private String name;
-    private String dept;
-    private int eid;
-    private Date dateOfJoining;
-    private Address address;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public String getDept()
-    {
-      return dept;
-    }
-
-    public void setDept(String dept)
-    {
-      this.dept = dept;
-    }
-
-    public int getEid()
-    {
-      return eid;
-    }
-
-    public void setEid(int eid)
-    {
-      this.eid = eid;
-    }
-
-    public Date getDateOfJoining()
-    {
-      return dateOfJoining;
-    }
-
-    public void setDateOfJoining(Date dateOfJoining)
-    {
-      this.dateOfJoining = dateOfJoining;
-    }
-
-    public Address getAddress()
-    {
-      return address;
-    }
-
-    public void setAddress(Address address)
-    {
-      this.address = address;
-    }
-  }
-
-  public static class Address
-  {
-
-    private String city;
-    private String country;
-
-    public String getCity()
-    {
-      return city;
-    }
-
-    public void setCity(String city)
-    {
-      this.city = city;
-    }
-
-    public String getCountry()
-    {
-      return country;
-    }
-
-    public void setCountry(String country)
-    {
-      this.country = country;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
deleted file mode 100644
index 9e87496..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import java.util.Date;
-
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.contrib.schema.parser.CsvParser;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
-public class CsvParserTest
-{
-
-  CsvParser operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  @Rule
-  public Watcher watcher = new Watcher();
-
-  public class Watcher extends TestWatcher
-  {
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      operator = new CsvParser();
-      operator.setClazz(EmployeeBean.class);
-      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      super.finished(description);
-      operator.teardown();
-    }
-
-  }
-
-  @Test
-  public void testCsvToPojoWriterDefault()
-  {
-    operator.setup(null);
-    String tuple = "john,cs,1,01/01/2015";
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfJoining()));
-  }
-
-  @Test
-  public void testCsvToPojoWriterDateFormat()
-  {
-    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
-    operator.setup(null);
-    String tuple = "john,cs,1,01-JAN-2015";
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfJoining()));
-  }
-
-  @Test
-  public void testCsvToPojoWriterDateFormatMultiple()
-  {
-    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
-    operator.setup(null);
-    String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfJoining()));
-    Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
-        pojo.getDateOfBirth()));
-  }
-
-  public static class EmployeeBean
-  {
-
-    private String name;
-    private String dept;
-    private int eid;
-    private Date dateOfJoining;
-    private Date dateOfBirth;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public String getDept()
-    {
-      return dept;
-    }
-
-    public void setDept(String dept)
-    {
-      this.dept = dept;
-    }
-
-    public int getEid()
-    {
-      return eid;
-    }
-
-    public void setEid(int eid)
-    {
-      this.eid = eid;
-    }
-
-    public Date getDateOfJoining()
-    {
-      return dateOfJoining;
-    }
-
-    public void setDateOfJoining(Date dateOfJoining)
-    {
-      this.dateOfJoining = dateOfJoining;
-    }
-
-    public Date getDateOfBirth()
-    {
-      return dateOfBirth;
-    }
-
-    public void setDateOfBirth(Date dateOfBirth)
-    {
-      this.dateOfBirth = dateOfBirth;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
deleted file mode 100644
index 5a50ddb..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.datatorrent.lib.util.TestUtils.TestInfo;
-
-public class JsonParserTest
-{
-  JsonParser operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
-
-  public JsonParserTest()
-  {
-    // So that the output is cleaner.
-    System.setErr(new PrintStream(myOut));
-  }
-
-  @Rule
-  public TestInfo testMeta = new FSTestWatcher()
-  {
-    private void deleteDirectory()
-    {
-      try {
-        FileUtils.deleteDirectory(new File(getDir()));
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-
-    @Override
-    protected void starting(Description descriptor)
-    {
-
-      super.starting(descriptor);
-      deleteDirectory();
-
-      operator = new JsonParser();
-      operator.setClazz(Test1Pojo.class);
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-      operator.setup(null);
-      operator.activate(null);
-
-      operator.beginWindow(0);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      operator.endWindow();
-      operator.teardown();
-
-      deleteDirectory();
-      super.finished(description);
-    }
-  };
-
-  @Test
-  public void testJSONToPOJO()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-    Assert.assertEquals(3, pojo.d.size());
-    Assert.assertEquals("ABC", pojo.d.get(0));
-    Assert.assertEquals("PQR", pojo.d.get(1));
-    Assert.assertEquals("XYZ", pojo.d.get(2));
-  }
-
-  @Test
-  public void testJSONToPOJODate()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
-    operator.setDateFormat("dd-MM-yyyy");
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-    Assert.assertEquals(3, pojo.d.size());
-    Assert.assertEquals("ABC", pojo.d.get(0));
-    Assert.assertEquals("PQR", pojo.d.get(1));
-    Assert.assertEquals("XYZ", pojo.d.get(2));
-    Assert.assertEquals(2015, new DateTime(pojo.date).getYear());
-    Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear());
-    Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth());
-  }
-
-  @Test
-  public void testJSONToPOJOInvalidData()
-  {
-    String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOUnknownFields()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-    Assert.assertEquals(null, pojo.d);
-  }
-
-  @Test
-  public void testJSONToPOJOMismatchingFields()
-  {
-    String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOEmptyString()
-  {
-    String tuple = "";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOEmptyJSON()
-  {
-    String tuple = "{}";
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(0, pojo.a);
-    Assert.assertEquals(0, pojo.b);
-    Assert.assertEquals(null, pojo.c);
-    Assert.assertEquals(null, pojo.d);
-  }
-
-  @Test
-  public void testJSONToPOJOArrayInJson()
-  {
-    String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  public static class Test1Pojo
-  {
-    public int a;
-    public long b;
-    public String c;
-    public List<String> d;
-    public Date date;
-
-    @Override
-    public String toString()
-    {
-      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
deleted file mode 100644
index c5f0407..0000000
--- a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.contrib.schema.parser;
-
-import java.util.Date;
-
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-
-public class XmlParserTest
-{
-  XmlParser operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  @Rule
-  public Watcher watcher = new Watcher();
-
-  public class Watcher extends TestWatcher
-  {
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      operator = new XmlParser();
-      operator.setClazz(EmployeeBean.class);
-      operator.setDateFormats("yyyy-MM-dd"); //setting default date pattern
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      super.finished(description);
-      operator.teardown();
-    }
-
-  }
-
-  @Test
-  public void testXmlToPojoWithoutAlias()
-  {
-    String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
-        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
-        + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
-
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
-  }
-
-  @Test
-  public void testXmlToPojoWithAliasDateFormat()
-  {
-    String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
-        + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + "</EmployeeBean>";
-
-    operator.setAlias("EmployeeBean");
-    operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd");
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
-  }
-
-  @Test
-  public void testXmlToPojoWithAlias()
-  {
-    String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
-        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
-
-    operator.setAlias("EmployeeBean");
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
-  }
-
-  @Test
-  public void testXmlToPojoIncorrectXML()
-  {
-    String tuple = "<EmployeeBean>"
-        + "<firstname>john</firstname>" //incorrect field name
-        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>"
-        + "</EmployeeBean>";
-
-    operator.setAlias("EmployeeBean");
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testXmlToPojoWithoutAliasHeirarchical()
-  {
-    String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
-        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
-        + "<city>new york</city>" + "<country>US</country>" + "</address>"
-        + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
-
-    operator.setup(null);
-    operator.activate(null);
-    operator.in.process(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(EmployeeBean.class, obj.getClass());
-    EmployeeBean pojo = (EmployeeBean)obj;
-    Assert.assertEquals("john", pojo.getName());
-    Assert.assertEquals("cs", pojo.getDept());
-    Assert.assertEquals(1, pojo.getEid());
-    Assert.assertEquals(Address.class, pojo.getAddress().getClass());
-    Assert.assertEquals("new york", pojo.getAddress().getCity());
-    Assert.assertEquals("US", pojo.getAddress().getCountry());
-    Assert.assertEquals(2015, new DateTime(pojo.getDateOfJoining()).getYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getMonthOfYear());
-    Assert.assertEquals(1, new DateTime(pojo.getDateOfJoining()).getDayOfMonth());
-  }
-
-  public static class EmployeeBean
-  {
-
-    private String name;
-    private String dept;
-    private int eid;
-    private Date dateOfJoining;
-    private Address address;
-
-    public String getName()
-    {
-      return name;
-    }
-
-    public void setName(String name)
-    {
-      this.name = name;
-    }
-
-    public String getDept()
-    {
-      return dept;
-    }
-
-    public void setDept(String dept)
-    {
-      this.dept = dept;
-    }
-
-    public int getEid()
-    {
-      return eid;
-    }
-
-    public void setEid(int eid)
-    {
-      this.eid = eid;
-    }
-
-    public Date getDateOfJoining()
-    {
-      return dateOfJoining;
-    }
-
-    public void setDateOfJoining(Date dateOfJoining)
-    {
-      this.dateOfJoining = dateOfJoining;
-    }
-
-    public Address getAddress()
-    {
-      return address;
-    }
-
-    public void setAddress(Address address)
-    {
-      this.address = address;
-    }
-  }
-
-  public static class Address
-  {
-
-    private String city;
-    private String country;
-
-    public String getCity()
-    {
-      return city;
-    }
-
-    public void setCity(String city)
-    {
-      this.city = city;
-    }
-
-    public String getCountry()
-    {
-      return country;
-    }
-
-    public void setCountry(String country)
-    {
-      this.country = country;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 8df7a7e..cbc0fb7 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -300,6 +300,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <!-- required by Xml parser and formatter -->
+      <groupId>com.thoughtworks.xstream</groupId>
+      <artifactId>xstream</artifactId>
+      <version>1.4.8</version>
+    </dependency>
+    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-websocket</artifactId>
       <version>${jetty.version}</version>
@@ -310,6 +316,12 @@
       <artifactId>commons-beanutils</artifactId>
       <version>1.8.3</version>
     </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>2.9.1</version>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/converter/Converter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/converter/Converter.java b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
new file mode 100644
index 0000000..f1d4325
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/converter/Converter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.converter;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Operators that are converting tuples from one format to another must
+ * implement this interface. Eg. Parsers or formatters , that parse data of
+ * certain format and convert them to another format.
+ * 
+ * @param <INPUT>
+ * @param <OUTPUT>
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public interface Converter<INPUT, OUTPUT>
+{
+  /**
+   * Provide the implementation for converting tuples from one format to the
+   * other
+   * 
+   * @param INPUT
+   *          tuple of certain format
+   * @return OUTPUT tuple of converted format
+   */
+  public OUTPUT convert(INPUT tuple);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
new file mode 100644
index 0000000..0de9070
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.converter.Converter;
+
+/**
+ * Abstract class that implements Converter interface. This is a schema enabled
+ * Formatter <br>
+ * Sub classes need to implement the convert method <br>
+ * <b>Port Interface</b><br>
+ * <b>in</b>: expects &lt;Object&gt; this is a schema enabled port<br>
+ * <b>out</b>: emits &lt;OUTPUT&gt; <br>
+ * <b>err</b>: emits &lt;Object&gt; error port that emits input tuple that could
+ * not be converted<br>
+ * <br>
+ * 
+ * @displayName Parser
+ * @tags parser converter
+ * @param <INPUT>
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>,
+    ActivationListener<Context>
+{
+  protected transient Class<?> clazz;
+
+  @OutputPortFieldAnnotation
+  public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<Object> err = new DefaultOutputPort<Object>();
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
+  {
+    public void setup(PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    @Override
+    public void process(Object inputTuple)
+    {
+      OUTPUT tuple = convert(inputTuple);
+      if (tuple == null && err.isConnected()) {
+        err.emit(inputTuple);
+        return;
+      }
+      if (out.isConnected()) {
+        out.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Get the class that needs to be formatted
+   * 
+   * @return Class<?>
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class of tuple that needs to be formatted
+   * 
+   * @param clazz
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
new file mode 100644
index 0000000..627bf95
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts POJO to JSON string <br>
+ * <b>Properties</b> <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy
+ * 
+ * @displayName JsonFormatter
+ * @category Formatter
+ * @tags pojo json formatter
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class JsonFormatter extends Formatter<String>
+{
+  private transient ObjectWriter writer;
+  protected String dateFormat;
+
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      if (dateFormat != null) {
+        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
+      }
+      writer = mapper.writerWithType(clazz);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true);
+    } catch (Throwable e) {
+      throw new RuntimeException("Unable find provided class");
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      return writer.writeValueAsString(tuple);
+    } catch (JsonGenerationException | JsonMappingException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the date format
+   * 
+   * @return Date format string
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Set the date format
+   * 
+   * @param dateFormat
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
new file mode 100644
index 0000000..35ee7b7
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.io.Writer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.converters.basic.DateConverter;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.xml.CompactWriter;
+import com.thoughtworks.xstream.io.xml.XppDriver;
+
+import com.datatorrent.api.Context;
+
+/**
+ * @displayName XmlParser
+ * @category Formatter
+ * @tags xml pojo formatter
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class XmlFormatter extends Formatter<String>
+{
+
+  private transient XStream xstream;
+
+  protected String alias;
+  protected String dateFormat;
+  protected boolean prettyPrint;
+
+  public XmlFormatter()
+  {
+    alias = null;
+    dateFormat = null;
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    if (prettyPrint) {
+      xstream = new XStream();
+    } else {
+      xstream = new XStream(new XppDriver()
+      {
+        @Override
+        public HierarchicalStreamWriter createWriter(Writer out)
+        {
+          return new CompactWriter(out, getNameCoder());
+        }
+      });
+    }
+    if (alias != null) {
+      try {
+        xstream.alias(alias, clazz);
+      } catch (Throwable e) {
+        throw new RuntimeException("Unable find provided class");
+      }
+    }
+    if (dateFormat != null) {
+      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      return xstream.toXML(tuple);
+    } catch (XStreamException e) {
+      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Gets the alias This is an optional step. Without it XStream would work
+   * fine, but the XML element names would contain the fully qualified name of
+   * each class (including package) which would bulk up the XML a bit.
+   * 
+   * @return alias.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * Sets the alias This is an optional step. Without it XStream would work
+   * fine, but the XML element names would contain the fully qualified name of
+   * each class (including package) which would bulk up the XML a bit.
+   * 
+   * @param alias
+   *          .
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+
+  /**
+   * Gets the date format e.g dd/mm/yyyy - this will be how a date would be
+   * formatted
+   * 
+   * @return dateFormat.
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Sets the date format e.g dd/mm/yyyy - this will be how a date would be
+   * formatted
+   * 
+   * @param dateFormat
+   *          .
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  /**
+   * Returns true if pretty print is enabled.
+   * 
+   * @return prettyPrint
+   */
+  public boolean isPrettyPrint()
+  {
+    return prettyPrint;
+  }
+
+  /**
+   * Sets pretty print option.
+   * 
+   * @param prettyPrint
+   */
+  public void setPrettyPrint(boolean prettyPrint)
+  {
+    this.prettyPrint = prettyPrint;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(XmlFormatter.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
new file mode 100644
index 0000000..3727d86
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts JSON string to Pojo <br>
+ * <b>Properties</b> <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy
+ * 
+ * @displayName JsonParser
+ * @category Parsers
+ * @tags json pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class JsonParser extends Parser<String>
+{
+
+  private transient ObjectReader reader;
+  protected String dateFormat;
+
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      if (dateFormat != null) {
+        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
+      }
+      reader = mapper.reader(clazz);
+    } catch (Throwable e) {
+      throw new RuntimeException("Unable find provided class");
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+  }
+
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      if (!StringUtils.isEmpty(tuple)) {
+        return reader.readValue(tuple);
+      }
+    } catch (JsonProcessingException e) {
+      logger.debug("Error while converting tuple {} {}", tuple, e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the date format
+   * 
+   * @return Date format string
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Set the date format
+   * 
+   * @param dateFormat
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/parser/Parser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
new file mode 100644
index 0000000..c9455e2
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.converter.Converter;
+
+/**
+ * Abstract class that implements Converter interface. This is a schema enabled
+ * Parser <br>
+ * Sub classes need to implement the convert method <br>
+ * <br>
+ * <b>Port Interface</b><br>
+ * <b>in</b>: expects &lt;INPUT&gt;<br>
+ * <b>out</b>: emits &lt;Object&gt; this is a schema enabled port<br>
+ * <b>err</b>: emits &lt;INPUT&gt; error port that emits input tuple that could
+ * not be converted<br>
+ * <br>
+ * 
+ * @displayName Parser
+ * @tags parser converter
+ * @param <INPUT>
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
+    ActivationListener<Context>
+{
+  protected transient Class<?> clazz;
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>()
+  {
+    public void setup(PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
+
+  public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
+  {
+    @Override
+    public void process(INPUT inputTuple)
+    {
+      Object tuple = convert(inputTuple);
+      if (tuple == null && err.isConnected()) {
+        err.emit(inputTuple);
+        return;
+      }
+      if (out.isConnected()) {
+        out.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Get the class that needs to be formatted
+   * 
+   * @return Class<?>
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class of tuple that needs to be formatted
+   * 
+   * @param clazz
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
new file mode 100644
index 0000000..888837d
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.converters.basic.DateConverter;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Operator that converts XML string to Pojo <br>
+ * <b>Properties</b> <br>
+ * <b>alias</b>:This maps to the root element of the XML string. If not
+ * specified, parser would expect the root element to be fully qualified name of
+ * the Pojo Class. <br>
+ * <b>dateFormats</b>: Comma separated string of date formats e.g
+ * dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default
+ * 
+ * @displayName XmlParser
+ * @category Parsers
+ * @tags xml pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class XmlParser extends Parser<String>
+{
+
+  private transient XStream xstream;
+  protected String alias;
+  protected String dateFormats;
+
+  public XmlParser()
+  {
+    alias = null;
+    dateFormats = null;
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+    xstream = new XStream();
+    if (alias != null) {
+      try {
+        xstream.alias(alias, clazz);
+      } catch (Throwable e) {
+        throw new RuntimeException("Unable find provided class");
+      }
+    }
+    if (dateFormats != null) {
+      String[] dateFormat = dateFormats.split(",");
+      xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat));
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      return xstream.fromXML(tuple);
+    } catch (XStreamException e) {
+      logger.debug("Error while converting tuple {} {}", tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Gets the alias
+   * 
+   * @return alias.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * Sets the alias This maps to the root element of the XML string. If not
+   * specified, parser would expect the root element to be fully qualified name
+   * of the Pojo Class.
+   * 
+   * @param alias
+   *          .
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+
+  /**
+   * Gets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy
+   * where first one would be considered default
+   * 
+   * @return dateFormats.
+   */
+  public String getDateFormats()
+  {
+    return dateFormats;
+  }
+
+  /**
+   * Sets the comma separated string of date formats e.g dd/mm/yyyy,dd-mmm-yyyy
+   * where first one would be considered default
+   * 
+   * @param dateFormats
+   *          .
+   */
+  public void setDateFormats(String dateFormats)
+  {
+    this.dateFormats = dateFormats;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(XmlParser.class);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
new file mode 100644
index 0000000..bde544e
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.TestUtils.TestInfo;
+import com.google.common.collect.Lists;
+
+public class JsonFormatterTest
+{
+  JsonFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+  public JsonFormatterTest()
+  {
+    // So that the output is cleaner.
+    System.setErr(new PrintStream(myOut));
+  }
+
+  @Rule
+  public TestInfo testMeta = new FSTestWatcher()
+  {
+    private void deleteDirectory()
+    {
+      try {
+        FileUtils.deleteDirectory(new File(getDir()));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    protected void starting(Description descriptor)
+    {
+      super.starting(descriptor);
+      deleteDirectory();
+
+      operator = new JsonFormatter();
+
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+      operator.setup(null);
+      operator.activate(null);
+
+      operator.beginWindow(0);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      operator.endWindow();
+      operator.teardown();
+
+      deleteDirectory();
+      super.finished(description);
+    }
+  };
+
+  @Test
+  public void testJSONToPOJO()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
+
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJODate()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
+    pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
+    operator.setDateFormat("dd-MM-yyyy");
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONullFields()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = null;
+
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJOEmptyPOJO()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
+    System.out.println(validDataSink.collectedTuples.get(0));
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONullPOJO()
+  {
+    operator.in.put(null);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "null";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONoFieldPOJO()
+  {
+    operator.endWindow();
+    operator.teardown();
+    operator.setClazz(Test2Pojo.class);
+    operator.setup(null);
+    operator.beginWindow(1);
+
+    Test2Pojo o = new Test2Pojo();
+    operator.in.put(o);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0));
+  }
+
+  public static class Test1Pojo
+  {
+    public int a;
+    public long b;
+    public String c;
+    public List<String> d;
+    public Date date;
+
+    @Override
+    public String toString()
+    {
+      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+    }
+  }
+
+  public static class Test2Pojo
+  {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
new file mode 100644
index 0000000..237ae5a
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.formatter;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class XmlFormatterTest
+{
+
+  XmlFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      operator = new XmlFormatter();
+      operator.setClazz(EmployeeBean.class);
+      operator.setDateFormat("yyyy-MM-dd");
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      operator.teardown();
+    }
+
+  }
+
+  @Test
+  public void testPojoToXmlWithoutAlias()
+  {
+    EmployeeBean e = new EmployeeBean();
+    e.setName("john");
+    e.setEid(1);
+    e.setDept("cs");
+    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
+
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(e);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
+        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
+    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testXmlToPojoWithAlias()
+  {
+    EmployeeBean e = new EmployeeBean();
+    e.setName("john");
+    e.setEid(1);
+    e.setDept("cs");
+    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
+
+    operator.setAlias("EmployeeBean");
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(e);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expected = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
+    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testXmlToPojoWithPrettyPrint()
+  {
+    EmployeeBean e = new EmployeeBean();
+    e.setName("john");
+    e.setEid(1);
+    e.setDept("cs");
+    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
+
+    operator.setAlias("EmployeeBean");
+    operator.setPrettyPrint(true);
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(e);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expected = "<EmployeeBean>\n" + "  <name>john</name>\n" + "  <dept>cs</dept>\n" + "  <eid>1</eid>\n"
+        + "  <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>";
+    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testPojoToXmlWithoutAliasHeirarchical()
+  {
+    EmployeeBean e = new EmployeeBean();
+    e.setName("john");
+    e.setEid(1);
+    e.setDept("cs");
+    e.setDateOfJoining(new DateTime().withYear(2015).withMonthOfYear(1).withDayOfYear(1).toDate());
+    Address address = new Address();
+    address.setCity("new york");
+    address.setCountry("US");
+    e.setAddress(address);
+
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(e);
+    System.out.println(validDataSink.collectedTuples.get(0));
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+        + "<city>new york</city>" + "<country>US</country>" + "</address>"
+        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
+    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
+  }
+
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Address address;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Address getAddress()
+    {
+      return address;
+    }
+
+    public void setAddress(Address address)
+    {
+      this.address = address;
+    }
+  }
+
+  public static class Address
+  {
+
+    private String city;
+    private String country;
+
+    public String getCity()
+    {
+      return city;
+    }
+
+    public void setCity(String city)
+    {
+      this.city = city;
+    }
+
+    public String getCountry()
+    {
+      return country;
+    }
+
+    public void setCountry(String country)
+    {
+      this.country = country;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/23dabc41/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
new file mode 100644
index 0000000..d091267
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.parser;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.TestUtils.TestInfo;
+
+public class JsonParserTest
+{
+  JsonParser operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+  public JsonParserTest()
+  {
+    // So that the output is cleaner.
+    System.setErr(new PrintStream(myOut));
+  }
+
+  @Rule
+  public TestInfo testMeta = new FSTestWatcher()
+  {
+    private void deleteDirectory()
+    {
+      try {
+        FileUtils.deleteDirectory(new File(getDir()));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    protected void starting(Description descriptor)
+    {
+
+      super.starting(descriptor);
+      deleteDirectory();
+
+      operator = new JsonParser();
+      operator.setClazz(Test1Pojo.class);
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+      operator.setup(null);
+      operator.activate(null);
+
+      operator.beginWindow(0);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      operator.endWindow();
+      operator.teardown();
+
+      deleteDirectory();
+      super.finished(description);
+    }
+  };
+
+  @Test
+  public void testJSONToPOJO()
+  {
+    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+    operator.in.put(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Test1Pojo.class, obj.getClass());
+    Test1Pojo pojo = (Test1Pojo)obj;
+    Assert.assertEquals(123, pojo.a);
+    Assert.assertEquals(234876274, pojo.b);
+    Assert.assertEquals("HowAreYou?", pojo.c);
+    Assert.assertEquals(3, pojo.d.size());
+    Assert.assertEquals("ABC", pojo.d.get(0));
+    Assert.assertEquals("PQR", pojo.d.get(1));
+    Assert.assertEquals("XYZ", pojo.d.get(2));
+  }
+
+  @Test
+  public void testJSONToPOJODate()
+  {
+    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+    operator.setDateFormat("dd-MM-yyyy");
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.put(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Test1Pojo.class, obj.getClass());
+    Test1Pojo pojo = (Test1Pojo)obj;
+    Assert.assertEquals(123, pojo.a);
+    Assert.assertEquals(234876274, pojo.b);
+    Assert.assertEquals("HowAreYou?", pojo.c);
+    Assert.assertEquals(3, pojo.d.size());
+    Assert.assertEquals("ABC", pojo.d.get(0));
+    Assert.assertEquals("PQR", pojo.d.get(1));
+    Assert.assertEquals("XYZ", pojo.d.get(2));
+    Assert.assertEquals(2015, new DateTime(pojo.date).getYear());
+    Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear());
+    Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth());
+  }
+
+  @Test
+  public void testJSONToPOJOInvalidData()
+  {
+    String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}";
+    operator.in.put(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJOUnknownFields()
+  {
+    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
+    operator.in.put(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Test1Pojo.class, obj.getClass());
+    Test1Pojo pojo = (Test1Pojo)obj;
+    Assert.assertEquals(123, pojo.a);
+    Assert.assertEquals(234876274, pojo.b);
+    Assert.assertEquals("HowAreYou?", pojo.c);
+    Assert.assertEquals(null, pojo.d);
+  }
+
+  @Test
+  public void testJSONToPOJOMismatchingFields()
+  {
+    String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+    operator.in.put(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJOEmptyString()
+  {
+    String tuple = "";
+    operator.in.put(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJOEmptyJSON()
+  {
+    String tuple = "{}";
+    operator.in.put(tuple);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Object obj = validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Test1Pojo.class, obj.getClass());
+    Test1Pojo pojo = (Test1Pojo)obj;
+    Assert.assertEquals(0, pojo.a);
+    Assert.assertEquals(0, pojo.b);
+    Assert.assertEquals(null, pojo.c);
+    Assert.assertEquals(null, pojo.d);
+  }
+
+  @Test
+  public void testJSONToPOJOArrayInJson()
+  {
+    String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+    operator.in.put(tuple);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+  }
+
+  public static class Test1Pojo
+  {
+    public int a;
+    public long b;
+    public String c;
+    public List<String> d;
+    public Date date;
+
+    @Override
+    public String toString()
+    {
+      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+    }
+  }
+}