You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2015/11/30 22:06:58 UTC
[52/98] [abbrv] incubator-apex-malhar git commit: MLHR-1838 Added
pojo parsers and formatters(csv, json, xml)
MLHR-1838 Added pojo parsers and formatters(csv,json,xml)
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/3f4fe186
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/3f4fe186
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/3f4fe186
Branch: refs/heads/master
Commit: 3f4fe18665c59dadb8ad289f696df983bdc451ce
Parents: e1a4550
Author: shubham <sh...@github.com>
Authored: Fri Sep 11 16:26:03 2015 +0530
Committer: shubham <sh...@github.com>
Committed: Wed Oct 14 10:53:12 2015 +0530
----------------------------------------------------------------------
contrib/pom.xml | 12 +
.../contrib/converter/Converter.java | 43 +++
.../contrib/schema/formatter/CsvFormatter.java | 285 +++++++++++++++++
.../contrib/schema/formatter/Formatter.java | 101 ++++++
.../contrib/schema/formatter/JsonFormatter.java | 109 +++++++
.../contrib/schema/formatter/XmlFormatter.java | 172 ++++++++++
.../contrib/schema/parser/CsvParser.java | 314 +++++++++++++++++++
.../contrib/schema/parser/JsonParser.java | 106 +++++++
.../contrib/schema/parser/Parser.java | 102 ++++++
.../contrib/schema/parser/XmlParser.java | 141 +++++++++
.../schema/formatter/CsvFormatterTest.java | 147 +++++++++
.../schema/formatter/JsonFormatterTest.java | 186 +++++++++++
.../schema/formatter/XmlFormatterTest.java | 226 +++++++++++++
.../contrib/schema/parser/CsvParserTest.java | 172 ++++++++++
.../contrib/schema/parser/JsonParserTest.java | 212 +++++++++++++
.../contrib/schema/parser/XmlParserTest.java | 254 +++++++++++++++
16 files changed, 2582 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index abed040..91ef5c7 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -606,5 +606,17 @@
<version>${dt.framework.version}</version>
<type>jar</type>
</dependency>
+ <dependency>
+ <!-- required by Xml parser and formatter -->
+ <groupId>com.thoughtworks.xstream</groupId>
+ <artifactId>xstream</artifactId>
+ <version>1.4.8</version>
+ </dependency>
+ <dependency>
+ <!-- required by Csv parser and formatter -->
+ <groupId>net.sf.supercsv</groupId>
+ <artifactId>super-csv-joda</artifactId>
+ <version>2.3.1</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
new file mode 100644
index 0000000..ebf2925
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/converter/Converter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.converter;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Operators that convert tuples from one format to another must implement
+ * this interface, e.g. parsers or formatters that take data in one format
+ * and produce it in another.
+ *
+ * @param <INPUT> type of the tuple accepted by {@link #convert(Object)}
+ * @param <OUTPUT> type of the tuple produced by {@link #convert(Object)}
+ */
+@InterfaceStability.Evolving
+public interface Converter<INPUT, OUTPUT>
+{
+ /**
+ * Provide the implementation for converting a tuple from one format to the
+ * other.
+ *
+ * @param tuple
+ * tuple in the source format
+ * @return tuple in the converted format
+ */
+ public OUTPUT convert(INPUT tuple);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
new file mode 100644
index 0000000..924acc6
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/CsvFormatter.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.formatter;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.FmtDate;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
+import org.supercsv.io.CsvBeanWriter;
+import org.supercsv.io.ICsvBeanWriter;
+import org.supercsv.prefs.CsvPreference;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts POJO to CSV string <br>
+ * Assumption is that each field in the delimited data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>fieldInfo</b>: User needs to specify fields and their types as a comma
+ * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
+ * the same order as the columns of the generated record. FORMAT is only used
+ * for dates and defaults to dd/MM/yyyy, e.g.
+ * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy. Because
+ * entries are separated by commas, FORMAT itself cannot contain a comma.<br>
+ * <b>fieldDelimiter</b>: Default is comma <br>
+ * <b>lineDelimiter</b>: Default is '\r\n'
+ *
+ * @displayName CsvFormatter
+ * @category Formatter
+ * @tags pojo csv formatter
+ */
+@InterfaceStability.Evolving
+public class CsvFormatter extends Formatter<String>
+{
+
+  private ArrayList<Field> fields;
+  // NOTE(review): this property is @NotNull but no setter for it is visible in
+  // this class and nothing here reads it - confirm it can be populated.
+  @NotNull
+  protected String classname;
+  protected int fieldDelimiter;
+  protected String lineDelimiter;
+
+  @NotNull
+  protected String fieldInfo;
+
+  public enum FIELD_TYPE
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  };
+
+  protected transient String[] nameMapping;
+  protected transient CellProcessor[] processors;
+  protected transient CsvPreference preference;
+
+  public CsvFormatter()
+  {
+    fields = new ArrayList<Field>();
+    fieldDelimiter = ',';
+    lineDelimiter = "\r\n";
+
+  }
+
+  /**
+   * Rebuilds the field list from {@link #getFieldInfo()} and prepares the CSV
+   * preference, name mapping and cell processors used by
+   * {@link #convert(Object)}.
+   *
+   * @throws IllegalArgumentException
+   *           if an entry of fieldInfo is not of the form NAME:TYPE or names
+   *           an unknown type
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    // fieldInfo is the source of truth: any previously held list is replaced
+    fields = new ArrayList<Field>();
+    for (String fieldSpec : fieldInfo.split(",")) {
+      getFields().add(parseField(fieldSpec));
+    }
+    preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
+    int countKeyValue = getFields().size();
+    nameMapping = new String[countKeyValue];
+    processors = new CellProcessor[countKeyValue];
+    initialise(nameMapping, processors);
+
+  }
+
+  /**
+   * Parses a single NAME:TYPE|FORMAT entry of fieldInfo.
+   */
+  private static Field parseField(String fieldSpec)
+  {
+    // limit of 2: only the first ':' separates name from type, so a date
+    // format such as HH:mm:ss is kept intact instead of being cut at its ':'
+    String[] fieldTuple = fieldSpec.split(":", 2);
+    if (fieldTuple.length < 2 || fieldTuple[0].trim().isEmpty()) {
+      throw new IllegalArgumentException(
+          "Invalid field info entry '" + fieldSpec + "', expected <NAME>:<TYPE>|<FORMAT>");
+    }
+    String[] typeFormat = fieldTuple[1].split("\\|", 2);
+    Field field = new Field();
+    field.setName(fieldTuple[0].trim());
+    field.setType(typeFormat[0].trim().toUpperCase());
+    if (typeFormat.length > 1) {
+      field.setFormat(typeFormat[1]);
+    }
+    return field;
+  }
+
+  /**
+   * Fills the name mapping and cell processors, one entry per configured
+   * field. Dates are rendered with the field's format (dd/MM/yyyy when none
+   * is given); every other type is written through a plain Optional
+   * processor.
+   */
+  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  {
+    for (int i = 0; i < getFields().size(); i++) {
+      Field field = getFields().get(i);
+      nameMapping[i] = field.name;
+      if (field.type == FIELD_TYPE.DATE) {
+        String dateFormat = field.format;
+        processors[i] = new Optional(new FmtDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+      } else {
+        processors[i] = new Optional();
+      }
+    }
+
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Converts the given POJO to a single CSV record using the configured
+   * field mapping.
+   *
+   * @param tuple
+   *          POJO to format
+   * @return the CSV record terminated by the line delimiter, or null when
+   *         Super CSV rejects the tuple
+   */
+  @Override
+  public String convert(Object tuple)
+  {
+    StringWriter stringWriter = new StringWriter();
+    // try-with-resources closes the bean writer even when the write fails
+    try (ICsvBeanWriter beanWriter = new CsvBeanWriter(stringWriter, preference)) {
+      beanWriter.write(tuple, nameMapping, processors);
+      // flush so that the complete record has reached stringWriter before it is read
+      beanWriter.flush();
+      return stringWriter.toString();
+    } catch (SuperCsvException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Description of one column: the POJO property name, its type and, for
+   * dates, an optional format pattern.
+   */
+  public static class Field
+  {
+    String name;
+    String format;
+    FIELD_TYPE type;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public FIELD_TYPE getType()
+    {
+      return type;
+    }
+
+    /**
+     * Sets the type from its name.
+     *
+     * @param type
+     *          exact name of a {@link FIELD_TYPE} constant
+     * @throws IllegalArgumentException
+     *           if no constant with that name exists
+     */
+    public void setType(String type)
+    {
+      this.type = FIELD_TYPE.valueOf(type);
+    }
+
+    public String getFormat()
+    {
+      return format;
+    }
+
+    public void setFormat(String format)
+    {
+      this.format = format;
+    }
+  }
+
+  /**
+   * Gets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field.
+   *
+   * @return An array list of Fields.
+   */
+  public ArrayList<Field> getFields()
+  {
+    return fields;
+  }
+
+  /**
+   * Sets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field. Note that setup replaces this list with
+   * the one derived from fieldInfo.
+   *
+   * @param fields
+   *          An array list of Fields.
+   */
+  public void setFields(ArrayList<Field> fields)
+  {
+    this.fields = fields;
+  }
+
+  /**
+   * Gets the delimiter which separates fields in the generated record.
+   *
+   * @return fieldDelimiter
+   */
+  public int getFieldDelimiter()
+  {
+    return fieldDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates fields in the generated record.
+   *
+   * @param fieldDelimiter
+   */
+  public void setFieldDelimiter(int fieldDelimiter)
+  {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  /**
+   * Gets the delimiter which terminates each generated record.
+   *
+   * @return lineDelimiter
+   */
+  public String getLineDelimiter()
+  {
+    return lineDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which terminates each generated record.
+   *
+   * @param lineDelimiter
+   */
+  public void setLineDelimiter(String lineDelimiter)
+  {
+    this.lineDelimiter = lineDelimiter;
+  }
+
+  /**
+   * Gets the name of the fields with type and format ( for date ) as comma
+   * separated string in the order of the generated columns. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   *
+   * @return fieldInfo
+   */
+  public String getFieldInfo()
+  {
+    return fieldInfo;
+  }
+
+  /**
+   * Sets the name of the fields with type and format ( for date ) as comma
+   * separated string in the order of the generated columns. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   *
+   * @param fieldInfo
+   */
+  public void setFieldInfo(String fieldInfo)
+  {
+    this.fieldInfo = fieldInfo;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(CsvFormatter.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
new file mode 100644
index 0000000..19a78e0
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/Formatter.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.formatter;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.converter.Converter;
+
+/**
+ * Abstract class that implements Converter interface. This is a schema enabled
+ * Formatter <br>
+ * Sub classes need to implement the convert method <br>
+ * <b>Port Interface</b><br>
+ * <b>in</b>: expects &lt;Object&gt; this is a schema enabled port<br>
+ * <b>out</b>: emits &lt;OUTPUT&gt; <br>
+ * <b>err</b>: emits &lt;Object&gt; error port that emits input tuple that could
+ * not be converted<br>
+ * <br>
+ *
+ * @displayName Formatter
+ * @tags formatter converter
+ * @param <OUTPUT> type of the tuples emitted on the out port
+ */
+@InterfaceStability.Evolving
+public abstract class Formatter<OUTPUT> extends BaseOperator implements Converter<Object, OUTPUT>,
+    ActivationListener<Context>
+{
+  /** Class of the incoming tuples; assigned from the input port's TUPLE_CLASS attribute. */
+  protected transient Class<?> clazz;
+
+  @OutputPortFieldAnnotation
+  public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<Object> err = new DefaultOutputPort<Object>();
+
+  @InputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultInputPort<Object> in = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+
+    /**
+     * Converts the tuple and emits the result on out. A null conversion
+     * result marks a failed conversion: the input tuple goes to err when
+     * that port is connected and is dropped otherwise.
+     */
+    @Override
+    public void process(Object inputTuple)
+    {
+      OUTPUT tuple = convert(inputTuple);
+      if (tuple == null) {
+        // never forward the null result on out, even when err is not connected
+        if (err.isConnected()) {
+          err.emit(inputTuple);
+        }
+        return;
+      }
+      if (out.isConnected()) {
+        out.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Get the class that needs to be formatted
+   *
+   * @return Class<?>
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class of tuple that needs to be formatted
+   *
+   * @param clazz
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
new file mode 100644
index 0000000..344ac60
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/JsonFormatter.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.formatter;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts POJO to JSON string <br>
+ * <b>Properties</b> <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy
+ *
+ * @displayName JsonFormatter
+ * @category Formatter
+ * @tags pojo json formatter
+ */
+@InterfaceStability.Evolving
+public class JsonFormatter extends Formatter<String>
+{
+  private transient ObjectWriter writer;
+  protected String dateFormat;
+
+  /**
+   * Builds the JSON writer for the tuple class.
+   *
+   * @throws RuntimeException
+   *           wrapping whatever failure occurred while building the writer
+   */
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
+      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true);
+      if (dateFormat != null) {
+        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
+      }
+      // derive the writer only after the mapper is fully configured so that
+      // every setting above is reflected in it
+      writer = mapper.writerWithType(clazz);
+    } catch (Throwable e) {
+      // keep the cause so the real reason is visible in the stack trace
+      throw new RuntimeException("Unable find provided class", e);
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Serializes the given POJO to JSON.
+   *
+   * @param tuple
+   *          POJO to format
+   * @return JSON string, or null when JSON generation or mapping fails
+   */
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      return writer.writeValueAsString(tuple);
+    } catch (JsonGenerationException | JsonMappingException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the date format
+   *
+   * @return Date format string
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Set the date format
+   *
+   * @param dateFormat
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
new file mode 100644
index 0000000..b387031
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/formatter/XmlFormatter.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.formatter;
+
+import java.io.Writer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.converters.basic.DateConverter;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.xml.CompactWriter;
+import com.thoughtworks.xstream.io.xml.XppDriver;
+
+/**
+ * Operator that converts POJO to XML string using XStream <br>
+ * <b>Properties</b> <br>
+ * <b>alias</b>: optional element name used for the tuple class <br>
+ * <b>dateFormat</b>: optional date format e.g dd/MM/yyyy <br>
+ * <b>prettyPrint</b>: when false (default) the XML is written with a compact
+ * writer
+ *
+ * @displayName XmlFormatter
+ * @category Formatter
+ * @tags xml pojo formatter
+ */
+@InterfaceStability.Evolving
+public class XmlFormatter extends Formatter<String>
+{
+
+  private transient XStream xstream;
+
+  protected String alias;
+  protected String dateFormat;
+  protected boolean prettyPrint;
+
+  public XmlFormatter()
+  {
+    alias = null;
+    dateFormat = null;
+  }
+
+  /**
+   * Creates the XStream instance and applies the optional alias and date
+   * format.
+   *
+   * @throws RuntimeException
+   *           wrapping the failure when the alias cannot be registered
+   */
+  @Override
+  public void activate(Context context)
+  {
+    if (prettyPrint) {
+      xstream = new XStream();
+    } else {
+      // swap the driver's writer for a compact one
+      xstream = new XStream(new XppDriver()
+      {
+        @Override
+        public HierarchicalStreamWriter createWriter(Writer out)
+        {
+          return new CompactWriter(out, getNameCoder());
+        }
+      });
+    }
+    if (alias != null) {
+      try {
+        xstream.alias(alias, clazz);
+      } catch (Throwable e) {
+        // keep the cause so the real reason is visible in the stack trace
+        throw new RuntimeException("Unable find provided class", e);
+      }
+    }
+    if (dateFormat != null) {
+      xstream.registerConverter(new DateConverter(dateFormat, new String[] {}));
+    }
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Serializes the given POJO to XML.
+   *
+   * @param tuple
+   *          POJO to format
+   * @return XML string, or null when XStream fails to serialize the tuple
+   */
+  @Override
+  public String convert(Object tuple)
+  {
+    try {
+      return xstream.toXML(tuple);
+    } catch (XStreamException e) {
+      logger.debug("Error while converting tuple {} {} ",tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Gets the alias This is an optional step. Without it XStream would work
+   * fine, but the XML element names would contain the fully qualified name of
+   * each class (including package) which would bulk up the XML a bit.
+   *
+   * @return alias.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * Sets the alias This is an optional step. Without it XStream would work
+   * fine, but the XML element names would contain the fully qualified name of
+   * each class (including package) which would bulk up the XML a bit.
+   *
+   * @param alias
+   *          element name to use for the tuple class.
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+
+  /**
+   * Gets the date format e.g dd/MM/yyyy - this will be how a date would be
+   * formatted
+   *
+   * @return dateFormat.
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Sets the date format e.g dd/MM/yyyy - this will be how a date would be
+   * formatted
+   *
+   * @param dateFormat
+   *          date pattern.
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  /**
+   * Returns true if pretty print is enabled.
+   *
+   * @return prettyPrint
+   */
+  public boolean isPrettyPrint()
+  {
+    return prettyPrint;
+  }
+
+  /**
+   * Sets pretty print option.
+   *
+   * @param prettyPrint
+   */
+  public void setPrettyPrint(boolean prettyPrint)
+  {
+    this.prettyPrint = prettyPrint;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(XmlFormatter.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
new file mode 100644
index 0000000..4fd39fb
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/CsvParser.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.parser;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
+import org.supercsv.io.CsvBeanReader;
+import org.supercsv.prefs.CsvPreference;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.ReusableStringReader;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts CSV string to Pojo <br>
+ * Assumption is that each field in the delimited data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>fieldInfo</b>: User needs to specify fields and their types as a comma
+ * separated string having format &lt;NAME&gt;:&lt;TYPE&gt;|&lt;FORMAT&gt; in
+ * the same order as incoming data. FORMAT is only used for dates and defaults
+ * to dd/MM/yyyy, e.g.
+ * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy. Because
+ * entries are separated by commas, FORMAT itself cannot contain a comma.<br>
+ * <b>fieldDelimiter</b>: Default is comma <br>
+ * <b>lineDelimiter</b>: Default is '\r\n'
+ *
+ * @displayName CsvParser
+ * @category Parsers
+ * @tags csv pojo parser
+ */
+@InterfaceStability.Evolving
+public class CsvParser extends Parser<String>
+{
+
+  private ArrayList<Field> fields;
+  protected int fieldDelimiter;
+  protected String lineDelimiter;
+
+  @NotNull
+  protected String fieldInfo;
+
+  protected transient String[] nameMapping;
+  protected transient CellProcessor[] processors;
+  private transient CsvBeanReader csvReader;
+
+  public enum FIELD_TYPE
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  };
+
+  @NotNull
+  private transient ReusableStringReader csvStringReader = new ReusableStringReader();
+
+  public CsvParser()
+  {
+    fields = new ArrayList<Field>();
+    fieldDelimiter = ',';
+    lineDelimiter = "\r\n";
+  }
+
+  /**
+   * Rebuilds the field list from {@link #getFieldInfo()} and prepares the CSV
+   * reader, name mapping and cell processors used by
+   * {@link #convert(String)}.
+   *
+   * @throws IllegalArgumentException
+   *           if an entry of fieldInfo is not of the form NAME:TYPE or names
+   *           an unknown type
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+
+    logger.info("field info {}", fieldInfo);
+    // fieldInfo is the source of truth: any previously held list is replaced
+    fields = new ArrayList<Field>();
+    for (String fieldSpec : fieldInfo.split(",")) {
+      getFields().add(parseField(fieldSpec));
+    }
+
+    CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
+    csvReader = new CsvBeanReader(csvStringReader, preference);
+    int countKeyValue = getFields().size();
+    logger.info("countKeyValue {}", countKeyValue);
+    nameMapping = new String[countKeyValue];
+    processors = new CellProcessor[countKeyValue];
+    initialise(nameMapping, processors);
+  }
+
+  /**
+   * Parses a single NAME:TYPE|FORMAT entry of fieldInfo.
+   */
+  private static Field parseField(String fieldSpec)
+  {
+    // limit of 2: only the first ':' separates name from type, so a date
+    // format such as HH:mm:ss is kept intact instead of being cut at its ':'
+    String[] fieldTuple = fieldSpec.split(":", 2);
+    if (fieldTuple.length < 2 || fieldTuple[0].trim().isEmpty()) {
+      throw new IllegalArgumentException(
+          "Invalid field info entry '" + fieldSpec + "', expected <NAME>:<TYPE>|<FORMAT>");
+    }
+    String[] typeFormat = fieldTuple[1].split("\\|", 2);
+    Field field = new Field();
+    field.setName(fieldTuple[0].trim());
+    field.setType(typeFormat[0].trim().toUpperCase());
+    if (typeFormat.length > 1) {
+      field.setFormat(typeFormat[1]);
+    }
+    return field;
+  }
+
+  /**
+   * Fills the name mapping and cell processors, one entry per configured
+   * field.
+   */
+  private void initialise(String[] nameMapping, CellProcessor[] processors)
+  {
+    for (int i = 0; i < getFields().size(); i++) {
+      Field field = getFields().get(i);
+      nameMapping[i] = field.name;
+      processors[i] = createProcessor(field);
+    }
+  }
+
+  /**
+   * Chooses the cell processor for a field based on its type; returns null
+   * when the type matches none of the known constants.
+   */
+  private static CellProcessor createProcessor(Field field)
+  {
+    FIELD_TYPE type = field.type;
+    // NOTE(review): FLOAT is parsed with ParseDouble and SHORT with ParseInt -
+    // confirm the target POJO setters accept the wider types.
+    if (type == FIELD_TYPE.DOUBLE || type == FIELD_TYPE.FLOAT) {
+      return new Optional(new ParseDouble());
+    }
+    if (type == FIELD_TYPE.INTEGER || type == FIELD_TYPE.SHORT) {
+      return new Optional(new ParseInt());
+    }
+    if (type == FIELD_TYPE.LONG) {
+      return new Optional(new ParseLong());
+    }
+    if (type == FIELD_TYPE.STRING) {
+      return new Optional();
+    }
+    if (type == FIELD_TYPE.CHARACTER) {
+      return new Optional(new ParseChar());
+    }
+    if (type == FIELD_TYPE.BOOLEAN) {
+      return new Optional(new ParseBool());
+    }
+    if (type == FIELD_TYPE.DATE) {
+      String dateFormat = field.format;
+      return new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
+    }
+    return null;
+  }
+
+  @Override
+  public void activate(Context context)
+  {
+
+  }
+
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Parses one CSV record into an instance of the tuple class.
+   *
+   * @param tuple
+   *          CSV record to parse
+   * @return the populated POJO, or null when the record cannot be read or is
+   *         rejected by Super CSV
+   */
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      csvStringReader.open(tuple);
+      return csvReader.read(clazz, nameMapping, processors);
+    } catch (SuperCsvException | IOException e) {
+      // SuperCsvException is unchecked; catching it here keeps one malformed
+      // record from escaping convert, in line with CsvFormatter.convert
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Closes the CSV reader if one was created in setup.
+   */
+  @Override
+  public void teardown()
+  {
+    try {
+      if (csvReader != null) {
+        csvReader.close();
+      }
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  /**
+   * Description of one column: the POJO property name, its type and, for
+   * dates, an optional format pattern.
+   */
+  public static class Field
+  {
+    String name;
+    String format;
+    FIELD_TYPE type;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public FIELD_TYPE getType()
+    {
+      return type;
+    }
+
+    /**
+     * Sets the type from its name.
+     *
+     * @param type
+     *          exact name of a {@link FIELD_TYPE} constant
+     * @throws IllegalArgumentException
+     *           if no constant with that name exists
+     */
+    public void setType(String type)
+    {
+      this.type = FIELD_TYPE.valueOf(type);
+    }
+
+    public String getFormat()
+    {
+      return format;
+    }
+
+    public void setFormat(String format)
+    {
+      this.format = format;
+    }
+
+  }
+
+  /**
+   * Gets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field.
+   *
+   * @return An array list of Fields.
+   */
+  public ArrayList<Field> getFields()
+  {
+    return fields;
+  }
+
+  /**
+   * Sets the array list of the fields, a field being a POJO containing the name
+   * of the field and type of field. Note that setup replaces this list with
+   * the one derived from fieldInfo.
+   *
+   * @param fields
+   *          An array list of Fields.
+   */
+  public void setFields(ArrayList<Field> fields)
+  {
+    this.fields = fields;
+  }
+
+  /**
+   * Gets the delimiter which separates fields in incoming data.
+   *
+   * @return fieldDelimiter
+   */
+  public int getFieldDelimiter()
+  {
+    return fieldDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates fields in incoming data.
+   *
+   * @param fieldDelimiter
+   */
+  public void setFieldDelimiter(int fieldDelimiter)
+  {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  /**
+   * Gets the delimiter which separates lines in incoming data.
+   *
+   * @return lineDelimiter
+   */
+  public String getLineDelimiter()
+  {
+    return lineDelimiter;
+  }
+
+  /**
+   * Sets the delimiter which separates line in incoming data.
+   *
+   * @param lineDelimiter
+   */
+  public void setLineDelimiter(String lineDelimiter)
+  {
+    this.lineDelimiter = lineDelimiter;
+  }
+
+  /**
+   * Gets the name of the fields with type and format ( for date ) as comma
+   * separated string in same order as incoming data. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   *
+   * @return fieldInfo
+   */
+  public String getFieldInfo()
+  {
+    return fieldInfo;
+  }
+
+  /**
+   * Sets the name of the fields with type and format ( for date ) as comma
+   * separated string in same order as incoming data. e.g
+   * name:string,dept:string,eid:integer,dateOfJoining:date|dd/MM/yyyy
+   *
+   * @param fieldInfo
+   */
+  public void setFieldInfo(String fieldInfo)
+  {
+    this.fieldInfo = fieldInfo;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
new file mode 100644
index 0000000..db45b33
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/JsonParser.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.parser;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.DTThrowable;
+
+/**
+ * Operator that converts a JSON string to a Pojo <br>
+ * <b>Properties</b> <br>
+ * <b>dateFormat</b>: date format e.g dd/MM/yyyy
+ *
+ * @displayName JsonParser
+ * @category Parsers
+ * @tags json pojo parser
+ */
+@InterfaceStability.Evolving
+public class JsonParser extends Parser<String>
+{
+
+  private transient ObjectReader reader;
+  protected String dateFormat;
+
+  /**
+   * Builds the Jackson reader used by {@link #convert(String)}. Unknown JSON
+   * properties are not treated as failures, and the configured date format
+   * (if any) is applied to the mapper.
+   *
+   * @param context
+   *          activation context (not used here)
+   * @throws RuntimeException
+   *           wrapping the original failure if the reader cannot be created,
+   *           e.g. because of an invalid date pattern
+   */
+  @Override
+  public void activate(Context context)
+  {
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      if (dateFormat != null) {
+        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
+      }
+      reader = mapper.reader(clazz);
+    } catch (Throwable e) {
+      // keep the cause: the failure may be a bad date pattern rather than a missing class
+      throw new RuntimeException("Unable to create JSON reader for class " + clazz, e);
+    }
+  }
+
+  /**
+   * Nothing to release on deactivation.
+   */
+  @Override
+  public void deactivate()
+  {
+  }
+
+  /**
+   * Parses one JSON string into an instance of the configured class.
+   *
+   * @param tuple
+   *          JSON text to parse
+   * @return the parsed object, or null when Jackson reports a
+   *         JsonProcessingException (the error is logged at debug level)
+   */
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      return reader.readValue(tuple);
+    } catch (JsonProcessingException e) {
+      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
+    } catch (IOException e) {
+      // any other I/O failure is handed to DTThrowable.rethrow
+      DTThrowable.rethrow(e);
+    }
+    return null;
+  }
+
+  /**
+   * Get the date format
+   *
+   * @return Date format string
+   */
+  public String getDateFormat()
+  {
+    return dateFormat;
+  }
+
+  /**
+   * Set the date format
+   *
+   * @param dateFormat
+   *          date pattern applied to the mapper on activation, e.g dd/MM/yyyy
+   */
+  public void setDateFormat(String dateFormat)
+  {
+    this.dateFormat = dateFormat;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonParser.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
new file mode 100644
index 0000000..e5ff7f5
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/Parser.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.parser;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.contrib.converter.Converter;
+
+/**
+ * Abstract class that implements Converter interface. This is a schema enabled
+ * Parser <br>
+ * Sub classes need to implement the convert method <br>
+ * <br>
+ * <b>Port Interface</b><br>
+ * <b>in</b>: expects &lt;INPUT&gt;<br>
+ * <b>out</b>: emits &lt;Object&gt; this is a schema enabled port<br>
+ * <b>err</b>: emits &lt;INPUT&gt; error port that emits input tuple that could
+ * not be converted<br>
+ * <br>
+ *
+ * @displayName Parser
+ * @tags parser converter
+ * @param <INPUT>
+ *          type of the raw tuples received on the input port
+ */
+@InterfaceStability.Evolving
+public abstract class Parser<INPUT> extends BaseOperator implements Converter<INPUT, Object>,
+    ActivationListener<Context>
+{
+  /** Target class of the parsed tuples; set from the out port's TUPLE_CLASS attribute or via setClazz. */
+  protected transient Class<?> clazz;
+
+  @OutputPortFieldAnnotation(schemaRequired = true)
+  public transient DefaultOutputPort<Object> out = new DefaultOutputPort<Object>()
+  {
+    @Override
+    public void setup(PortContext context)
+    {
+      clazz = context.getValue(Context.PortContext.TUPLE_CLASS);
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<INPUT> err = new DefaultOutputPort<INPUT>();
+
+  public transient DefaultInputPort<INPUT> in = new DefaultInputPort<INPUT>()
+  {
+    /**
+     * Converts the incoming tuple. A successful result goes to out; a failed
+     * conversion (null) sends the raw input to err when that port is
+     * connected and is otherwise dropped.
+     */
+    @Override
+    public void process(INPUT inputTuple)
+    {
+      Object tuple = convert(inputTuple);
+      if (tuple == null) {
+        // never push a null tuple downstream, even when nobody listens on err
+        if (err.isConnected()) {
+          err.emit(inputTuple);
+        }
+        return;
+      }
+      if (out.isConnected()) {
+        out.emit(tuple);
+      }
+    }
+  };
+
+  /**
+   * Get the class that tuples are parsed into
+   *
+   * @return Class<?>
+   */
+  public Class<?> getClazz()
+  {
+    return clazz;
+  }
+
+  /**
+   * Set the class that tuples are parsed into
+   *
+   * @param clazz
+   *          target class of the parsed tuples
+   */
+  public void setClazz(Class<?> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
new file mode 100644
index 0000000..4931497
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/schema/parser/XmlParser.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.schema.parser;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.XStreamException;
+import com.thoughtworks.xstream.converters.basic.DateConverter;
+
+import com.datatorrent.api.Context;
+
+/**
+ * Operator that converts XML string to Pojo <br>
+ * <b>Properties</b> <br>
+ * <b>alias</b>:This maps to the root element of the XML string. If not
+ * specified, parser would expect the root element to be fully qualified name of
+ * the Pojo Class. <br>
+ * <b>dateFormats</b>: Comma separated string of date formats e.g
+ * dd/MM/yyyy,dd-MMM-yyyy where first one would be considered default
+ *
+ * @displayName XmlParser
+ * @category Parsers
+ * @tags xml pojo parser
+ */
+@InterfaceStability.Evolving
+public class XmlParser extends Parser<String>
+{
+
+  private transient XStream xstream;
+  protected String alias;
+  protected String dateFormats;
+
+  public XmlParser()
+  {
+    alias = null;
+    dateFormats = null;
+  }
+
+  /**
+   * Creates the XStream instance, registers the alias for the target class
+   * when one is configured, and registers a date converter when date formats
+   * are configured (the first format is the default, all are accepted).
+   *
+   * @param context
+   *          activation context (not used here)
+   * @throws RuntimeException
+   *           wrapping the original failure if the alias cannot be registered
+   */
+  @Override
+  public void activate(Context context)
+  {
+    // NOTE(review): XStream instantiates whatever types the XML names; if the
+    // input is untrusted, consider restricting the permitted types - confirm
+    // what the bundled XStream version supports.
+    xstream = new XStream();
+    if (alias != null) {
+      try {
+        xstream.alias(alias, clazz);
+      } catch (Throwable e) {
+        // keep the cause so the real reason is visible to the operator owner
+        throw new RuntimeException("Unable to register alias " + alias + " for class " + clazz, e);
+      }
+    }
+    if (dateFormats != null) {
+      String[] dateFormat = dateFormats.split(",");
+      xstream.registerConverter(new DateConverter(dateFormat[0], dateFormat));
+    }
+  }
+
+  /**
+   * Nothing to release on deactivation.
+   */
+  @Override
+  public void deactivate()
+  {
+
+  }
+
+  /**
+   * Parses one XML string into an object.
+   *
+   * @param tuple
+   *          XML text to parse
+   * @return the object built by XStream, or null when XStream throws an
+   *         XStreamException (the error is logged at debug level)
+   */
+  @Override
+  public Object convert(String tuple)
+  {
+    try {
+      return xstream.fromXML(tuple);
+    } catch (XStreamException e) {
+      logger.debug("Error while converting tuple {} {}", tuple,e.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   * Gets the alias
+   *
+   * @return alias.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * Sets the alias. This maps to the root element of the XML string. If not
+   * specified, parser would expect the root element to be fully qualified name
+   * of the Pojo Class.
+   *
+   * @param alias
+   *          root element name to map to the Pojo class
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+
+  /**
+   * Gets the comma separated string of date formats e.g dd/MM/yyyy,dd-MMM-yyyy
+   * where first one would be considered default
+   *
+   * @return dateFormats.
+   */
+  public String getDateFormats()
+  {
+    return dateFormats;
+  }
+
+  /**
+   * Sets the comma separated string of date formats e.g dd/MM/yyyy,dd-MMM-yyyy
+   * where first one would be considered default
+   *
+   * @param dateFormats
+   *          comma separated date patterns; entries are split on "," as given
+   */
+  public void setDateFormats(String dateFormats)
+  {
+    this.dateFormats = dateFormats;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(XmlParser.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
new file mode 100644
index 0000000..8ecc088
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/CsvFormatterTest.java
@@ -0,0 +1,147 @@
+package com.datatorrent.contrib.schema.formatter;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.contrib.schema.formatter.CsvFormatter;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class CsvFormatterTest
+{
+
+  CsvFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /**
+   * Creates a fresh formatter wired to collecting sinks before every test and
+   * tears the operator down afterwards.
+   */
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      operator = new CsvFormatter();
+      operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
+      operator.setLineDelimiter("\r\n");
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      operator.teardown();
+    }
+
+  }
+
+  /**
+   * Builds the employee shared by the tests: john, cs, eid 1, joined on
+   * 1 Jan 2015.
+   */
+  private static EmployeeBean newEmployee()
+  {
+    EmployeeBean bean = new EmployeeBean();
+    bean.setName("john");
+    bean.setDept("cs");
+    bean.setEid(1);
+    bean.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
+    return bean;
+  }
+
+  /**
+   * Sends one bean through the operator, checks that exactly one tuple came
+   * out on the valid port and none on the error port, and returns that tuple.
+   */
+  private String formatSingle(EmployeeBean bean)
+  {
+    operator.in.process(bean);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String csv = (String)validDataSink.collectedTuples.get(0);
+    Assert.assertNotNull(csv);
+    return csv;
+  }
+
+  @Test
+  public void testPojoReaderToCsv()
+  {
+    operator.setup(null);
+    String csv = formatSingle(newEmployee());
+    Assert.assertEquals("john,cs,1,01/01/2015" + operator.getLineDelimiter(), csv);
+  }
+
+  @Test
+  public void testPojoReaderToCsvMultipleDate()
+  {
+    operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date,dateOfBirth:date|dd-MMM-yyyy");
+    operator.setup(null);
+    EmployeeBean bean = newEmployee();
+    bean.setDateOfBirth(new DateTime().withDate(2015, 1, 1).toDate());
+    String csv = formatSingle(bean);
+    Assert.assertEquals("john,cs,1,01/01/2015,01-Jan-2015" + operator.getLineDelimiter(), csv);
+  }
+
+  /**
+   * Plain bean used as formatter input.
+   */
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Date dateOfBirth;
+
+    public String getName()
+    {
+      return this.name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return this.dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return this.eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return this.dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Date getDateOfBirth()
+    {
+      return this.dateOfBirth;
+    }
+
+    public void setDateOfBirth(Date dateOfBirth)
+    {
+      this.dateOfBirth = dateOfBirth;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
new file mode 100644
index 0000000..4040c63
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/JsonFormatterTest.java
@@ -0,0 +1,186 @@
+package com.datatorrent.contrib.schema.formatter;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.TestUtils.TestInfo;
+import com.google.common.collect.Lists;
+
+public class JsonFormatterTest
+{
+  JsonFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+  // remembered so that the redirect done in the constructor can be undone after each test
+  private final PrintStream originalErr = System.err;
+
+  public JsonFormatterTest()
+  {
+    // So that the output is cleaner.
+    System.setErr(new PrintStream(myOut));
+  }
+
+  /**
+   * Cleans the test directory, creates a fresh formatter wired to collecting
+   * sinks and opens window 0 before each test; closes the window, tears the
+   * operator down and restores System.err afterwards.
+   */
+  @Rule
+  public TestInfo testMeta = new FSTestWatcher()
+  {
+    private void deleteDirectory()
+    {
+      try {
+        FileUtils.deleteDirectory(new File(getDir()));
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    @Override
+    protected void starting(Description descriptor)
+    {
+      super.starting(descriptor);
+      deleteDirectory();
+
+      operator = new JsonFormatter();
+
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+      operator.setup(null);
+      operator.activate(null);
+
+      operator.beginWindow(0);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      operator.endWindow();
+      operator.teardown();
+
+      deleteDirectory();
+      // do not leave System.err redirected for tests that run later in the same JVM
+      System.setErr(originalErr);
+      super.finished(description);
+    }
+  };
+
+  @Test
+  public void testJSONToPOJO()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
+
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJODate()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
+    pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
+    // the date format is set after the watcher's setup/activate, so both are run again here
+    operator.setDateFormat("dd-MM-yyyy");
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONullFields()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    pojo.a = 123;
+    pojo.b = 234876274;
+    pojo.c = "HowAreYou?";
+    pojo.d = null;
+
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJOEmptyPOJO()
+  {
+    Test1Pojo pojo = new Test1Pojo();
+    operator.in.put(pojo);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONullPOJO()
+  {
+    operator.in.put(null);
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    String expectedJSONString = "null";
+    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testJSONToPOJONoFieldPOJO()
+  {
+    operator.endWindow();
+    operator.teardown();
+    operator.setClazz(Test2Pojo.class);
+    operator.setup(null);
+    operator.beginWindow(1);
+
+    Test2Pojo o = new Test2Pojo();
+    operator.in.put(o);
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0));
+  }
+
+  /**
+   * Pojo with public fields of several types used as formatter input.
+   */
+  public static class Test1Pojo
+  {
+    public int a;
+    public long b;
+    public String c;
+    public List<String> d;
+    public Date date;
+
+    @Override
+    public String toString()
+    {
+      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+    }
+  }
+
+  /**
+   * Pojo without any fields; the test above expects it on the error port.
+   */
+  public static class Test2Pojo
+  {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
new file mode 100644
index 0000000..2bc1aec
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/formatter/XmlFormatterTest.java
@@ -0,0 +1,226 @@
+package com.datatorrent.contrib.schema.formatter;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.contrib.schema.formatter.XmlFormatter;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class XmlFormatterTest
+{
+
+  XmlFormatter operator;
+  CollectorTestSink<Object> validDataSink;
+  CollectorTestSink<String> invalidDataSink;
+
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  /**
+   * Creates a fresh formatter for EmployeeBean with date format yyyy-MM-dd,
+   * wired to collecting sinks, before every test and tears it down afterwards.
+   */
+  public class Watcher extends TestWatcher
+  {
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      operator = new XmlFormatter();
+      operator.setClazz(EmployeeBean.class);
+      operator.setDateFormat("yyyy-MM-dd");
+      validDataSink = new CollectorTestSink<Object>();
+      invalidDataSink = new CollectorTestSink<String>();
+      TestUtils.setSink(operator.out, validDataSink);
+      TestUtils.setSink(operator.err, invalidDataSink);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+      operator.teardown();
+    }
+
+  }
+
+  /**
+   * Builds the employee shared by the tests: john, eid 1, cs, joined on
+   * 1 Jan 2015.
+   */
+  private static EmployeeBean newEmployee()
+  {
+    EmployeeBean e = new EmployeeBean();
+    e.setName("john");
+    e.setEid(1);
+    e.setDept("cs");
+    // withDate states the intended calendar date directly
+    e.setDateOfJoining(new DateTime().withDate(2015, 1, 1).toDate());
+    return e;
+  }
+
+  /**
+   * Sets up and activates the operator, formats one bean and checks that
+   * exactly one tuple equal to the expected XML reached the valid port and
+   * nothing reached the error port.
+   */
+  private void formatAndVerify(EmployeeBean e, String expected)
+  {
+    operator.setup(null);
+    operator.activate(null);
+    operator.in.process(e);
+    // check the counts first so that a missing tuple fails as an assertion
+    Assert.assertEquals(1, validDataSink.collectedTuples.size());
+    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(expected, validDataSink.collectedTuples.get(0));
+  }
+
+  @Test
+  public void testPojoToXmlWithoutAlias()
+  {
+    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
+        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
+    formatAndVerify(newEmployee(), expected);
+  }
+
+  @Test
+  public void testXmlToPojoWithAlias()
+  {
+    operator.setAlias("EmployeeBean");
+    String expected = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+        + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
+    formatAndVerify(newEmployee(), expected);
+  }
+
+  @Test
+  public void testXmlToPojoWithPrettyPrint()
+  {
+    operator.setAlias("EmployeeBean");
+    operator.setPrettyPrint(true);
+    String expected = "<EmployeeBean>\n" + " <name>john</name>\n" + " <dept>cs</dept>\n" + " <eid>1</eid>\n"
+        + " <dateOfJoining>2015-01-01</dateOfJoining>\n" + "</EmployeeBean>";
+    formatAndVerify(newEmployee(), expected);
+  }
+
+  @Test
+  public void testPojoToXmlWithoutAliasHeirarchical()
+  {
+    EmployeeBean e = newEmployee();
+    Address address = new Address();
+    address.setCity("new york");
+    address.setCountry("US");
+    e.setAddress(address);
+
+    String expected = "<com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>" + "<name>john</name>"
+        + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+        + "<city>new york</city>" + "<country>US</country>" + "</address>"
+        + "</com.datatorrent.contrib.schema.formatter.XmlFormatterTest_-EmployeeBean>";
+    formatAndVerify(e, expected);
+  }
+
+  /**
+   * Bean used as formatter input; optionally carries a nested Address.
+   */
+  public static class EmployeeBean
+  {
+
+    private String name;
+    private String dept;
+    private int eid;
+    private Date dateOfJoining;
+    private Address address;
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getDept()
+    {
+      return dept;
+    }
+
+    public void setDept(String dept)
+    {
+      this.dept = dept;
+    }
+
+    public int getEid()
+    {
+      return eid;
+    }
+
+    public void setEid(int eid)
+    {
+      this.eid = eid;
+    }
+
+    public Date getDateOfJoining()
+    {
+      return dateOfJoining;
+    }
+
+    public void setDateOfJoining(Date dateOfJoining)
+    {
+      this.dateOfJoining = dateOfJoining;
+    }
+
+    public Address getAddress()
+    {
+      return address;
+    }
+
+    public void setAddress(Address address)
+    {
+      this.address = address;
+    }
+  }
+
+  /**
+   * Nested bean used by the hierarchical test.
+   */
+  public static class Address
+  {
+
+    private String city;
+    private String country;
+
+    public String getCity()
+    {
+      return city;
+    }
+
+    public void setCity(String city)
+    {
+      this.city = city;
+    }
+
+    public String getCountry()
+    {
+      return country;
+    }
+
+    public void setCountry(String country)
+    {
+      this.country = country;
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
new file mode 100644
index 0000000..3c31ad0
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/CsvParserTest.java
@@ -0,0 +1,172 @@
+package com.datatorrent.contrib.schema.parser;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.contrib.schema.parser.CsvParser;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class CsvParserTest
+{
+
+ CsvParser operator;
+ CollectorTestSink<Object> validDataSink;
+ CollectorTestSink<String> invalidDataSink;
+
+ @Rule
+ public Watcher watcher = new Watcher();
+
+ public class Watcher extends TestWatcher
+ {
+
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ operator = new CsvParser();
+ operator.setClazz(EmployeeBean.class);
+ operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
+ validDataSink = new CollectorTestSink<Object>();
+ invalidDataSink = new CollectorTestSink<String>();
+ TestUtils.setSink(operator.out, validDataSink);
+ TestUtils.setSink(operator.err, invalidDataSink);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ operator.teardown();
+ }
+
+ }
+
+ @Test
+ public void testCsvToPojoWriterDefault()
+ {
+ operator.setup(null);
+ String tuple = "john,cs,1,01/01/2015";
+ operator.in.process(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(EmployeeBean.class, obj.getClass());
+ EmployeeBean pojo = (EmployeeBean)obj;
+ Assert.assertEquals("john", pojo.getName());
+ Assert.assertEquals("cs", pojo.getDept());
+ Assert.assertEquals(1, pojo.getEid());
+ Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+ pojo.getDateOfJoining()));
+ }
+
+ @Test
+ public void testCsvToPojoWriterDateFormat()
+ {
+ operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
+ operator.setup(null);
+ String tuple = "john,cs,1,01-JAN-2015";
+ operator.in.process(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(EmployeeBean.class, obj.getClass());
+ EmployeeBean pojo = (EmployeeBean)obj;
+ Assert.assertEquals("john", pojo.getName());
+ Assert.assertEquals("cs", pojo.getDept());
+ Assert.assertEquals(1, pojo.getEid());
+ Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+ pojo.getDateOfJoining()));
+ }
+
+ @Test
+ public void testCsvToPojoWriterDateFormatMultiple()
+ {
+ operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
+ operator.setup(null);
+ String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
+ operator.in.process(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(EmployeeBean.class, obj.getClass());
+ EmployeeBean pojo = (EmployeeBean)obj;
+ Assert.assertEquals("john", pojo.getName());
+ Assert.assertEquals("cs", pojo.getDept());
+ Assert.assertEquals(1, pojo.getEid());
+ Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+ pojo.getDateOfJoining()));
+ Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
+ pojo.getDateOfBirth()));
+ }
+
+ public static class EmployeeBean
+ {
+
+ private String name;
+ private String dept;
+ private int eid;
+ private Date dateOfJoining;
+ private Date dateOfBirth;
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ public String getDept()
+ {
+ return dept;
+ }
+
+ public void setDept(String dept)
+ {
+ this.dept = dept;
+ }
+
+ public int getEid()
+ {
+ return eid;
+ }
+
+ public void setEid(int eid)
+ {
+ this.eid = eid;
+ }
+
+ public Date getDateOfJoining()
+ {
+ return dateOfJoining;
+ }
+
+ public void setDateOfJoining(Date dateOfJoining)
+ {
+ this.dateOfJoining = dateOfJoining;
+ }
+
+ public Date getDateOfBirth()
+ {
+ return dateOfBirth;
+ }
+
+ public void setDateOfBirth(Date dateOfBirth)
+ {
+ this.dateOfBirth = dateOfBirth;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
new file mode 100644
index 0000000..b453508
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/JsonParserTest.java
@@ -0,0 +1,212 @@
+package com.datatorrent.contrib.schema.parser;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.TestUtils.TestInfo;
+
+public class JsonParserTest
+{
+ JsonParser operator;
+ CollectorTestSink<Object> validDataSink;
+ CollectorTestSink<String> invalidDataSink;
+
+ final ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+ public JsonParserTest()
+ {
+ // So that the output is cleaner.
+ System.setErr(new PrintStream(myOut));
+ }
+
+ @Rule
+ public TestInfo testMeta = new FSTestWatcher()
+ {
+ private void deleteDirectory()
+ {
+ try {
+ FileUtils.deleteDirectory(new File(getDir()));
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ protected void starting(Description descriptor)
+ {
+
+ super.starting(descriptor);
+ deleteDirectory();
+
+ operator = new JsonParser();
+ operator.setClazz(Test1Pojo.class);
+ validDataSink = new CollectorTestSink<Object>();
+ invalidDataSink = new CollectorTestSink<String>();
+ TestUtils.setSink(operator.out, validDataSink);
+ TestUtils.setSink(operator.err, invalidDataSink);
+ operator.setup(null);
+ operator.activate(null);
+
+ operator.beginWindow(0);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ operator.endWindow();
+ operator.teardown();
+
+ deleteDirectory();
+ super.finished(description);
+ }
+ };
+
+ @Test
+ public void testJSONToPOJO()
+ {
+ String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+ operator.in.put(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Test1Pojo.class, obj.getClass());
+ Test1Pojo pojo = (Test1Pojo)obj;
+ Assert.assertEquals(123, pojo.a);
+ Assert.assertEquals(234876274, pojo.b);
+ Assert.assertEquals("HowAreYou?", pojo.c);
+ Assert.assertEquals(3, pojo.d.size());
+ Assert.assertEquals("ABC", pojo.d.get(0));
+ Assert.assertEquals("PQR", pojo.d.get(1));
+ Assert.assertEquals("XYZ", pojo.d.get(2));
+ }
+
+ @Test
+ public void testJSONToPOJODate()
+ {
+ String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+ operator.setDateFormat("dd-MM-yyyy");
+ operator.setup(null);
+ operator.activate(null);
+ operator.in.put(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Test1Pojo.class, obj.getClass());
+ Test1Pojo pojo = (Test1Pojo)obj;
+ Assert.assertEquals(123, pojo.a);
+ Assert.assertEquals(234876274, pojo.b);
+ Assert.assertEquals("HowAreYou?", pojo.c);
+ Assert.assertEquals(3, pojo.d.size());
+ Assert.assertEquals("ABC", pojo.d.get(0));
+ Assert.assertEquals("PQR", pojo.d.get(1));
+ Assert.assertEquals("XYZ", pojo.d.get(2));
+ Assert.assertEquals(2015, new DateTime(pojo.date).getYear());
+ Assert.assertEquals(9, new DateTime(pojo.date).getMonthOfYear());
+ Assert.assertEquals(15, new DateTime(pojo.date).getDayOfMonth());
+ }
+
+ @Test
+ public void testJSONToPOJOInvalidData()
+ {
+ String tuple = "{\"a\":123\"b\":234876274,\"c\":\"HowAreYou?\"}";
+ operator.in.put(tuple);
+ Assert.assertEquals(0, validDataSink.collectedTuples.size());
+ Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+ Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+ }
+
+ @Test
+ public void testJSONToPOJOUnknownFields()
+ {
+ String tuple = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"asd\":433.6}";
+ operator.in.put(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Test1Pojo.class, obj.getClass());
+ Test1Pojo pojo = (Test1Pojo)obj;
+ Assert.assertEquals(123, pojo.a);
+ Assert.assertEquals(234876274, pojo.b);
+ Assert.assertEquals("HowAreYou?", pojo.c);
+ Assert.assertEquals(null, pojo.d);
+ }
+
+ @Test
+ public void testJSONToPOJOMismatchingFields()
+ {
+ String tuple = "{\"a\":123,\"c\":234876274,\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+ operator.in.put(tuple);
+ Assert.assertEquals(0, validDataSink.collectedTuples.size());
+ Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+ Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+ }
+
+ @Test
+ public void testJSONToPOJOEmptyString()
+ {
+ String tuple = "";
+ operator.in.put(tuple);
+ Assert.assertEquals(0, validDataSink.collectedTuples.size());
+ Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+ Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+ }
+
+ @Test
+ public void testJSONToPOJOEmptyJSON()
+ {
+ String tuple = "{}";
+ operator.in.put(tuple);
+ Assert.assertEquals(1, validDataSink.collectedTuples.size());
+ Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+ Object obj = validDataSink.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Test1Pojo.class, obj.getClass());
+ Test1Pojo pojo = (Test1Pojo)obj;
+ Assert.assertEquals(0, pojo.a);
+ Assert.assertEquals(0, pojo.b);
+ Assert.assertEquals(null, pojo.c);
+ Assert.assertEquals(null, pojo.d);
+ }
+
+ @Test
+ public void testJSONToPOJOArrayInJson()
+ {
+ String tuple = "{\"a\":123,\"c\":[234,65,23],\"b\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"]}";
+ operator.in.put(tuple);
+ Assert.assertEquals(0, validDataSink.collectedTuples.size());
+ Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+ Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+ }
+
+ public static class Test1Pojo
+ {
+ public int a;
+ public long b;
+ public String c;
+ public List<String> d;
+ public Date date;
+
+ @Override
+ public String toString()
+ {
+ return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3f4fe186/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
new file mode 100644
index 0000000..4298951
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/schema/parser/XmlParserTest.java
@@ -0,0 +1,254 @@
+package com.datatorrent.contrib.schema.parser;
+
+import java.util.Date;
+
+import org.joda.time.DateTime;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+ /**
+  * Unit tests for {@link XmlParser}. Every test configures the operator, feeds it one XML string and then
+  * inspects two collector sinks: one attached to the {@code out} port and one attached to the {@code err} port.
+  */
+ public class XmlParserTest
+ {
+   XmlParser operator;
+   CollectorTestSink<Object> validDataSink;
+   CollectorTestSink<String> invalidDataSink;
+
+   @Rule
+   public Watcher watcher = new Watcher();
+
+   /**
+    * Per-test fixture: creates the operator and its sinks before a test and tears the operator down after it.
+    * Setup and activation are left to the individual tests so they can adjust alias and date formats first.
+    */
+   public class Watcher extends TestWatcher
+   {
+
+     @Override
+     protected void starting(Description description)
+     {
+       super.starting(description);
+       operator = new XmlParser();
+       operator.setClazz(EmployeeBean.class);
+       operator.setDateFormats("yyyy-MM-dd"); //setting default date pattern
+       validDataSink = new CollectorTestSink<Object>();
+       invalidDataSink = new CollectorTestSink<String>();
+       TestUtils.setSink(operator.out, validDataSink);
+       TestUtils.setSink(operator.err, invalidDataSink);
+     }
+
+     @Override
+     protected void finished(Description description)
+     {
+       super.finished(description);
+       operator.teardown();
+     }
+
+   }
+
+   /**
+    * Runs setup and activation on the operator as currently configured, then hands it the given tuple.
+    *
+    * @param tuple XML text to feed into the input port
+    */
+   private void parse(String tuple)
+   {
+     operator.setup(null);
+     operator.activate(null);
+     operator.in.process(tuple);
+   }
+
+   /**
+    * Asserts that exactly one tuple reached the valid sink and none the invalid sink, and that the tuple is an
+    * {@link EmployeeBean} with name "john", dept "cs", eid 1 and a joining date of 1 January 2015.
+    *
+    * @return the collected bean, for any further test-specific assertions
+    */
+   private EmployeeBean assertJohnParsed()
+   {
+     Assert.assertEquals(1, validDataSink.collectedTuples.size());
+     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
+     Object obj = validDataSink.collectedTuples.get(0);
+     Assert.assertNotNull(obj);
+     Assert.assertEquals(EmployeeBean.class, obj.getClass());
+     EmployeeBean pojo = (EmployeeBean)obj;
+     Assert.assertEquals("john", pojo.getName());
+     Assert.assertEquals("cs", pojo.getDept());
+     Assert.assertEquals(1, pojo.getEid());
+     DateTime joined = new DateTime(pojo.getDateOfJoining());
+     Assert.assertEquals(2015, joined.getYear());
+     Assert.assertEquals(1, joined.getMonthOfYear());
+     Assert.assertEquals(1, joined.getDayOfMonth());
+     return pojo;
+   }
+
+   @Test
+   public void testXmlToPojoWithoutAlias()
+   {
+     String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
+         + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>"
+         + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
+
+     parse(tuple);
+     assertJohnParsed();
+   }
+
+   @Test
+   public void testXmlToPojoWithAliasDateFormat()
+   {
+     String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+         + "<dateOfJoining>2015-JAN-01</dateOfJoining>" + "</EmployeeBean>";
+
+     operator.setAlias("EmployeeBean");
+     // Two comma-separated patterns; the payload's date is written in the second one.
+     operator.setDateFormats("yyyy-MM-dd,yyyy-MMM-dd");
+     parse(tuple);
+     assertJohnParsed();
+   }
+
+   @Test
+   public void testXmlToPojoWithAlias()
+   {
+     String tuple = "<EmployeeBean>" + "<name>john</name>" + "<dept>cs</dept>" + "<eid>1</eid>"
+         + "<dateOfJoining>2015-01-01</dateOfJoining>" + "</EmployeeBean>";
+
+     operator.setAlias("EmployeeBean");
+     parse(tuple);
+     assertJohnParsed();
+   }
+
+   @Test
+   public void testXmlToPojoIncorrectXML()
+   {
+     // NOTE(review): besides the unknown <firstname> element, the dateOfJoining value below does not follow
+     // the "yyyy-MM-dd" pattern configured in Watcher.starting, so this payload deviates from the valid ones
+     // in two ways - confirm which deviation this test is meant to pin.
+     String tuple = "<EmployeeBean>"
+         + "<firstname>john</firstname>" //incorrect field name
+         + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01 00:00:00.00 IST</dateOfJoining>"
+         + "</EmployeeBean>";
+
+     operator.setAlias("EmployeeBean");
+     parse(tuple);
+     Assert.assertEquals(0, validDataSink.collectedTuples.size());
+     Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+     Assert.assertEquals(tuple, invalidDataSink.collectedTuples.get(0));
+   }
+
+   @Test
+   public void testXmlToPojoWithoutAliasHeirarchical()
+   {
+     String tuple = "<com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>" + "<name>john</name>"
+         + "<dept>cs</dept>" + "<eid>1</eid>" + "<dateOfJoining>2015-01-01</dateOfJoining>" + "<address>"
+         + "<city>new york</city>" + "<country>US</country>" + "</address>"
+         + "</com.datatorrent.contrib.schema.parser.XmlParserTest_-EmployeeBean>";
+
+     parse(tuple);
+     EmployeeBean pojo = assertJohnParsed();
+     // The nested <address> element must have been materialised as an Address instance.
+     Assert.assertEquals(Address.class, pojo.getAddress().getClass());
+     Assert.assertEquals("new york", pojo.getAddress().getCity());
+     Assert.assertEquals("US", pojo.getAddress().getCountry());
+   }
+
+   /** Target type handed to the operator via {@code setClazz}; a plain bean with an optional nested address. */
+   public static class EmployeeBean
+   {
+
+     private String name;
+     private String dept;
+     private int eid;
+     private Date dateOfJoining;
+     private Address address;
+
+     public String getName()
+     {
+       return name;
+     }
+
+     public void setName(String name)
+     {
+       this.name = name;
+     }
+
+     public String getDept()
+     {
+       return dept;
+     }
+
+     public void setDept(String dept)
+     {
+       this.dept = dept;
+     }
+
+     public int getEid()
+     {
+       return eid;
+     }
+
+     public void setEid(int eid)
+     {
+       this.eid = eid;
+     }
+
+     public Date getDateOfJoining()
+     {
+       return dateOfJoining;
+     }
+
+     public void setDateOfJoining(Date dateOfJoining)
+     {
+       this.dateOfJoining = dateOfJoining;
+     }
+
+     public Address getAddress()
+     {
+       return address;
+     }
+
+     public void setAddress(Address address)
+     {
+       this.address = address;
+     }
+   }
+
+   /** Nested bean held by {@link EmployeeBean#getAddress()}; carries a city and a country. */
+   public static class Address
+   {
+
+     private String city;
+     private String country;
+
+     public String getCity()
+     {
+       return city;
+     }
+
+     public void setCity(String city)
+     {
+       this.city = city;
+     }
+
+     public String getCountry()
+     {
+       return country;
+     }
+
+     public void setCountry(String country)
+     {
+       this.country = country;
+     }
+   }
+
+ }