You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/02/16 08:13:06 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-1962 json parser enhancements

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 1eb554f03 -> 1afb4a26f


APEXMALHAR-1962 json parser enhancements


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a3e00ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a3e00ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a3e00ce

Branch: refs/heads/master
Commit: 0a3e00ce0b804ddcb9a2f7bb45dc8698dfd05d84
Parents: cec33da
Author: shubham <sh...@github.com>
Authored: Mon Jan 4 12:43:01 2016 +0530
Committer: shubham <sh...@github.com>
Committed: Tue Feb 16 10:53:38 2016 +0530

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../datatorrent/contrib/parser/JsonParser.java  | 247 +++++++++++
 .../parser/JsonParserApplicationTest.java       |  93 ++++
 .../contrib/parser/JsonParserTest.java          | 443 +++++++++++++++++++
 .../src/test/resources/json-parser-schema.json  |  51 +++
 .../com/datatorrent/lib/parser/JsonParser.java  | 110 -----
 .../datatorrent/lib/parser/JsonParserTest.java  | 228 ----------
 7 files changed, 840 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 6145fac..b994928 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -622,6 +622,12 @@
       <artifactId>gemfire-core</artifactId>
       <version>1.0.0-incubating.M1</version>
       <optional>true</optional>
+    </dependency>  
+    <dependency>
+      <groupId>com.github.fge</groupId>
+      <artifactId>json-schema-validator</artifactId>
+      <version>2.0.1</version>
+      <optional>true</optional>
     </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
new file mode 100644
index 0000000..b6c3c4d
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.jsonschema.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import com.github.fge.jsonschema.report.ProcessingMessage;
+import com.github.fge.jsonschema.report.ProcessingReport;
+import com.github.fge.jsonschema.util.JsonLoader;
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that parses a json string tuple against a specified json schema and
+ * emits JSONObject on one port, POJO on other port and tuples that could not be
+ * parsed on error port.<br>
+ * Schema is specified in a json format as per http://json-schema.org/ <br>
+ * Example for the schema can be seen here http://json-schema.org/example1.html <br>
+ * User can choose to skip validations by not specifying the schema at all. <br>
+ * <br>
+ * <b>Properties</b><br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a String. Each tuple represents a json string<br>
+ * <b>parsedOutput</b>:tuples that are validated against the schema are emitted
+ * as JSONObject on this port<br>
+ * <b>out</b>:tuples that are validated against the schema are emitted as pojo
+ * on this port<br>
+ * <b>err</b>:tuples that do not confine to schema are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ * 
+ * 
+ * @displayName JsonParser
+ * @category Parsers
+ * @tags json pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class JsonParser extends Parser<byte[], KeyValPair<String, String>>
+{
+
+  /**
+   * Contents of the schema.Schema is specified as per http://json-schema.org/
+   */
+  private String jsonSchema;
+  private transient JsonSchema schema;
+  private transient ObjectMapper objMapper;
+  /**
+   * output port to emit validate records as JSONObject
+   */
+  public transient DefaultOutputPort<JSONObject> parsedOutput = new DefaultOutputPort<JSONObject>();
+  /**
+   * metric to keep count of number of tuples emitted on {@link #parsedOutput}
+   * port
+   */
+  @AutoMetric
+  long parsedOutputCount;
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    parsedOutputCount = 0;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    try {
+      if (jsonSchema != null) {
+        JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
+        JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
+        schema = factory.getJsonSchema(schemaNode);
+      }
+      objMapper = new ObjectMapper();
+      objMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+    } catch (ProcessingException | IOException e) {
+      DTThrowable.wrapIfChecked(e);
+    }
+  }
+
+  @Override
+  public void processTuple(byte[] tuple)
+  {
+    if (tuple == null) {
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(null, "null tuple"));
+      }
+      errorTupleCount++;
+      return;
+    }
+    String incomingString = new String(tuple);
+    try {
+      if (schema != null) {
+        ProcessingReport report = null;
+        JsonNode data = JsonLoader.fromString(incomingString);
+        report = schema.validate(data);
+        if (report != null && !report.isSuccess()) {
+          Iterator<ProcessingMessage> iter = report.iterator();
+          StringBuilder s = new StringBuilder();
+          while (iter.hasNext()) {
+            ProcessingMessage pm = iter.next();
+            s.append(pm.asJson().get("instance").findValue("pointer")).append(":").append(pm.asJson().get("message"))
+                .append(",");
+          }
+          s.setLength(s.length() - 1);
+          errorTupleCount++;
+          if (err.isConnected()) {
+            err.emit(new KeyValPair<String, String>(incomingString, s.toString()));
+          }
+          return;
+        }
+      }
+      if (parsedOutput.isConnected()) {
+        parsedOutput.emit(new JSONObject(incomingString));
+        parsedOutputCount++;
+      }
+      if (out.isConnected()) {
+        out.emit(objMapper.readValue(tuple, clazz));
+        emittedObjectCount++;
+      }
+    } catch (JSONException | ProcessingException | IOException e) {
+      errorTupleCount++;
+      if (err.isConnected()) {
+        err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
+      }
+      logger.error("Failed to parse json tuple {}, Exception = {} ", tuple, e);
+    }
+  }
+
+  @Override
+  public Object convert(byte[] tuple)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public KeyValPair<String, String> processErrorTuple(byte[] input)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  /**
+   * Get jsonSchema contents as a string to be used during validation
+   * 
+   * @return jsonSchema
+   */
+  public String getJsonSchema()
+  {
+    return jsonSchema;
+  }
+
+  /**
+   * Sets jsonSchema to be used during validation
+   * 
+   * @param jsonSchema
+   *          schema as a string
+   */
+  public void setJsonSchema(String jsonSchema)
+  {
+    this.jsonSchema = jsonSchema;
+  }
+
+  /**
+   * Get errorTupleCount
+   * 
+   * @return errorTupleCount
+   */
+  @VisibleForTesting
+  public long getErrorTupleCount()
+  {
+    return errorTupleCount;
+  }
+
+  /**
+   * Get emittedObjectCount
+   * 
+   * @return emittedObjectCount
+   */
+  @VisibleForTesting
+  public long getEmittedObjectCount()
+  {
+    return emittedObjectCount;
+  }
+
+  /**
+   * Get incomingTuplesCount
+   * 
+   * @return incomingTuplesCount
+   */
+  @VisibleForTesting
+  public long getIncomingTuplesCount()
+  {
+    return incomingTuplesCount;
+  }
+
+  /**
+   * Set schema.
+   * 
+   * @param schema
+   */
+  @VisibleForTesting
+  public void setSchema(JsonSchema schema)
+  {
+    this.schema = schema;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java
new file mode 100644
index 0000000..d1b1efa
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.parser.JsonParserTest.Product;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+public class JsonParserApplicationTest
+{
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new JsonParserTest(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(10000);// runs for 10 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  public static class JsonParserTest implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      JsonDataEmitterOperator input = dag.addOperator("data", new JsonDataEmitterOperator());
+      JsonParser parser = dag.addOperator("jsonparser", new JsonParser());
+      parser.setClazz(Product.class);
+      dag.getMeta(parser).getMeta(parser.out).getAttributes().put(Context.PortContext.TUPLE_CLASS, Product.class);
+      parser.setJsonSchema(SchemaUtils.jarResourceFileToString("json-parser-schema.json"));
+      ConsoleOutputOperator jsonObjectOp = dag.addOperator("jsonObjectOp", new ConsoleOutputOperator());
+      ConsoleOutputOperator pojoOp = dag.addOperator("pojoOp", new ConsoleOutputOperator());
+      jsonObjectOp.setDebug(true);
+      dag.addStream("input", input.output, parser.in);
+      dag.addStream("output", parser.parsedOutput, jsonObjectOp.input);
+      dag.addStream("pojo", parser.out, pojoOp.input);
+    }
+  }
+
+  public static class JsonDataEmitterOperator extends BaseOperator implements InputOperator
+  {
+    public static String jsonSample = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+
+    public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<byte[]>();
+
+    @Override
+    public void emitTuples()
+    {
+      output.emit(jsonSample.getBytes());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java
new file mode 100644
index 0000000..f492597
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class JsonParserTest
+{
+  private static final String filename = "json-parser-schema.json";
+  CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+  CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+  JsonParser parser = new JsonParser();
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      parser.err.setSink(error);
+      parser.parsedOutput.setSink(objectPort);
+      parser.out.setSink(pojoPort);
+      parser.setClazz(Product.class);
+      parser.setJsonSchema(SchemaUtils.jarResourceFileToString(filename));
+      parser.setup(null);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      error.clear();
+      objectPort.clear();
+      pojoPort.clear();
+      parser.teardown();
+    }
+  }
+
+  @Test
+  public void testValidInput() throws JSONException
+  {
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Product.class, obj.getClass());
+    Product pojo = (Product)obj;
+    JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+    Assert.assertEquals(2, jsonObject.getInt("id"));
+    Assert.assertEquals(1, jsonObject.getInt("price"));
+    Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+    Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+    Assert.assertEquals(2, pojo.getId());
+    Assert.assertEquals(1, pojo.getPrice());
+    Assert.assertEquals("An ice sculpture", pojo.getName());
+    Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+  }
+
+  @Test
+  public void testEmptyInput() throws JSONException
+  {
+    parser.setSchema(null);
+    String tuple = "";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+  }
+
+  @Test
+  public void testValidInputWithoutSchema() throws JSONException
+  {
+    parser.setSchema(null);
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Product.class, obj.getClass());
+    Product pojo = (Product)obj;
+    JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+    Assert.assertEquals(2, jsonObject.getInt("id"));
+    Assert.assertEquals(1, jsonObject.getInt("price"));
+    Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+    Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+    Assert.assertEquals(2, pojo.getId());
+    Assert.assertEquals(1, pojo.getPrice());
+    Assert.assertEquals("An ice sculpture", pojo.getName());
+    Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+  }
+
+  @Test
+  public void testUnknowFieldsInData() throws JSONException
+  {
+    parser.setSchema(null);
+    String tuple = "{" + "\"id\": 2," + "\"id2\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Product.class, obj.getClass());
+    Product pojo = (Product)obj;
+    JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+    Assert.assertEquals(2, jsonObject.getInt("id"));
+    Assert.assertEquals(1, jsonObject.getInt("price"));
+    Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+    Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+    Assert.assertEquals(2, pojo.getId());
+    Assert.assertEquals(1, pojo.getPrice());
+    Assert.assertEquals("An ice sculpture", pojo.getName());
+    Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+    Assert.assertNull(pojo.getWarehouseLocation());
+  }
+
+  @Test
+  public void testInvalidPrice() throws JSONException
+  {
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(tuple, errorKeyValPair.getKey());
+    Assert.assertEquals("\"/price\":\"number is lower than the required minimum\"", errorKeyValPair.getValue());
+  }
+
+  @Test
+  public void testMultipleViolations() throws JSONException
+  {
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"width\" : 8.0," + "\"height\": 9.5" + "},"
+        + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + "},"
+        + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(tuple, errorKeyValPair.getKey());
+    Assert.assertEquals(
+        "\"/dimensions\":\"missing required property(ies)\",\"/price\":\"number is lower than the required minimum\"",
+        errorKeyValPair.getValue());
+  }
+
+  @Test
+  public void testJsonSyntaxError() throws JSONException
+  {
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"width\" : 8.0," + "\"height\": 9.5" + "}"
+        + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + "},"
+        + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(1, error.collectedTuples.size());
+    KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0);
+    Assert.assertEquals(tuple, errorKeyValPair.getKey());
+  }
+
+  @Test
+  public void testValidInputPojoPortNotConnected() throws JSONException
+  {
+    parser.out.setSink(null);
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, objectPort.collectedTuples.size());
+    Assert.assertEquals(0, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+    Assert.assertEquals(2, jsonObject.getInt("id"));
+    Assert.assertEquals(1, jsonObject.getInt("price"));
+    Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+    Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+  }
+
+  @Test
+  public void testValidInputParsedOutputPortNotConnected() throws JSONException
+  {
+    parser.parsedOutput.setSink(null);
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, objectPort.collectedTuples.size());
+    Assert.assertEquals(1, pojoPort.collectedTuples.size());
+    Assert.assertEquals(0, error.collectedTuples.size());
+    Object obj = pojoPort.collectedTuples.get(0);
+    Assert.assertNotNull(obj);
+    Assert.assertEquals(Product.class, obj.getClass());
+    Product pojo = (Product)obj;
+    Assert.assertEquals(2, pojo.getId());
+    Assert.assertEquals(1, pojo.getPrice());
+    Assert.assertEquals("An ice sculpture", pojo.getName());
+    Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+  }
+
+  @Test
+  public void testParserValidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+  }
+
+  @Test
+  public void testParserInvalidInputMetricVerification()
+  {
+    parser.beginWindow(0);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String tuple = "{" + "\"id\": 2" + "}";
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(1, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+  }
+
+  @Test
+  public void testParserMetricResetVerification()
+  {
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+        + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+        + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+        + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+    parser.beginWindow(0);
+    parser.in.process(tuple.getBytes());
+    parser.endWindow();
+    Assert.assertEquals(1, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+    parser.beginWindow(1);
+    Assert.assertEquals(0, parser.parsedOutputCount);
+    Assert.assertEquals(0, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(0, parser.getEmittedObjectCount());
+    parser.in.process(tuple.getBytes());
+    Assert.assertEquals(1, parser.parsedOutputCount);
+    Assert.assertEquals(1, parser.getIncomingTuplesCount());
+    Assert.assertEquals(0, parser.getErrorTupleCount());
+    Assert.assertEquals(1, parser.getEmittedObjectCount());
+    parser.endWindow();
+  }
+
+  public static class Product
+  {
+    public int id;
+    public int price;
+    public String name;
+    public List<String> tags;
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy/MM/dd")
+    public Date dateOfManufacture;
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy")
+    public Date dateOfExpiry;
+    public Map<String, Object> dimensions;
+    public Map<String, Object> warehouseLocation;
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public int getPrice()
+    {
+      return price;
+    }
+
+    public void setPrice(int price)
+    {
+      this.price = price;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public List<String> getTags()
+    {
+      return tags;
+    }
+
+    public void setTags(List<String> tags)
+    {
+      this.tags = tags;
+    }
+
+    public Date getDateOfManufacture()
+    {
+      return dateOfManufacture;
+    }
+
+    public void setDateOfManufacture(Date dateOfManufacture)
+    {
+      this.dateOfManufacture = dateOfManufacture;
+    }
+
+    public Map<String, Object> getDimensions()
+    {
+      return dimensions;
+    }
+
+    public void setDimensions(Map<String, Object> dimensions)
+    {
+      this.dimensions = dimensions;
+    }
+
+    public Map<String, Object> getWarehouseLocation()
+    {
+      return warehouseLocation;
+    }
+
+    public void setWarehouseLocation(Map<String, Object> warehouseLocation)
+    {
+      this.warehouseLocation = warehouseLocation;
+    }
+
+    public Date getDateOfExpiry()
+    {
+      return dateOfExpiry;
+    }
+
+    public void setDateOfExpiry(Date dateOfExpiry)
+    {
+      this.dateOfExpiry = dateOfExpiry;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Product [id=" + id + ", price=" + price + ", name=" + name + ", tags=" + tags + ", dateOfManufacture="
+          + dateOfManufacture + ", dateOfExpiry=" + dateOfExpiry + ", dimensions=" + dimensions
+          + ", warehouseLocation=" + warehouseLocation + "]";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/resources/json-parser-schema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/json-parser-schema.json b/contrib/src/test/resources/json-parser-schema.json
new file mode 100644
index 0000000..a7a2e6c
--- /dev/null
+++ b/contrib/src/test/resources/json-parser-schema.json
@@ -0,0 +1,51 @@
+{
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "title": "Product set",
+    "type": "object",
+        "properties": {
+            "id": {
+                "description": "The unique identifier for a product",
+                "type": "number"
+            },
+            "name": {
+                "type": "string"
+            },
+            "price": {
+                "type": "number",
+                "minimum": 0,
+                "exclusiveMinimum": true
+            },
+            "tags": {
+                "type": "array",
+                "items": {
+                    "type": "string"
+                },
+                "minItems": 1,
+                "uniqueItems": true
+            },
+            "dimensions": {
+                "type": "object",
+                "properties": {
+                    "length": {"type": "number"},
+                    "width": {"type": "number"},
+                    "height": {"type": "number"}
+                },
+                "required": ["length", "width", "height"]
+            },
+            "warehouseLocation": {
+                "description": "Coordinates of the warehouse with the product",
+                "$ref": "http://json-schema.org/geo"
+            },
+            "dateOfManufacture": {
+                "description": "manufacturing date",
+                 "type": "string"
+                
+            }, 
+            "dateOfExpiry": {
+                "description": "expiry date",
+                 "type": "string"
+                
+            }            
+        },
+        "required": ["id", "name", "price"]
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
deleted file mode 100644
index 4e9800a..0000000
--- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.parser;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Operator that converts JSON string to Pojo <br>
- * <b>Properties</b> <br>
- * <b>dateFormat</b>: date format e.g dd/MM/yyyy
- * 
- * @displayName JsonParser
- * @category Parsers
- * @tags json pojo parser
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class JsonParser extends Parser<String, String>
-{
-
-  private transient ObjectReader reader;
-  protected String dateFormat;
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    try {
-      ObjectMapper mapper = new ObjectMapper();
-      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-      if (dateFormat != null) {
-        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
-      }
-      reader = mapper.reader(clazz);
-    } catch (Throwable e) {
-      throw new RuntimeException("Unable find provided class");
-    }
-  }
-
-  @Override
-  public Object convert(String tuple)
-  {
-    try {
-      if (!StringUtils.isEmpty(tuple)) {
-        return reader.readValue(tuple);
-      }
-    } catch (JsonProcessingException e) {
-      logger.debug("Error while converting tuple {} {}", tuple, e.getMessage());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-    return null;
-  }
-
-  @Override
-  public String processErrorTuple(String input)
-  {
-    return input;
-  }
-
-  /**
-   * Get the date format
-   * 
-   * @return Date format string
-   */
-  public String getDateFormat()
-  {
-    return dateFormat;
-  }
-
-  /**
-   * Set the date format
-   * 
-   * @param dateFormat
-   */
-  public void setDateFormat(String dateFormat)
-  {
-    this.dateFormat = dateFormat;
-  }
-
-  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
deleted file mode 100644
index 0400d23..0000000
--- a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.parser;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.datatorrent.lib.util.TestUtils.TestInfo;
-
-public class JsonParserTest
-{
-  JsonParser operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
-
-  public JsonParserTest()
-  {
-    // So that the output is cleaner.
-    System.setErr(new PrintStream(myOut));
-  }
-
-  @Rule
-  public TestInfo testMeta = new FSTestWatcher()
-  {
-    private void deleteDirectory()
-    {
-      try {
-        FileUtils.deleteDirectory(new File(getDir()));
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-
-    @Override
-    protected void starting(Description descriptor)
-    {
-
-      super.starting(descriptor);
-      deleteDirectory();
-
-      operator = new JsonParser();
-      operator.setClazz(Test1Pojo.class);
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-      operator.setup(null);
-
-      operator.beginWindow(0);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      operator.endWindow();
-      operator.teardown();
-
-      deleteDirectory();
-      super.finished(description);
-    }
-  };
-
-  @Test
-  public void testJSONToPOJO()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-    Assert.assertEquals(3, pojo.d.size());
-    Assert.assertEquals("ABC", pojo.d.get(0));
-    Assert.assertEquals("PQR", pojo.d.get(1));
-    Assert.assertEquals("XYZ", pojo.d.get(2));
-  }
-
-  @Test
-  public void testJSONToPOJODate()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
-    operator.setDateFormat("dd-MM-yyyy");
-    operator.setup(null);
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-    Assert.assertEquals(3, pojo.d.size());
-    Assert.assertEquals("ABC", pojo.d.get(0));
-    Assert.assertEquals("PQR", pojo.d.get(1));
-    Assert.assertEquals("XYZ", pojo.d.get(2));
-    Assert.assertEquals(2015, new DateTime(pojo.date).getYear());
-    Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear());
-    Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth());
-  }
-
-  @Test
-  public void testJSONToPOJOInvalidData()
-  {
-    String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOUnknownFields()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-    Assert.assertEquals(null, pojo.d);
-  }
-
-  @Test
-  public void testJSONToPOJOMismatchingFields()
-  {
-    String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOEmptyString()
-  {
-    String tuple = "";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJOEmptyJSON()
-  {
-    String tuple = "{}";
-    operator.in.put(tuple);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    Test1Pojo pojo = (Test1Pojo)obj;
-    Assert.assertEquals(0, pojo.a);
-    Assert.assertEquals(0, pojo.b);
-    Assert.assertEquals(null, pojo.c);
-    Assert.assertEquals(null, pojo.d);
-  }
-
-  @Test
-  public void testJSONToPOJOArrayInJson()
-  {
-    String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  public static class Test1Pojo
-  {
-    public int a;
-    public long b;
-    public String c;
-    public List<String> d;
-    public Date date;
-
-    @Override
-    public String toString()
-    {
-      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
-    }
-  }
-}



[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1962' of https://github.com/shubham-pathak22/incubator-apex-malhar

Posted by ch...@apache.org.
Merge branch 'MLHR-1962' of https://github.com/shubham-pathak22/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/1afb4a26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/1afb4a26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/1afb4a26

Branch: refs/heads/master
Commit: 1afb4a26fa0f6447d54a1f21705c3dd9a4a4f489
Parents: 1eb554f 0a3e00c
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Tue Feb 16 12:30:25 2016 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Tue Feb 16 12:30:25 2016 +0530

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../datatorrent/contrib/parser/JsonParser.java  | 247 +++++++++++
 .../parser/JsonParserApplicationTest.java       |  93 ++++
 .../contrib/parser/JsonParserTest.java          | 443 +++++++++++++++++++
 .../src/test/resources/json-parser-schema.json  |  51 +++
 .../com/datatorrent/lib/parser/JsonParser.java  | 110 -----
 .../datatorrent/lib/parser/JsonParserTest.java  | 228 ----------
 7 files changed, 840 insertions(+), 338 deletions(-)
----------------------------------------------------------------------