You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/02/16 07:49:48 UTC
incubator-apex-malhar git commit: APEXMALHAR-1962 json parser
enhancements
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 cec33da88 -> 0a3e00ce0
APEXMALHAR-1962 json parser enhancements
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/0a3e00ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/0a3e00ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/0a3e00ce
Branch: refs/heads/devel-3
Commit: 0a3e00ce0b804ddcb9a2f7bb45dc8698dfd05d84
Parents: cec33da
Author: shubham <sh...@github.com>
Authored: Mon Jan 4 12:43:01 2016 +0530
Committer: shubham <sh...@github.com>
Committed: Tue Feb 16 10:53:38 2016 +0530
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../datatorrent/contrib/parser/JsonParser.java | 247 +++++++++++
.../parser/JsonParserApplicationTest.java | 93 ++++
.../contrib/parser/JsonParserTest.java | 443 +++++++++++++++++++
.../src/test/resources/json-parser-schema.json | 51 +++
.../com/datatorrent/lib/parser/JsonParser.java | 110 -----
.../datatorrent/lib/parser/JsonParserTest.java | 228 ----------
7 files changed, 840 insertions(+), 338 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 6145fac..b994928 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -622,6 +622,12 @@
<artifactId>gemfire-core</artifactId>
<version>1.0.0-incubating.M1</version>
<optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-schema-validator</artifactId>
+ <version>2.0.1</version>
+ <optional>true</optional>
</dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
new file mode 100644
index 0000000..b6c3c4d
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.fge.jsonschema.exceptions.ProcessingException;
+import com.github.fge.jsonschema.main.JsonSchema;
+import com.github.fge.jsonschema.main.JsonSchemaFactory;
+import com.github.fge.jsonschema.report.ProcessingMessage;
+import com.github.fge.jsonschema.report.ProcessingReport;
+import com.github.fge.jsonschema.util.JsonLoader;
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that parses a json string tuple against a specified json schema and
+ * emits JSONObject on one port, POJO on other port and tuples that could not be
+ * parsed on error port.<br>
+ * Schema is specified in a json format as per http://json-schema.org/ <br>
+ * Example for the schema can be seen here http://json-schema.org/example1.html <br>
+ * User can choose to skip validations by not specifying the schema at all. <br>
+ * <br>
+ * <b>Properties</b><br>
+ * <b>jsonSchema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a byte array. Each tuple represents a json string<br>
+ * <b>parsedOutput</b>:tuples that are validated against the schema are emitted
+ * as JSONObject on this port<br>
+ * <b>out</b>:tuples that are validated against the schema are emitted as pojo
+ * on this port<br>
+ * <b>err</b>:tuples that do not conform to the schema are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ *
+ *
+ * @displayName JsonParser
+ * @category Parsers
+ * @tags json pojo parser
+ * @since 3.2.0
+ */
+@InterfaceStability.Evolving
+public class JsonParser extends Parser<byte[], KeyValPair<String, String>>
+{
+
+ /**
+ * Contents of the schema.Schema is specified as per http://json-schema.org/
+ */
+ private String jsonSchema;
+ private transient JsonSchema schema;
+ private transient ObjectMapper objMapper;
+ /**
+ * output port to emit validated records as JSONObject
+ */
+ public transient DefaultOutputPort<JSONObject> parsedOutput = new DefaultOutputPort<JSONObject>();
+ /**
+ * metric to keep count of number of tuples emitted on {@link #parsedOutput}
+ * port
+ */
+ @AutoMetric
+ long parsedOutputCount;
+
+ /**
+ * Starts a new streaming window: delegates to the parent implementation and
+ * then resets {@link #parsedOutputCount}, so that metric is counted per
+ * window.
+ *
+ * @param windowId
+ * id of the window that is beginning
+ */
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ parsedOutputCount = 0;
+ }
+
+ /**
+  * Prepares the operator for processing. When a schema string has been
+  * configured it is parsed and compiled into a validator; otherwise no
+  * validator is created and tuples are not validated. A Jackson mapper that
+  * ignores unknown properties is always created for the POJO conversion.
+  *
+  * @param context
+  *          operator context (not read by this method)
+  */
+ @Override
+ public void setup(OperatorContext context)
+ {
+   try {
+     if (jsonSchema != null) {
+       JsonNode schemaTree = JsonLoader.fromString(jsonSchema);
+       schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaTree);
+     }
+     ObjectMapper mapper = new ObjectMapper();
+     mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+     objMapper = mapper;
+   } catch (ProcessingException | IOException e) {
+     DTThrowable.wrapIfChecked(e);
+   }
+ }
+
+ /**
+  * Validates and converts one incoming tuple.
+  * <ul>
+  * <li>A null tuple is reported on the error port with the reason "null tuple".</li>
+  * <li>When a schema is set and validation fails, the tuple is reported on the
+  * error port together with the list of violations and nothing else is emitted.</li>
+  * <li>Otherwise the tuple is emitted as a JSONObject on {@link #parsedOutput}
+  * and as a POJO of type clazz on the out port, each only if connected.</li>
+  * <li>Any parse or mapping failure is logged and reported on the error port
+  * with the exception message as the reason.</li>
+  * </ul>
+  * The error counter is incremented whether or not the error port is connected.
+  *
+  * @param tuple
+  *          the raw JSON document, decoded as UTF-8
+  */
+ @Override
+ public void processTuple(byte[] tuple)
+ {
+   if (tuple == null) {
+     if (err.isConnected()) {
+       err.emit(new KeyValPair<String, String>(null, "null tuple"));
+     }
+     errorTupleCount++;
+     return;
+   }
+   // Decode explicitly as UTF-8 instead of relying on the platform default charset,
+   // so the text seen by the validator does not depend on the JVM's locale settings.
+   String incomingString = new String(tuple, StandardCharsets.UTF_8);
+   try {
+     if (schema != null) {
+       ProcessingReport report = schema.validate(JsonLoader.fromString(incomingString));
+       if (report != null && !report.isSuccess()) {
+         errorTupleCount++;
+         if (err.isConnected()) {
+           err.emit(new KeyValPair<String, String>(incomingString, describeViolations(report)));
+         }
+         return;
+       }
+     }
+     if (parsedOutput.isConnected()) {
+       parsedOutput.emit(new JSONObject(incomingString));
+       parsedOutputCount++;
+     }
+     if (out.isConnected()) {
+       out.emit(objMapper.readValue(tuple, clazz));
+       emittedObjectCount++;
+     }
+   } catch (JSONException | ProcessingException | IOException e) {
+     errorTupleCount++;
+     if (err.isConnected()) {
+       err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
+     }
+     // Log the decoded text rather than the byte array, and pass the exception as the
+     // trailing argument so the logger records its stack trace.
+     logger.error("Failed to parse json tuple {}", incomingString, e);
+   }
+ }
+
+ /**
+  * Renders the messages of a validation report as a comma separated list of
+  * pointer:message entries. An empty report yields an empty string, and a
+  * message without an instance node is rendered with a null pointer instead
+  * of failing.
+  *
+  * @param report
+  *          validation report to render
+  * @return comma separated violations
+  */
+ private static String describeViolations(ProcessingReport report)
+ {
+   StringBuilder reasons = new StringBuilder();
+   Iterator<ProcessingMessage> iter = report.iterator();
+   while (iter.hasNext()) {
+     JsonNode message = iter.next().asJson();
+     JsonNode instance = message.get("instance");
+     JsonNode pointer = (instance == null) ? null : instance.findValue("pointer");
+     if (reasons.length() > 0) {
+       reasons.append(",");
+     }
+     reasons.append(pointer).append(":").append(message.get("message"));
+   }
+   return reasons.toString();
+ }
+
+ /**
+ * Not supported by this operator; {@link #processTuple(byte[])} performs the
+ * conversion itself and does not call this method.
+ *
+ * @throws UnsupportedOperationException
+ * always
+ */
+ @Override
+ public Object convert(byte[] tuple)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ /**
+ * Not supported by this operator; {@link #processTuple(byte[])} builds the
+ * error tuples itself and does not call this method.
+ *
+ * @throws UnsupportedOperationException
+ * always
+ */
+ @Override
+ public KeyValPair<String, String> processErrorTuple(byte[] input)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ /**
+ * Returns the JSON schema text used for validation.
+ *
+ * @return the schema as a string, or null if none has been set, in which
+ * case {@link #setup(OperatorContext)} creates no validator
+ */
+ public String getJsonSchema()
+ {
+ return jsonSchema;
+ }
+
+ /**
+ * Sets the JSON schema text to validate incoming tuples against. The text is
+ * compiled into a validator in {@link #setup(OperatorContext)}, so it has to
+ * be set before setup is called to take effect.
+ *
+ * @param jsonSchema
+ * schema as a string
+ */
+ public void setJsonSchema(String jsonSchema)
+ {
+ this.jsonSchema = jsonSchema;
+ }
+
+ /**
+ * Returns the error tuple counter, which {@link #processTuple(byte[])}
+ * increments for null tuples, schema violations and parse failures.
+ *
+ * @return errorTupleCount
+ */
+ @VisibleForTesting
+ public long getErrorTupleCount()
+ {
+ return errorTupleCount;
+ }
+
+ /**
+ * Returns the emitted object counter, which {@link #processTuple(byte[])}
+ * increments each time a POJO is emitted on the out port.
+ *
+ * @return emittedObjectCount
+ */
+ @VisibleForTesting
+ public long getEmittedObjectCount()
+ {
+ return emittedObjectCount;
+ }
+
+ /**
+ * Returns the incoming tuple counter. The counter is not modified in this
+ * class. NOTE(review): presumably it is maintained by the parent Parser
+ * operator; confirm against the base class.
+ *
+ * @return incomingTuplesCount
+ */
+ @VisibleForTesting
+ public long getIncomingTuplesCount()
+ {
+ return incomingTuplesCount;
+ }
+
+ /**
+ * Replaces the compiled schema used by {@link #processTuple(byte[])}.
+ * Passing null disables validation, because processTuple only validates when
+ * the schema is non-null.
+ *
+ * @param schema
+ * compiled schema to validate against, or null to skip validation
+ */
+ @VisibleForTesting
+ public void setSchema(JsonSchema schema)
+ {
+ this.schema = schema;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java
new file mode 100644
index 0000000..d1b1efa
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserApplicationTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.parser.JsonParserTest.Product;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+public class JsonParserApplicationTest
+{
+
+ /**
+ * Builds the nested JsonParserTest application in local mode, runs it for
+ * ten seconds and fails the test if preparing or running the DAG raises a
+ * ConstraintViolationException.
+ */
+ @Test
+ public void testApplication() throws IOException, Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(new JsonParserTest(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(10000);// runs for 10 seconds and quits
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ /**
+ * Test application that wires a JsonDataEmitterOperator into a JsonParser and
+ * connects both parser output ports (JSONObject and POJO) to console
+ * operators. The parser is configured with the Product class and the
+ * json-parser-schema.json schema.
+ */
+ public static class JsonParserTest implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ JsonDataEmitterOperator input = dag.addOperator("data", new JsonDataEmitterOperator());
+ JsonParser parser = dag.addOperator("jsonparser", new JsonParser());
+ parser.setClazz(Product.class);
+ // declare the tuple class of the pojo port on the port's attributes as well
+ dag.getMeta(parser).getMeta(parser.out).getAttributes().put(Context.PortContext.TUPLE_CLASS, Product.class);
+ parser.setJsonSchema(SchemaUtils.jarResourceFileToString("json-parser-schema.json"));
+ ConsoleOutputOperator jsonObjectOp = dag.addOperator("jsonObjectOp", new ConsoleOutputOperator());
+ ConsoleOutputOperator pojoOp = dag.addOperator("pojoOp", new ConsoleOutputOperator());
+ jsonObjectOp.setDebug(true);
+ dag.addStream("input", input.output, parser.in);
+ dag.addStream("output", parser.parsedOutput, jsonObjectOp.input);
+ dag.addStream("pojo", parser.out, pojoOp.input);
+ }
+ }
+
+ /**
+ * Input operator that emits the bytes of the same sample JSON document on
+ * every call to emitTuples.
+ */
+ public static class JsonDataEmitterOperator extends BaseOperator implements InputOperator
+ {
+ // sample product document emitted by this operator
+ public static String jsonSample = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+
+ public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<byte[]>();
+
+ @Override
+ public void emitTuples()
+ {
+ output.emit(jsonSample.getBytes());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java
new file mode 100644
index 0000000..f492597
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/JsonParserTest.java
@@ -0,0 +1,443 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class JsonParserTest
+{
+ private static final String filename = "json-parser-schema.json";
+ CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+ JsonParser parser = new JsonParser();
+
+ @Rule
+ public Watcher watcher = new Watcher();
+
+ /**
+ * Test fixture: before each test it attaches the collector sinks to the three
+ * parser ports, configures the Product class and the schema file, and calls
+ * setup; after each test it clears the sinks and tears the parser down.
+ */
+ public class Watcher extends TestWatcher
+ {
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ parser.err.setSink(error);
+ parser.parsedOutput.setSink(objectPort);
+ parser.out.setSink(pojoPort);
+ parser.setClazz(Product.class);
+ parser.setJsonSchema(SchemaUtils.jarResourceFileToString(filename));
+ parser.setup(null);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ error.clear();
+ objectPort.clear();
+ pojoPort.clear();
+ parser.teardown();
+ }
+ }
+
+ /**
+ * A document that satisfies the schema is emitted once on the JSONObject port
+ * and once on the POJO port, with nothing on the error port, and the field
+ * values survive both conversions.
+ */
+ @Test
+ public void testValidInput() throws JSONException
+ {
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Product.class, obj.getClass());
+ Product pojo = (Product)obj;
+ JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+ Assert.assertEquals(2, jsonObject.getInt("id"));
+ Assert.assertEquals(1, jsonObject.getInt("price"));
+ Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+ Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+ Assert.assertEquals(2, pojo.getId());
+ Assert.assertEquals(1, pojo.getPrice());
+ Assert.assertEquals("An ice sculpture", pojo.getName());
+ Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+ }
+
+ /**
+ * With validation disabled, an empty document cannot be parsed and is
+ * reported once on the error port; nothing reaches the output ports.
+ */
+ @Test
+ public void testEmptyInput() throws JSONException
+ {
+ parser.setSchema(null);
+ String tuple = "";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ /**
+ * With validation disabled (schema set to null), a well-formed document is
+ * still emitted on both output ports with the expected field values.
+ */
+ @Test
+ public void testValidInputWithoutSchema() throws JSONException
+ {
+ parser.setSchema(null);
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Product.class, obj.getClass());
+ Product pojo = (Product)obj;
+ JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+ Assert.assertEquals(2, jsonObject.getInt("id"));
+ Assert.assertEquals(1, jsonObject.getInt("price"));
+ Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+ Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+ Assert.assertEquals(2, pojo.getId());
+ Assert.assertEquals(1, pojo.getPrice());
+ Assert.assertEquals("An ice sculpture", pojo.getName());
+ Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+ }
+
+ /**
+ * A document carrying a field the POJO does not declare ("id2") is still
+ * converted; the unknown field is ignored and POJO fields absent from the
+ * document (warehouseLocation) stay null.
+ */
+ @Test
+ public void testUnknowFieldsInData() throws JSONException
+ {
+ parser.setSchema(null);
+ String tuple = "{" + "\"id\": 2," + "\"id2\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Product.class, obj.getClass());
+ Product pojo = (Product)obj;
+ JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+ Assert.assertEquals(2, jsonObject.getInt("id"));
+ Assert.assertEquals(1, jsonObject.getInt("price"));
+ Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+ Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+ Assert.assertEquals(2, pojo.getId());
+ Assert.assertEquals(1, pojo.getPrice());
+ Assert.assertEquals("An ice sculpture", pojo.getName());
+ Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+ Assert.assertNull(pojo.getWarehouseLocation());
+ }
+
+ /**
+ * A negative price violates the schema's minimum for "price"; the tuple is
+ * emitted only on the error port, keyed by the original text and carrying
+ * the pointer and message of the violation.
+ */
+ @Test
+ public void testInvalidPrice() throws JSONException
+ {
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(tuple, errorKeyValPair.getKey());
+ Assert.assertEquals("\"/price\":\"number is lower than the required minimum\"", errorKeyValPair.getValue());
+ }
+
+ /**
+ * A document with two schema violations (a negative price and a missing
+ * required "length" in dimensions) yields a single error tuple whose reason
+ * lists both violations separated by a comma.
+ */
+ @Test
+ public void testMultipleViolations() throws JSONException
+ {
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"width\" : 8.0," + "\"height\": 9.5" + "},"
+ + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + "},"
+ + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(tuple, errorKeyValPair.getKey());
+ Assert.assertEquals(
+ "\"/dimensions\":\"missing required property(ies)\",\"/price\":\"number is lower than the required minimum\"",
+ errorKeyValPair.getValue());
+ }
+
+ /**
+ * A syntactically broken document (the comma after the "dimensions" object
+ * is missing) is emitted only on the error port, keyed by the original text.
+ */
+ @Test
+ public void testJsonSyntaxError() throws JSONException
+ {
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": -1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"width\" : 8.0," + "\"height\": 9.5" + "}"
+ + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4" + "},"
+ + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorKeyValPair = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(tuple, errorKeyValPair.getKey());
+ }
+
+ /**
+ * With the POJO port's sink removed, a valid document is still emitted as a
+ * JSONObject and nothing is collected for the POJO or error ports.
+ */
+ @Test
+ public void testValidInputPojoPortNotConnected() throws JSONException
+ {
+ parser.out.setSink(null);
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ JSONObject jsonObject = (JSONObject)objectPort.collectedTuples.get(0);
+ Assert.assertEquals(2, jsonObject.getInt("id"));
+ Assert.assertEquals(1, jsonObject.getInt("price"));
+ Assert.assertEquals("An ice sculpture", jsonObject.get("name"));
+ Assert.assertEquals(7.0, jsonObject.getJSONObject("dimensions").getDouble("length"), 0);
+ }
+
+ /**
+ * With the JSONObject port's sink removed, a valid document is still emitted
+ * as a POJO and nothing is collected for the JSONObject or error ports.
+ */
+ @Test
+ public void testValidInputParsedOutputPortNotConnected() throws JSONException
+ {
+ parser.parsedOutput.setSink(null);
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Product.class, obj.getClass());
+ Product pojo = (Product)obj;
+ Assert.assertEquals(2, pojo.getId());
+ Assert.assertEquals(1, pojo.getPrice());
+ Assert.assertEquals("An ice sculpture", pojo.getName());
+ Assert.assertEquals(7.0, (double)pojo.getDimensions().get("length"), 0);
+ }
+
+ /**
+ * After one valid tuple, the incoming, parsed-output and emitted-object
+ * counters are 1 and the error counter stays 0.
+ */
+ @Test
+ public void testParserValidInputMetricVerification()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ }
+
+ /**
+ * After one tuple that lacks the schema's required "name" and "price"
+ * properties, the incoming and error counters are 1 and the output counters
+ * stay 0.
+ */
+ @Test
+ public void testParserInvalidInputMetricVerification()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String tuple = "{" + "\"id\": 2" + "}";
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(1, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ }
+
+ /**
+ * The counters accumulated in one window are back at 0 once the next window
+ * begins, and count up again from there.
+ */
+ @Test
+ public void testParserMetricResetVerification()
+ {
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String tuple = "{" + "\"id\": 2," + "\"name\": \"An ice sculpture\"," + "\"price\": 1,"
+ + "\"tags\": [\"cold\", \"ice\"]," + "\"dimensions\": {" + "\"length\": 7.0," + "\"width\" : 8.0,"
+ + "\"height\": 9.5" + "}," + "\"warehouseLocation\": {" + "\"latitude\": -78.75," + "\"longitude\": 20.4"
+ + "}," + "\"dateOfManufacture\": \"2013/09/29\"," + "\"dateOfExpiry\": \"2013\"" + "}";
+ parser.beginWindow(0);
+ parser.in.process(tuple.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ // a new window starts: every counter is expected to be reset
+ parser.beginWindow(1);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ parser.in.process(tuple.getBytes());
+ Assert.assertEquals(1, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ parser.endWindow();
+ }
+
+ /**
+ * POJO that the tests configure as the parser's target class. Its fields
+ * mirror the properties of the sample product documents; the two date fields
+ * are bound from strings using the patterns given in their JsonFormat
+ * annotations.
+ */
+ public static class Product
+ {
+ public int id;
+ public int price;
+ public String name;
+ public List<String> tags;
+ // parsed from strings such as "2013/09/29"
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy/MM/dd")
+ public Date dateOfManufacture;
+ // parsed from a four digit year such as "2013"
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy")
+ public Date dateOfExpiry;
+ public Map<String, Object> dimensions;
+ public Map<String, Object> warehouseLocation;
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public void setId(int id)
+ {
+ this.id = id;
+ }
+
+ public int getPrice()
+ {
+ return price;
+ }
+
+ public void setPrice(int price)
+ {
+ this.price = price;
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public List<String> getTags()
+ {
+ return tags;
+ }
+
+ public void setTags(List<String> tags)
+ {
+ this.tags = tags;
+ }
+
+ public Date getDateOfManufacture()
+ {
+ return dateOfManufacture;
+ }
+
+ public void setDateOfManufacture(Date dateOfManufacture)
+ {
+ this.dateOfManufacture = dateOfManufacture;
+ }
+
+ public Map<String, Object> getDimensions()
+ {
+ return dimensions;
+ }
+
+ public void setDimensions(Map<String, Object> dimensions)
+ {
+ this.dimensions = dimensions;
+ }
+
+ public Map<String, Object> getWarehouseLocation()
+ {
+ return warehouseLocation;
+ }
+
+ public void setWarehouseLocation(Map<String, Object> warehouseLocation)
+ {
+ this.warehouseLocation = warehouseLocation;
+ }
+
+ public Date getDateOfExpiry()
+ {
+ return dateOfExpiry;
+ }
+
+ public void setDateOfExpiry(Date dateOfExpiry)
+ {
+ this.dateOfExpiry = dateOfExpiry;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Product [id=" + id + ", price=" + price + ", name=" + name + ", tags=" + tags + ", dateOfManufacture="
+ + dateOfManufacture + ", dateOfExpiry=" + dateOfExpiry + ", dimensions=" + dimensions
+ + ", warehouseLocation=" + warehouseLocation + "]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/contrib/src/test/resources/json-parser-schema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/json-parser-schema.json b/contrib/src/test/resources/json-parser-schema.json
new file mode 100644
index 0000000..a7a2e6c
--- /dev/null
+++ b/contrib/src/test/resources/json-parser-schema.json
@@ -0,0 +1,51 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "title": "Product set",
+ "type": "object",
+ "properties": {
+ "id": {
+ "description": "The unique identifier for a product",
+ "type": "number"
+ },
+ "name": {
+ "type": "string"
+ },
+ "price": {
+ "type": "number",
+ "minimum": 0,
+ "exclusiveMinimum": true
+ },
+ "tags": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "minItems": 1,
+ "uniqueItems": true
+ },
+ "dimensions": {
+ "type": "object",
+ "properties": {
+ "length": {"type": "number"},
+ "width": {"type": "number"},
+ "height": {"type": "number"}
+ },
+ "required": ["length", "width", "height"]
+ },
+ "warehouseLocation": {
+ "description": "Coordinates of the warehouse with the product",
+ "$ref": "http://json-schema.org/geo"
+ },
+ "dateOfManufacture": {
+ "description": "manufacturing date",
+ "type": "string"
+
+ },
+ "dateOfExpiry": {
+ "description": "expiry date",
+ "type": "string"
+
+ }
+ },
+ "required": ["id", "name", "price"]
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
deleted file mode 100644
index 4e9800a..0000000
--- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.parser;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.netlet.util.DTThrowable;
-
-/**
- * Operator that converts JSON string to Pojo <br>
- * JSON properties that have no counterpart in the target class are ignored.
- * An empty tuple, or a tuple Jackson cannot process, converts to {@code null}.<br>
- * <b>Properties</b> <br>
- * <b>dateFormat</b>: date format e.g dd/MM/yyyy
- *
- * @displayName JsonParser
- * @category Parsers
- * @tags json pojo parser
- * @since 3.2.0
- */
-@InterfaceStability.Evolving
-public class JsonParser extends Parser<String, String>
-{
-  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
-
-  private transient ObjectReader reader;
-  protected String dateFormat;
-
-  /**
-   * Creates the Jackson reader for the configured target class. Unknown
-   * properties are tolerated, and {@code dateFormat}, when set, is applied
-   * as a {@link SimpleDateFormat} pattern.
-   *
-   * @param context operator context; not used by this method
-   * @throws RuntimeException if the reader cannot be created, for example
-   *         because the date pattern is invalid; the original failure is
-   *         attached as the cause
-   */
-  @Override
-  public void setup(OperatorContext context)
-  {
-    try {
-      ObjectMapper mapper = new ObjectMapper();
-      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-      if (dateFormat != null) {
-        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
-      }
-      reader = mapper.reader(clazz);
-    } catch (Throwable e) {
-      // Throwable is still caught so every failure keeps surfacing as a
-      // RuntimeException; the cause is now chained instead of discarded.
-      throw new RuntimeException("Unable to create JSON reader for the provided class", e);
-    }
-  }
-
-  /**
-   * Deserializes one JSON tuple with the reader built in {@code setup}.
-   *
-   * @param tuple JSON text; may be {@code null} or empty
-   * @return the deserialized object, or {@code null} when the tuple is
-   *         empty or raises a {@link JsonProcessingException}; any other
-   *         {@link IOException} is handed to {@code DTThrowable.rethrow}
-   */
-  @Override
-  public Object convert(String tuple)
-  {
-    if (StringUtils.isEmpty(tuple)) {
-      return null;
-    }
-    try {
-      return reader.readValue(tuple);
-    } catch (JsonProcessingException e) {
-      logger.debug("Error while converting tuple {} {}", tuple, e.getMessage());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
-    }
-    return null;
-  }
-
-  /**
-   * Produces the error-port representation of a rejected tuple.
-   *
-   * @param input the tuple that could not be converted
-   * @return {@code input}, unchanged
-   */
-  @Override
-  public String processErrorTuple(String input)
-  {
-    return input;
-  }
-
-  /**
-   * Get the date format
-   *
-   * @return Date format string
-   */
-  public String getDateFormat()
-  {
-    return dateFormat;
-  }
-
-  /**
-   * Set the date format
-   *
-   * @param dateFormat {@link SimpleDateFormat} pattern applied during
-   *        {@code setup}; {@code null} leaves the mapper's default in place
-   */
-  public void setDateFormat(String dateFormat)
-  {
-    this.dateFormat = dateFormat;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/0a3e00ce/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java b/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
deleted file mode 100644
index 0400d23..0000000
--- a/library/src/test/java/com/datatorrent/lib/parser/JsonParserTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.parser;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.Description;
-
-import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
-import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
-import com.datatorrent.lib.util.TestUtils.TestInfo;
-
-/**
- * Unit tests for {@link JsonParser}. Each test pushes one JSON string into
- * the operator's input port and checks which of the two collecting sinks
- * (valid output or error output) received a tuple.
- */
-public class JsonParserTest
-{
-  JsonParser operator;
-  CollectorTestSink<Object> validDataSink;
-  CollectorTestSink<String> invalidDataSink;
-
-  // Initialized before the constructor body runs, so it still holds the
-  // stream that was installed before this test redirected System.err.
-  private final PrintStream originalErr = System.err;
-  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
-
-  public JsonParserTest()
-  {
-    // So that the output is cleaner.
-    System.setErr(new PrintStream(myOut));
-  }
-
-  @Rule
-  public TestInfo testMeta = new FSTestWatcher()
-  {
-    private void deleteDirectory()
-    {
-      try {
-        FileUtils.deleteDirectory(new File(getDir()));
-      } catch (IOException ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-
-    @Override
-    protected void starting(Description descriptor)
-    {
-
-      super.starting(descriptor);
-      deleteDirectory();
-
-      operator = new JsonParser();
-      operator.setClazz(Test1Pojo.class);
-      validDataSink = new CollectorTestSink<Object>();
-      invalidDataSink = new CollectorTestSink<String>();
-      TestUtils.setSink(operator.out, validDataSink);
-      TestUtils.setSink(operator.err, invalidDataSink);
-      operator.setup(null);
-
-      operator.beginWindow(0);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      try {
-        operator.endWindow();
-        operator.teardown();
-
-        deleteDirectory();
-        super.finished(description);
-      } finally {
-        // Undo the constructor's redirection; otherwise System.err stays
-        // pointed at this test's buffer for the rest of the JVM.
-        System.setErr(originalErr);
-      }
-    }
-  };
-
-  /** A well-formed tuple with all four basic fields is emitted as a POJO. */
-  @Test
-  public void testJSONToPOJO()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    Test1Pojo pojo = expectSingleValidPojo();
-    assertScalarFields(pojo);
-    assertStringList(pojo);
-  }
-
-  /** A date string is parsed with the configured {@code dd-MM-yyyy} pattern. */
-  @Test
-  public void testJSONToPOJODate()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
-    operator.setDateFormat("dd-MM-yyyy");
-    // setup is run again so the new date format reaches the reader.
-    operator.setup(null);
-    operator.in.put(tuple);
-    Test1Pojo pojo = expectSingleValidPojo();
-    assertScalarFields(pojo);
-    assertStringList(pojo);
-    DateTime parsed = new DateTime(pojo.date);
-    Assert.assertEquals(2015, parsed.getYear());
-    Assert.assertEquals(9, parsed.getMonthOfYear());
-    Assert.assertEquals(15, parsed.getDayOfMonth());
-  }
-
-  /** Syntactically broken JSON (missing comma) goes to the error sink. */
-  @Test
-  public void testJSONToPOJOInvalidData()
-  {
-    String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}";
-    operator.in.put(tuple);
-    expectRejected(tuple);
-  }
-
-  /** A property the POJO does not declare is ignored; absent fields stay unset. */
-  @Test
-  public void testJSONToPOJOUnknownFields()
-  {
-    String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
-    operator.in.put(tuple);
-    Test1Pojo pojo = expectSingleValidPojo();
-    assertScalarFields(pojo);
-    Assert.assertNull(pojo.d);
-  }
-
-  /** A string value supplied for the numeric field {@code b} is rejected. */
-  @Test
-  public void testJSONToPOJOMismatchingFields()
-  {
-    String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    expectRejected(tuple);
-  }
-
-  /** The empty string goes to the error sink. */
-  @Test
-  public void testJSONToPOJOEmptyString()
-  {
-    String tuple = "";
-    operator.in.put(tuple);
-    expectRejected(tuple);
-  }
-
-  /** An empty JSON object yields a POJO whose fields keep their defaults. */
-  @Test
-  public void testJSONToPOJOEmptyJSON()
-  {
-    String tuple = "{}";
-    operator.in.put(tuple);
-    Test1Pojo pojo = expectSingleValidPojo();
-    Assert.assertEquals(0, pojo.a);
-    Assert.assertEquals(0, pojo.b);
-    Assert.assertNull(pojo.c);
-    Assert.assertNull(pojo.d);
-  }
-
-  /** An array supplied for the string field {@code c} is rejected. */
-  @Test
-  public void testJSONToPOJOArrayInJson()
-  {
-    String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
-    operator.in.put(tuple);
-    expectRejected(tuple);
-  }
-
-  /** Asserts exactly one valid tuple and no error tuple, and returns the valid one. */
-  private Test1Pojo expectSingleValidPojo()
-  {
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    Object obj = validDataSink.collectedTuples.get(0);
-    Assert.assertNotNull(obj);
-    Assert.assertEquals(Test1Pojo.class, obj.getClass());
-    return (Test1Pojo)obj;
-  }
-
-  /** Asserts no valid tuple and exactly one error tuple equal to {@code tuple}. */
-  private void expectRejected(String tuple)
-  {
-    Assert.assertEquals(0, validDataSink.collectedTuples.size());
-    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
-    Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
-  }
-
-  /** Asserts the values of {@code a}, {@code b} and {@code c} shared by the sample tuples. */
-  private static void assertScalarFields(Test1Pojo pojo)
-  {
-    Assert.assertEquals(123, pojo.a);
-    Assert.assertEquals(234876274, pojo.b);
-    Assert.assertEquals("HowAreYou?", pojo.c);
-  }
-
-  /** Asserts that {@code d} holds exactly ABC, PQR, XYZ in that order. */
-  private static void assertStringList(Test1Pojo pojo)
-  {
-    Assert.assertEquals(3, pojo.d.size());
-    Assert.assertEquals("ABC", pojo.d.get(0));
-    Assert.assertEquals("PQR", pojo.d.get(1));
-    Assert.assertEquals("XYZ", pojo.d.get(2));
-  }
-
-  /** Target class the operator deserializes into during these tests. */
-  public static class Test1Pojo
-  {
-    public int a;
-    public long b;
-    public String c;
-    public List<String> d;
-    public Date date;
-
-    @Override
-    public String toString()
-    {
-      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
-    }
-  }
-}