You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:13:05 UTC
[48/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1961
#comment enhanced existing csv parser to take in schema for validations
APEXMALHAR-1961 #comment enhanced existing csv parser to take in schema for validations
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9e77ef7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9e77ef7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9e77ef7b
Branch: refs/heads/master
Commit: 9e77ef7bd194b7c79c72f301a850ce5cac229f2c
Parents: 49b8556
Author: shubham <sh...@github.com>
Authored: Wed Dec 30 18:02:13 2015 +0530
Committer: Gaurav Gupta <ga...@apache.org>
Committed: Tue Jan 19 09:27:30 2016 -0800
----------------------------------------------------------------------
contrib/pom.xml | 10 +-
.../contrib/parser/CellProcessorBuilder.java | 456 ++++++++++++++++
.../datatorrent/contrib/parser/CsvParser.java | 362 +++++++------
.../contrib/parser/DelimitedSchema.java | 359 +++++++++++++
.../contrib/parser/CsvPOJOParserTest.java | 519 +++++++++++++++----
contrib/src/test/resources/schema.json | 96 ++++
.../com/datatorrent/lib/parser/JsonParser.java | 2 +-
.../java/com/datatorrent/lib/parser/Parser.java | 4 +-
.../com/datatorrent/lib/parser/XmlParser.java | 2 +-
9 files changed, 1522 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 7331940..16e28c8 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -477,7 +477,7 @@
<dependency>
<groupId>net.sf.supercsv</groupId>
<artifactId>super-csv</artifactId>
- <version>2.2.0</version>
+ <version>2.4.0</version>
<optional>true</optional>
</dependency>
<dependency>
@@ -610,12 +610,6 @@
<artifactId>apex-common</artifactId>
<version>${apex.core.version}</version>
<type>jar</type>
- </dependency>
- <dependency>
- <!-- required by Csv parser and formatter -->
- <groupId>net.sf.supercsv</groupId>
- <artifactId>super-csv-joda</artifactId>
- <version>2.3.1</version>
- </dependency>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
new file mode 100644
index 0000000..1993d94
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Map;
+
+import org.supercsv.cellprocessor.Optional;
+import org.supercsv.cellprocessor.ParseBool;
+import org.supercsv.cellprocessor.ParseChar;
+import org.supercsv.cellprocessor.ParseDate;
+import org.supercsv.cellprocessor.ParseDouble;
+import org.supercsv.cellprocessor.ParseInt;
+import org.supercsv.cellprocessor.ParseLong;
+import org.supercsv.cellprocessor.constraint.DMinMax;
+import org.supercsv.cellprocessor.constraint.Equals;
+import org.supercsv.cellprocessor.constraint.LMinMax;
+import org.supercsv.cellprocessor.constraint.StrMinMax;
+import org.supercsv.cellprocessor.constraint.StrRegEx;
+import org.supercsv.cellprocessor.constraint.Strlen;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.cellprocessor.ift.DoubleCellProcessor;
+import org.supercsv.cellprocessor.ift.LongCellProcessor;
+import org.supercsv.util.CsvContext;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
+
+/**
+ * Helper class with methods to generate CellProcessor objects. Cell processors
+ * are an integral part of reading and writing with Super CSV - they automate
+ * the data type conversions, and enforce constraints. They implement the chain
+ * of responsibility design pattern - each processor has a single, well-defined
+ * purpose and can be chained together with other processors to fully automate
+ * all of the required conversions and constraint validation for a single
+ * delimited record.
+ *
+ */
+public class CellProcessorBuilder
+{
+
+ /**
+  * Builds the cell processor for a single schema field by dispatching on the
+  * field's declared data type. FLOAT and DOUBLE share the same processor.
+  *
+  * @param fieldType
+  *          data type of the field
+  * @param constraints
+  *          map of constraint name to constraint value for the field
+  * @return the cell processor for the field, or null when the field type has
+  *         no matching branch
+  */
+ public static CellProcessor getCellProcessor(FieldType fieldType, Map<String, Object> constraints)
+ {
+   final CellProcessor processor;
+   switch (fieldType) {
+     case STRING:
+       processor = getStringCellProcessor(constraints);
+       break;
+     case INTEGER:
+       processor = getIntegerCellProcessor(constraints);
+       break;
+     case LONG:
+       processor = getLongCellProcessor(constraints);
+       break;
+     case FLOAT:
+     case DOUBLE:
+       // FLOAT values are handled by the DOUBLE processor
+       processor = getDoubleCellProcessor(constraints);
+       break;
+     case CHARACTER:
+       processor = getCharCellProcessor(constraints);
+       break;
+     case BOOLEAN:
+       processor = getBooleanCellProcessor(constraints);
+       break;
+     case DATE:
+       processor = getDateCellProcessor(constraints);
+       break;
+     default:
+       processor = null;
+       break;
+   }
+   return processor;
+ }
+
+ /**
+  * Builds the cell processor for a String field. Exactly one validation is
+  * applied, chosen in this order of precedence: equals, regex pattern, exact
+  * length, min/max length. Constraints lower in that order are ignored when a
+  * higher one is present. Unless the field is marked required, the result is
+  * wrapped so that empty values are accepted.
+  *
+  * @param constraints
+  *          map of constraints applicable to String; values are read as
+  *          Strings
+  * @return CellProcessor, or null when the field is required and carries no
+  *         other constraint
+  */
+ private static CellProcessor getStringCellProcessor(Map<String, Object> constraints)
+ {
+   Object requiredRaw = constraints.get(DelimitedSchema.REQUIRED);
+   boolean mandatory = requiredRaw != null && Boolean.parseBoolean((String)requiredRaw);
+   Object lengthRaw = constraints.get(DelimitedSchema.LENGTH);
+   Integer exactLength = (lengthRaw == null) ? null : Integer.valueOf((String)lengthRaw);
+   Object minRaw = constraints.get(DelimitedSchema.MIN_LENGTH);
+   Integer shortest = (minRaw == null) ? null : Integer.valueOf((String)minRaw);
+   Object maxRaw = constraints.get(DelimitedSchema.MAX_LENGTH);
+   Integer longest = (maxRaw == null) ? null : Integer.valueOf((String)maxRaw);
+   String expected = (String)constraints.get(DelimitedSchema.EQUALS);
+   String regex = (String)constraints.get(DelimitedSchema.REGEX_PATTERN);
+
+   CellProcessor chain = null;
+   if (StringUtils.isNotBlank(expected)) {
+     chain = new Equals(expected);
+   } else if (StringUtils.isNotBlank(regex)) {
+     chain = new StrRegEx(regex);
+   } else if (exactLength != null) {
+     chain = new Strlen(exactLength);
+   } else if (shortest != null || longest != null) {
+     // an open end of the range falls back to 0 / the largest long
+     long lower = (shortest == null) ? 0L : shortest.longValue();
+     long upper = (longest == null) ? LMinMax.MAX_LONG : longest.longValue();
+     chain = new StrMinMax(lower, upper);
+   }
+   return mandatory ? chain : addOptional(chain);
+ }
+
+ /**
+ * Method to get cellprocessor for Integer with constraints. These constraints
+ * are evaluated against the Integer field for which this cellprocessor is
+ * defined.
+ *
+ * @param constraints
+ * map of constraints applicable to Integer
+ * @return CellProcessor
+ */
+ private static CellProcessor getIntegerCellProcessor(Map<String, Object> constraints)
+ {
+ Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
+ .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
+ Integer equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : Integer.parseInt((String)constraints
+ .get(DelimitedSchema.EQUALS));
+ Integer minValue = constraints.get(DelimitedSchema.MIN_VALUE) == null ? null : Integer.parseInt((String)constraints
+ .get(DelimitedSchema.MIN_VALUE));
+ Integer maxValue = constraints.get(DelimitedSchema.MAX_VALUE) == null ? null : Integer.parseInt((String)constraints
+ .get(DelimitedSchema.MAX_VALUE));
+
+ CellProcessor cellProcessor = null;
+ if (equals != null) {
+ cellProcessor = new Equals(equals);
+ cellProcessor = addParseInt(cellProcessor);
+ } else if (minValue != null || maxValue != null) {
+ cellProcessor = addIntMinMax(minValue, maxValue);
+ } else {
+ cellProcessor = addParseInt(null);
+ }
+ if (required == null || !required) {
+ cellProcessor = addOptional(cellProcessor);
+ }
+ return cellProcessor;
+ }
+
+ /**
+ * Method to get cellprocessor for Long with constraints. These constraints
+ * are evaluated against the Long field for which this cellprocessor is
+ * defined.
+ *
+ * @param constraints
+ * map of constraints applicable to Long
+ * @return CellProcessor
+ */
+ private static CellProcessor getLongCellProcessor(Map<String, Object> constraints)
+ {
+ Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
+ .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
+ Long equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : Long.parseLong((String)constraints
+ .get(DelimitedSchema.EQUALS));
+ Long minValue = constraints.get(DelimitedSchema.MIN_VALUE) == null ? null : Long.parseLong((String)constraints
+ .get(DelimitedSchema.MIN_VALUE));
+ Long maxValue = constraints.get(DelimitedSchema.MAX_VALUE) == null ? null : Long.parseLong((String)constraints
+ .get(DelimitedSchema.MAX_VALUE));
+ CellProcessor cellProcessor = null;
+ if (equals != null) {
+ cellProcessor = new Equals(equals);
+ cellProcessor = addParseLong(cellProcessor);
+ } else if (minValue != null || maxValue != null) {
+ cellProcessor = addLongMinMax(minValue, maxValue);
+ } else {
+ cellProcessor = addParseLong(null);
+ }
+ if (required == null || !required) {
+ cellProcessor = addOptional(cellProcessor);
+ }
+ return cellProcessor;
+ }
+
+ /**
+ * Method to get cellprocessor for Float/Double with constraints. These
+ * constraints are evaluated against the Float/Double field for which this
+ * cellprocessor is defined.
+ *
+ * @param constraints
+ * map of constraints applicable to Float/Double
+ * @return CellProcessor
+ */
+ private static CellProcessor getDoubleCellProcessor(Map<String, Object> constraints)
+ {
+ Boolean required = constraints.get(DelimitedSchema.REQUIRED) == null ? null : Boolean
+ .parseBoolean((String)constraints.get(DelimitedSchema.REQUIRED));
+ Double equals = constraints.get(DelimitedSchema.EQUALS) == null ? null : Double.parseDouble((String)constraints
+ .get(DelimitedSchema.EQUALS));
+ Double minValue = constraints.get(DelimitedSchema.MIN_VALUE) == null ? null : Double
+ .parseDouble((String)constraints.get(DelimitedSchema.MIN_VALUE));
+ Double maxValue = constraints.get(DelimitedSchema.MAX_VALUE) == null ? null : Double
+ .parseDouble((String)constraints.get(DelimitedSchema.MAX_VALUE));
+ CellProcessor cellProcessor = null;
+ if (equals != null) {
+ cellProcessor = new Equals(equals);
+ cellProcessor = addParseDouble(cellProcessor);
+ } else if (minValue != null || maxValue != null) {
+ cellProcessor = addDoubleMinMax(minValue, maxValue);
+ } else {
+ cellProcessor = addParseDouble(null);
+ }
+ if (required == null || !required) {
+ cellProcessor = addOptional(cellProcessor);
+ }
+ return cellProcessor;
+ }
+
+ /**
+  * Builds the cell processor for a Boolean field. Custom true/false literals
+  * are used only when both are supplied and non-blank; if either one is
+  * missing the default ParseBool processor is used instead. Unless the field
+  * is marked required, the result is wrapped so that empty values are
+  * accepted.
+  *
+  * @param constraints
+  *          map of constraints applicable to Boolean; values are read as
+  *          Strings
+  * @return CellProcessor
+  */
+ private static CellProcessor getBooleanCellProcessor(Map<String, Object> constraints)
+ {
+   Object requiredRaw = constraints.get(DelimitedSchema.REQUIRED);
+   boolean mandatory = requiredRaw != null && Boolean.parseBoolean((String)requiredRaw);
+   String truthy = (String)constraints.get(DelimitedSchema.TRUE_VALUE);
+   String falsy = (String)constraints.get(DelimitedSchema.FALSE_VALUE);
+
+   boolean customLiterals = StringUtils.isNotBlank(truthy) && StringUtils.isNotBlank(falsy);
+   CellProcessor chain = customLiterals ? new ParseBool(truthy, falsy) : new ParseBool();
+   return mandatory ? chain : addOptional(chain);
+ }
+
+ /**
+  * Builds the cell processor for a Date field. The date format constraint is
+  * used when it is non-blank, otherwise "dd/MM/yyyy" applies. The ParseDate
+  * processor is created with its second (lenient) argument set to false.
+  * Unless the field is marked required, the result is wrapped so that empty
+  * values are accepted.
+  *
+  * @param constraints
+  *          map of constraints applicable to Date; values are read as Strings
+  * @return CellProcessor
+  */
+ private static CellProcessor getDateCellProcessor(Map<String, Object> constraints)
+ {
+   Object requiredRaw = constraints.get(DelimitedSchema.REQUIRED);
+   boolean mandatory = requiredRaw != null && Boolean.parseBoolean((String)requiredRaw);
+   String pattern = (String)constraints.get(DelimitedSchema.DATE_FORMAT);
+   if (StringUtils.isBlank(pattern)) {
+     pattern = "dd/MM/yyyy";
+   }
+
+   CellProcessor chain = new ParseDate(pattern, false);
+   return mandatory ? chain : addOptional(chain);
+ }
+
+ /**
+  * Builds the cell processor for a Character field. The value is parsed as a
+  * character and, when an equals constraint is present, compared with the
+  * first character of that constraint. An empty equals value is treated as
+  * "no constraint" instead of failing with StringIndexOutOfBoundsException.
+  * Unless the field is marked required, the result is wrapped so that empty
+  * values are accepted.
+  *
+  * @param constraints
+  *          map of constraints applicable to Char; values are read as Strings
+  * @return CellProcessor
+  */
+ private static CellProcessor getCharCellProcessor(Map<String, Object> constraints)
+ {
+   Object requiredRaw = constraints.get(DelimitedSchema.REQUIRED);
+   boolean mandatory = requiredRaw != null && Boolean.parseBoolean((String)requiredRaw);
+   String expectedRaw = (String)constraints.get(DelimitedSchema.EQUALS);
+
+   CellProcessor chain = null;
+   // Only the emptiness check is used (not isBlank): a single space is a
+   // legitimate character to compare against.
+   if (expectedRaw != null && !expectedRaw.isEmpty()) {
+     chain = new Equals(Character.valueOf(expectedRaw.charAt(0)));
+   }
+   chain = addParseChar(chain);
+   return mandatory ? chain : addOptional(chain);
+ }
+
+ /**
+  * Get a Double Min Max cellprocessor. A missing bound is replaced by the
+  * widest finite value on that side, so specifying only one bound leaves the
+  * other side unrestricted.
+  *
+  * @param minValue
+  *          minimum value, or null for no lower bound.
+  * @param maxValue
+  *          maximum value, or null for no upper bound.
+  * @return CellProcessor
+  */
+ private static CellProcessor addDoubleMinMax(Double minValue, Double maxValue)
+ {
+   // DMinMax.MIN_DOUBLE is Double.MIN_VALUE, i.e. the smallest POSITIVE double,
+   // so using it as the default lower bound rejected zero and every negative
+   // number. The most negative finite double is -Double.MAX_VALUE.
+   double lower = (minValue == null) ? -Double.MAX_VALUE : minValue.doubleValue();
+   double upper = (maxValue == null) ? DMinMax.MAX_DOUBLE : maxValue.doubleValue();
+   return new DMinMax(lower, upper);
+ }
+
+ /**
+  * Get a Long Min Max cellprocessor. A missing bound falls back to
+  * LMinMax.MIN_LONG / LMinMax.MAX_LONG respectively.
+  *
+  * @param minValue
+  *          minimum value, or null to use the default lower bound.
+  * @param maxValue
+  *          maximum value, or null to use the default upper bound.
+  * @return CellProcessor
+  */
+ private static CellProcessor addLongMinMax(Long minValue, Long maxValue)
+ {
+   long lower = (minValue != null) ? minValue.longValue() : LMinMax.MIN_LONG;
+   long upper = (maxValue != null) ? maxValue.longValue() : LMinMax.MAX_LONG;
+   return new LMinMax(lower, upper);
+ }
+
+ /**
+  * Get an Int Min Max cellprocessor. A missing bound falls back to
+  * Integer.MIN_VALUE / Integer.MAX_VALUE respectively.
+  *
+  * @param minValue
+  *          minimum value, or null to use the default lower bound.
+  * @param maxValue
+  *          maximum value, or null to use the default upper bound.
+  * @return CellProcessor
+  */
+ private static CellProcessor addIntMinMax(Integer minValue, Integer maxValue)
+ {
+   int lower = (minValue != null) ? minValue.intValue() : Integer.MIN_VALUE;
+   int upper = (maxValue != null) ? maxValue.intValue() : Integer.MAX_VALUE;
+   return new IntMinMax(lower, upper);
+ }
+
+ /**
+  * Wrap the given chain in an Optional cellprocessor, which means the field
+  * is not mandatory.
+  *
+  * @param cellProcessor
+  *          next processor in the chain, or null when nothing follows.
+  * @return CellProcessor
+  */
+ private static CellProcessor addOptional(CellProcessor cellProcessor)
+ {
+   return (cellProcessor != null) ? new Optional(cellProcessor) : new Optional();
+ }
+
+ /**
+  * Get cellprocessor to parse String as Integer, optionally followed by
+  * another processor.
+  *
+  * @param cellProcessor
+  *          next processor in the chain, or null when nothing follows; a
+  *          non-null value is cast to LongCellProcessor.
+  * @return CellProcessor
+  */
+ private static CellProcessor addParseInt(CellProcessor cellProcessor)
+ {
+   return (cellProcessor != null) ? new ParseInt((LongCellProcessor)cellProcessor) : new ParseInt();
+ }
+
+ /**
+  * Get cellprocessor to parse String as Long, optionally followed by another
+  * processor.
+  *
+  * @param cellProcessor
+  *          next processor in the chain, or null when nothing follows; a
+  *          non-null value is cast to LongCellProcessor.
+  * @return CellProcessor
+  */
+ private static CellProcessor addParseLong(CellProcessor cellProcessor)
+ {
+   return (cellProcessor != null) ? new ParseLong((LongCellProcessor)cellProcessor) : new ParseLong();
+ }
+
+ /**
+  * Get cellprocessor to parse String as Double, optionally followed by
+  * another processor.
+  *
+  * @param cellProcessor
+  *          next processor in the chain, or null when nothing follows; a
+  *          non-null value is cast to DoubleCellProcessor.
+  * @return CellProcessor
+  */
+ private static CellProcessor addParseDouble(CellProcessor cellProcessor)
+ {
+   return (cellProcessor != null) ? new ParseDouble((DoubleCellProcessor)cellProcessor) : new ParseDouble();
+ }
+
+ /**
+  * Get cellprocessor to parse String as Character, optionally followed by
+  * another processor.
+  *
+  * @param cellProcessor
+  *          next processor in the chain, or null when nothing follows; a
+  *          non-null value is cast to DoubleCellProcessor. NOTE(review): the
+  *          cast presumably mirrors the parameter type of ParseChar's
+  *          chaining constructor - confirm against the Super CSV javadoc.
+  * @return CellProcessor
+  */
+ private static CellProcessor addParseChar(CellProcessor cellProcessor)
+ {
+   return (cellProcessor != null) ? new ParseChar((DoubleCellProcessor)cellProcessor) : new ParseChar();
+ }
+
+ /**
+  * Custom cell processor that enforces min/max constraints for Integers. The
+  * range check is delegated to LMinMax and its Long result is narrowed to an
+  * int before being returned.
+  */
+ private static class IntMinMax extends LMinMax
+ {
+   /**
+    * @param min
+    *          smallest accepted value
+    * @param max
+    *          largest accepted value
+    */
+   public IntMinMax(int min, int max)
+   {
+     super(min, max);
+   }
+
+   /**
+    * Runs the inherited range check and converts its Long result to an int.
+    */
+   @Override
+   public Object execute(Object value, CsvContext context)
+   {
+     Object checked = super.execute(value, context);
+     return ((Long)checked).intValue();
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
index af1eebe..bb72e82 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
@@ -19,287 +19,271 @@
package com.datatorrent.contrib.parser;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.supercsv.cellprocessor.Optional;
-import org.supercsv.cellprocessor.ParseBool;
-import org.supercsv.cellprocessor.ParseChar;
-import org.supercsv.cellprocessor.ParseDate;
-import org.supercsv.cellprocessor.ParseDouble;
-import org.supercsv.cellprocessor.ParseInt;
-import org.supercsv.cellprocessor.ParseLong;
import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.exception.SuperCsvException;
import org.supercsv.io.CsvBeanReader;
+import org.supercsv.io.CsvMapReader;
import org.supercsv.prefs.CsvPreference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.contrib.parser.DelimitedSchema.Field;
import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.ReusableStringReader;
import com.datatorrent.netlet.util.DTThrowable;
/**
- * Operator that converts CSV string to Pojo <br>
+ * Operator that parses a delimited tuple against a specified schema <br>
+ * Schema is specified in a json format as per {@link DelimitedSchema} that
+ * contains field information and constraints for each field.<br>
* Assumption is that each field in the delimited data should map to a simple
* java type.<br>
* <br>
* <b>Properties</b> <br>
- * <b>fieldInfo</b>:User need to specify fields and their types as a comma
- * separated string having format <NAME>:<TYPE>|<FORMAT> in
- * the same order as incoming data. FORMAT refers to dates with dd/mm/yyyy as
- * default e.g name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy <br>
- * <b>fieldDelimiter</b>: Default is comma <br>
- * <b>lineDelimiter</b>: Default is '\r\n'
+ * <b>schema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a byte array. Each tuple represents a record<br>
+ * <b>parsedOutput</b>:tuples that are validated against the schema are emitted
+ * as Map<String,Object> on this port<br>
+ * <b>out</b>:tuples that are validated against the schema are emitted as pojo
+ * on this port<br>
+ * <b>err</b>:tuples that do not conform to schema are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
*
* @displayName CsvParser
* @category Parsers
* @tags csv pojo parser
* @since 3.2.0
*/
-public class CsvParser extends Parser<String, String>
+@InterfaceStability.Evolving
+public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
{
-
- private ArrayList<Field> fields;
- @NotNull
- protected int fieldDelimiter;
- protected String lineDelimiter;
-
+ /**
+ * Map Reader to read delimited records
+ */
+ private transient CsvMapReader csvMapReader;
+ /**
+ * Bean Reader to read delimited records
+ */
+ private transient CsvBeanReader csvBeanReader;
+ /**
+ * Reader used by csvMapReader and csvBeanReader
+ */
+ private transient ReusableStringReader csvStringReader;
+ /**
+ * Contents of the schema.Schema is specified in a json format as per
+ * {@link DelimitedSchema}
+ */
@NotNull
- protected String fieldInfo;
-
- protected transient String[] nameMapping;
- protected transient CellProcessor[] processors;
- private transient CsvBeanReader csvReader;
-
- public enum FIELD_TYPE
- {
- BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
- };
+ private String schema;
+ /**
+ * Schema is read into this object to access fields
+ */
+ private transient DelimitedSchema delimitedParserSchema;
+ /**
+ * Cell processors are an integral part of reading and writing with Super CSV
+ * they automate the data type conversions, and enforce constraints.
+ */
+ private transient CellProcessor[] processors;
+ /**
+ * Names of all the fields in the same order of incoming records
+ */
+ private transient String[] nameMapping;
+ /**
+ * header-this will be delimiter separated string of field names
+ */
+ private transient String header;
+ /**
+ * Reading preferences that are passed through schema
+ */
+ private transient CsvPreference preference;
- @NotNull
- private transient ReusableStringReader csvStringReader = new ReusableStringReader();
+ /**
+ * metric to keep count of number of tuples emitted on {@link #parsedOutput}
+ * port
+ */
+ @AutoMetric
+ long parsedOutputCount;
- public CsvParser()
+ @Override
+ public void beginWindow(long windowId)
{
- fields = new ArrayList<Field>();
- fieldDelimiter = ',';
- lineDelimiter = "\r\n";
+ super.beginWindow(windowId);
+ parsedOutputCount = 0;
}
@Override
public void setup(OperatorContext context)
{
- super.setup(context);
-
- logger.info("field info {}", fieldInfo);
- fields = new ArrayList<Field>();
- String[] fieldInfoTuple = fieldInfo.split(",");
- for (int i = 0; i < fieldInfoTuple.length; i++) {
- String[] fieldTuple = fieldInfoTuple[i].split(":");
- Field field = new Field();
- field.setName(fieldTuple[0]);
- String[] typeFormat = fieldTuple[1].split("\\|");
- field.setType(typeFormat[0].toUpperCase());
- if (typeFormat.length > 1) {
- field.setFormat(typeFormat[1]);
- }
- getFields().add(field);
- }
-
- CsvPreference preference = new CsvPreference.Builder('"', fieldDelimiter, lineDelimiter).build();
- csvReader = new CsvBeanReader(csvStringReader, preference);
- int countKeyValue = getFields().size();
- logger.info("countKeyValue {}", countKeyValue);
- nameMapping = new String[countKeyValue];
- processors = new CellProcessor[countKeyValue];
- initialise(nameMapping, processors);
+ delimitedParserSchema = new DelimitedSchema(schema);
+ preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+ delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
+ nameMapping = delimitedParserSchema.getFieldNames().toArray(
+ new String[delimitedParserSchema.getFieldNames().size()]);
+ header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
+ processors = getProcessor(delimitedParserSchema.getFields());
+ csvStringReader = new ReusableStringReader();
+ csvMapReader = new CsvMapReader(csvStringReader, preference);
+ csvBeanReader = new CsvBeanReader(csvStringReader, preference);
}
- private void initialise(String[] nameMapping, CellProcessor[] processors)
- {
- for (int i = 0; i < getFields().size(); i++) {
- FIELD_TYPE type = getFields().get(i).type;
- nameMapping[i] = getFields().get(i).name;
- if (type == FIELD_TYPE.DOUBLE) {
- processors[i] = new Optional(new ParseDouble());
- } else if (type == FIELD_TYPE.INTEGER) {
- processors[i] = new Optional(new ParseInt());
- } else if (type == FIELD_TYPE.FLOAT) {
- processors[i] = new Optional(new ParseDouble());
- } else if (type == FIELD_TYPE.LONG) {
- processors[i] = new Optional(new ParseLong());
- } else if (type == FIELD_TYPE.SHORT) {
- processors[i] = new Optional(new ParseInt());
- } else if (type == FIELD_TYPE.STRING) {
- processors[i] = new Optional();
- } else if (type == FIELD_TYPE.CHARACTER) {
- processors[i] = new Optional(new ParseChar());
- } else if (type == FIELD_TYPE.BOOLEAN) {
- processors[i] = new Optional(new ParseBool());
- } else if (type == FIELD_TYPE.DATE) {
- String dateFormat = getFields().get(i).format;
- processors[i] = new Optional(new ParseDate(dateFormat == null ? "dd/MM/yyyy" : dateFormat));
- }
- }
- }
@Override
- public Object convert(String tuple)
+ public Object convert(byte[] tuple)
{
- try {
- csvStringReader.open(tuple);
- return csvReader.read(clazz, nameMapping, processors);
- } catch (IOException e) {
- logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
- return null;
- }
+ throw new UnsupportedOperationException("Not supported");
}
@Override
- public void teardown()
+ public void processTuple(byte[] tuple)
{
- try {
- if (csvReader != null) {
- csvReader.close();
+ if (tuple == null) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
}
- } catch (IOException e) {
- DTThrowable.rethrow(e);
- }
- }
-
- @Override
- public String processErorrTuple(String input)
- {
- return input;
- }
-
- public static class Field
- {
- String name;
- String format;
- FIELD_TYPE type;
-
- public String getName()
- {
- return name;
+ errorTupleCount++;
+ return;
}
-
- public void setName(String name)
- {
- this.name = name;
- }
-
- public FIELD_TYPE getType()
- {
- return type;
- }
-
- public void setType(String type)
- {
- this.type = FIELD_TYPE.valueOf(type);
+ String incomingString = new String(tuple);
+ if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, header)) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(incomingString, "Blank/header tuple"));
+ }
+ errorTupleCount++;
+ return;
}
+ try {
+ if (parsedOutput.isConnected()) {
+ csvStringReader.open(incomingString);
+ Map<String, Object> map = csvMapReader.read(nameMapping, processors);
+ parsedOutput.emit(map);
+ parsedOutputCount++;
+ }
- public String getFormat()
- {
- return format;
- }
+ if (out.isConnected() && clazz != null) {
+ csvStringReader.open(incomingString);
+ Object obj = csvBeanReader.read(clazz, nameMapping, processors);
+ out.emit(obj);
+ emittedObjectCount++;
+ }
- public void setFormat(String format)
- {
- this.format = format;
+ } catch (SuperCsvException | IOException e) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(incomingString, e.getMessage()));
+ }
+ errorTupleCount++;
+ logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
}
-
}
- /**
- * Gets the array list of the fields, a field being a POJO containing the name
- * of the field and type of field.
- *
- * @return An array list of Fields.
- */
- public ArrayList<Field> getFields()
+ @Override
+ public KeyValPair<String, String> processErrorTuple(byte[] input)
{
- return fields;
+ throw new UnsupportedOperationException("Not supported");
}
/**
- * Sets the array list of the fields, a field being a POJO containing the name
- * of the field and type of field.
- *
- * @param fields
- * An array list of Fields.
+ * Returns array of cellprocessors, one for each field
*/
- public void setFields(ArrayList<Field> fields)
+ private CellProcessor[] getProcessor(List<Field> fields)
{
- this.fields = fields;
+ CellProcessor[] processor = new CellProcessor[fields.size()];
+ int fieldCount = 0;
+ for (Field field : fields) {
+ processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
+ }
+ return processor;
}
- /**
- * Gets the delimiter which separates fields in incoming data.
- *
- * @return fieldDelimiter
- */
- public int getFieldDelimiter()
+ @Override
+ public void teardown()
{
- return fieldDelimiter;
+ try {
+ csvMapReader.close();
+ } catch (IOException e) {
+ logger.error("Error while closing csv map reader {}", e.getMessage());
+ DTThrowable.wrapIfChecked(e);
+ }
+ try {
+ csvBeanReader.close();
+ } catch (IOException e) {
+ logger.error("Error while closing csv bean reader {}", e.getMessage());
+ DTThrowable.wrapIfChecked(e);
+ }
}
/**
- * Sets the delimiter which separates fields in incoming data.
+ * Get the schema
*
- * @param fieldDelimiter
+ * @return
*/
- public void setFieldDelimiter(int fieldDelimiter)
+ public String getSchema()
{
- this.fieldDelimiter = fieldDelimiter;
+ return schema;
}
/**
- * Gets the delimiter which separates lines in incoming data.
+ * Set the schema
*
- * @return lineDelimiter
+ * @param schema
*/
- public String getLineDelimiter()
+ public void setSchema(String schema)
{
- return lineDelimiter;
+ this.schema = schema;
}
/**
- * Sets the delimiter which separates line in incoming data.
+ * Get errorTupleCount
*
- * @param lineDelimiter
+ * @return errorTupleCount
*/
- public void setLineDelimiter(String lineDelimiter)
+ @VisibleForTesting
+ public long getErrorTupleCount()
{
- this.lineDelimiter = lineDelimiter;
+ return errorTupleCount;
}
/**
- * Gets the name of the fields with type and format ( for date ) as comma
- * separated string in same order as incoming data. e.g
- * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
+ * Get emittedObjectCount
*
- * @return fieldInfo
+ * @return emittedObjectCount
*/
- public String getFieldInfo()
+ @VisibleForTesting
+ public long getEmittedObjectCount()
{
- return fieldInfo;
+ return emittedObjectCount;
}
/**
- * Sets the name of the fields with type and format ( for date ) as comma
- * separated string in same order as incoming data. e.g
- * name:string,dept:string,eid:integer,dateOfJoining:date|dd/mm/yyyy
+ * Get incomingTuplesCount
*
- * @param fieldInfo
+ * @return incomingTuplesCount
*/
- public void setFieldInfo(String fieldInfo)
+ @VisibleForTesting
+ public long getIncomingTuplesCount()
{
- this.fieldInfo = fieldInfo;
+ return incomingTuplesCount;
}
+ /**
+ * output port to emit validated records as map
+ */
+ public final transient DefaultOutputPort<Map<String, Object>> parsedOutput = new DefaultOutputPort<Map<String, Object>>();
private static final Logger logger = LoggerFactory.getLogger(CsvParser.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
new file mode 100644
index 0000000..19f6a4b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This is the schema that defines fields and their constraints for delimited
+ * files. The operators use this information to validate the incoming tuples.
+ * Information from the JSON schema is saved in this object and is used by the
+ * operators.
+ * </p>
+ * <br>
+ * <br>
+ * Example schema <br>
+ * <br>
+ * {@code { "separator": ",", "quoteChar":"\"", "fields": [ { "name": "adId",
+ * "type": "Integer", "constraints": { "required": "true" } }, { "name": "adName",
+ * "type": "String", "constraints": { "required": "true", "pattern":
+ * "[a-z].*[a-z]$", "maxLength": "20" } }, { "name": "bidPrice", "type":
+ * "Double", "constraints": { "required": "true", "minValue": "0.1", "maxValue":
+ * "3.2" } }, { "name": "startDate", "type": "Date", "constraints": { "format":
+ * "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
+ * "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
+ * "Boolean", "constraints": { "required": "true" } } ] }}
+ */
+public class DelimitedSchema
+{
+
+ /**
+ * JSON key string for separator
+ */
+ private static final String SEPARATOR = "separator";
+ /**
+ * JSON key string for quote character
+ */
+ private static final String QUOTE_CHAR = "quoteChar";
+ /**
+ * JSON key string for line delimiter
+ */
+ private static final String LINE_DELIMITER = "lineDelimiter";
+ /**
+ * JSON key string for fields array
+ */
+ private static final String FIELDS = "fields";
+ /**
+ * JSON key string for name of the field within fields array
+ */
+ private static final String NAME = "name";
+ /**
+ * JSON key string for type of the field within fields array
+ */
+ private static final String TYPE = "type";
+ /**
+ * JSON key string for constraints for each field
+ */
+ private static final String CONSTRAINTS = "constraints";
+ /**
+ * JSON key string for required constraint
+ */
+ public static final String REQUIRED = "required";
+ /**
+ * JSON key string for equals constraint
+ */
+ public static final String EQUALS = "equals";
+ /**
+ * JSON key string for length constraint
+ */
+ public static final String LENGTH = "length";
+ /**
+ * JSON key string for min length constraint
+ */
+ public static final String MIN_LENGTH = "minLength";
+ /**
+ * JSON key string for max length constraint
+ */
+ public static final String MAX_LENGTH = "maxLength";
+ /**
+ * JSON key string for min value constraint
+ */
+ public static final String MIN_VALUE = "minValue";
+ /**
+ * JSON key string for max value constraint
+ */
+ public static final String MAX_VALUE = "maxValue";
+ /**
+ * JSON key string for regex pattern constraint
+ */
+ public static final String REGEX_PATTERN = "pattern";
+ /**
+ * JSON key string for date format constraint
+ */
+ public static final String DATE_FORMAT = "format";
+ /**
+ * JSON key string for locale constraint
+ */
+ public static final String LOCALE = "locale";
+ /**
+ * JSON key string for true value constraint
+ */
+ public static final String TRUE_VALUE = "trueValue";
+ /**
+ * JSON key string for false value constraint
+ */
+ public static final String FALSE_VALUE = "falseValue";
+ /**
+ * delimiter character provided in schema. Default is ,
+ */
+ private int delimiterChar = ',';
+ /**
+ * quote character provided in schema. Default is "
+ */
+ private char quoteChar = '\"';
+ /**
+ * line delimiter provided in schema. Default is "\r\n" (carriage return followed by line feed)
+ */
+ private String lineDelimiter = "\r\n";
+ /**
+ * This holds the list of field names in the same order as in the schema
+ */
+ private List<String> fieldNames = new LinkedList<String>();
+ /**
+ * This holds list of {@link Field}
+ */
+ private List<Field> fields = new LinkedList<Field>();
+
+ /**
+ * Supported data types
+ */
+ public enum FieldType
+ {
+ BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+ };
+
+ public DelimitedSchema(String json)
+ {
+ try {
+ initialize(json);
+ } catch (JSONException | IOException e) {
+ logger.error("{}", e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * For a given json string, this method sets the field members
+ *
+ * @param json
+ * @throws JSONException
+ * @throws IOException
+ */
+ private void initialize(String json) throws JSONException, IOException
+ {
+ JSONObject jo = new JSONObject(json);
+ if (jo.has(SEPARATOR)) {
+ delimiterChar = ((String)jo.getString(SEPARATOR)).charAt(0);
+ }
+ if (jo.has(QUOTE_CHAR)) {
+ quoteChar = ((String)jo.getString(QUOTE_CHAR)).charAt(0);
+ }
+ if (jo.has(LINE_DELIMITER)) {
+ lineDelimiter = (String)jo.getString(LINE_DELIMITER);
+ }
+
+ JSONArray fieldArray = jo.getJSONArray(FIELDS);
+ for (int i = 0; i < fieldArray.length(); i++) {
+ JSONObject obj = fieldArray.getJSONObject(i);
+ Field field = new Field(obj.getString(NAME), obj.getString(TYPE));
+ fields.add(field);
+ fieldNames.add(field.name);
+ if (obj.has(CONSTRAINTS)) {
+ JSONObject constraints = obj.getJSONObject(CONSTRAINTS);
+ field.constraints = new ObjectMapper().readValue(constraints.toString(), HashMap.class);
+ }
+ }
+ }
+
+ /**
+ * Get the list of field names mentioned in schema
+ *
+ * @return fieldNames
+ */
+ public List<String> getFieldNames()
+ {
+ return Collections.unmodifiableList(fieldNames);
+ }
+
+ /**
+ * Get the delimiter character
+ *
+ * @return delimiterChar
+ */
+ public int getDelimiterChar()
+ {
+ return delimiterChar;
+ }
+
+ /**
+ * Get the quoteChar
+ *
+ * @return quoteChar
+ */
+ public char getQuoteChar()
+ {
+ return quoteChar;
+ }
+
+ /**
+ * Get the line delimiter
+ *
+ * @return lineDelimiter
+ */
+ public String getLineDelimiter()
+ {
+ return lineDelimiter;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DelimitedSchema [delimiterChar=" + delimiterChar + ", quoteChar=" + quoteChar + ", lineDelimiter="
+ + lineDelimiter + ", fieldNames=" + fieldNames + ", fields=" + fields + "]";
+ }
+
+ /**
+ * Get the list of Fields.
+ *
+ * @return fields
+ */
+ public List<Field> getFields()
+ {
+ return Collections.unmodifiableList(fields);
+ }
+
+ /**
+ * Objects of this class represent a particular field in the schema. Each
+ * field has a name, type and a set of associated constraints.
+ *
+ */
+ public class Field
+ {
+ /**
+ * name of the field
+ */
+ String name;
+ /**
+ * Data type of the field
+ */
+ FieldType type;
+ /**
+ * constraints associated with the field
+ */
+ Map<String, Object> constraints = new HashMap<String, Object>();
+
+ public Field(String name, String type)
+ {
+ this.name = name;
+ this.type = FieldType.valueOf(type.toUpperCase());
+ }
+
+ /**
+ * Get the name of the field
+ *
+ * @return name
+ */
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * Set the name of the field
+ *
+ * @param name
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * Get {@link FieldType}
+ *
+ * @return type
+ */
+ public FieldType getType()
+ {
+ return type;
+ }
+
+ /**
+ * Set {@link FieldType}
+ *
+ * @param type
+ */
+ public void setType(FieldType type)
+ {
+ this.type = type;
+ }
+
+ /**
+ * Get the map of constraints associated with the field
+ *
+ * @return constraints
+ */
+ public Map<String, Object> getConstraints()
+ {
+ return constraints;
+ }
+
+ /**
+ * Sets the map of constraints associated with the field
+ *
+ * @param constraints
+ */
+ public void setConstraints(Map<String, Object> constraints)
+ {
+ this.constraints = constraints;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Fields [name=" + name + ", type=" + type + ", constraints=" + constraints + "]";
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
index c9a4179..dc77aa8 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/CsvPOJOParserTest.java
@@ -20,169 +20,514 @@ package com.datatorrent.contrib.parser;
import java.util.Date;
-import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
public class CsvPOJOParserTest
{
- CsvParser operator;
- CollectorTestSink<Object> validDataSink;
- CollectorTestSink<String> invalidDataSink;
+ private static final String filename = "schema.json";
+ CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+ CsvParser parser = new CsvParser();
@Rule
public Watcher watcher = new Watcher();
public class Watcher extends TestWatcher
{
-
@Override
protected void starting(Description description)
{
super.starting(description);
- operator = new CsvParser();
- operator.setClazz(EmployeeBean.class);
- operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date");
- validDataSink = new CollectorTestSink<Object>();
- invalidDataSink = new CollectorTestSink<String>();
- TestUtils.setSink(operator.out, validDataSink);
- TestUtils.setSink(operator.err, invalidDataSink);
+ parser.setClazz(Ad.class);
+ parser.setSchema(SchemaUtils.jarResourceFileToString(filename));
+ parser.setup(null);
+ parser.err.setSink(error);
+ parser.parsedOutput.setSink(objectPort);
+ parser.out.setSink(pojoPort);
}
@Override
protected void finished(Description description)
{
super.finished(description);
- operator.teardown();
+ error.clear();
+ objectPort.clear();
+ pojoPort.clear();
+ parser.teardown();
}
-
}
+ /*
+ * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid
+ * 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
+ * Constraints are defined in schema.json
+ */
@Test
- public void testCsvToPojoWriterDefault()
+ public void TestParserValidInput()
{
- operator.setup(null);
- String tuple = "john,cs,1,01/01/2015";
- operator.in.process(tuple);
- Assert.assertEquals(1, validDataSink.collectedTuples.size());
- Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
- Object obj = validDataSink.collectedTuples.get(0);
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Ad adPojo = (Ad)obj;
Assert.assertNotNull(obj);
- Assert.assertEquals(EmployeeBean.class, obj.getClass());
- EmployeeBean pojo = (EmployeeBean)obj;
- Assert.assertEquals("john", pojo.getName());
- Assert.assertEquals("cs", pojo.getDept());
- Assert.assertEquals(1, pojo.getEid());
- Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
- pojo.getDateOfJoining()));
+ Assert.assertEquals(Ad.class, obj.getClass());
+ Assert.assertEquals(1234, adPojo.getAdId());
+ Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
+ Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
+ Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
+ Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
+ Assert.assertEquals(12, adPojo.getSecurityCode());
+ Assert.assertTrue("CAMP_AD".equals(adPojo.getParentCampaign()));
+ Assert.assertTrue(adPojo.isActive());
+ Assert.assertFalse(adPojo.isOptimized());
+ Assert.assertTrue("yes".equals(adPojo.getValid()));
}
@Test
- public void testCsvToPojoWriterDateFormat()
+ public void TestParserValidInputPojoPortNotConnected()
{
- operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy");
- operator.setup(null);
- String tuple = "john,cs,1,01-JAN-2015";
- operator.in.process(tuple);
- Assert.assertEquals(1, validDataSink.collectedTuples.size());
- Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
- Object obj = validDataSink.collectedTuples.get(0);
+ parser.out.setSink(null);
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserValidInputClassNameNotProvided()
+ {
+ parser.setClazz(null);
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidAdIdInput()
+ {
+ String input = ",98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserNoCampaignIdInput()
+ {
+ String input = "1234,,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
Assert.assertNotNull(obj);
- Assert.assertEquals(EmployeeBean.class, obj.getClass());
- EmployeeBean pojo = (EmployeeBean)obj;
- Assert.assertEquals("john", pojo.getName());
- Assert.assertEquals("cs", pojo.getDept());
- Assert.assertEquals(1, pojo.getEid());
- Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
- pojo.getDateOfJoining()));
+ Assert.assertEquals(Ad.class, obj.getClass());
+ Assert.assertEquals(0, error.collectedTuples.size());
}
@Test
- public void testCsvToPojoWriterDateFormatMultiple()
+ public void TestParserInvalidCampaignIdInput()
{
- operator.setFieldInfo("name:string,dept:string,eid:integer,dateOfJoining:date|dd-MMM-yyyy,dateOfBirth:date");
- operator.setup(null);
- String tuple = "john,cs,1,01-JAN-2015,01/01/2015";
- operator.in.process(tuple);
- Assert.assertEquals(1, validDataSink.collectedTuples.size());
- Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
- Object obj = validDataSink.collectedTuples.get(0);
+ String input = "1234,9833,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidAdNameInput()
+ {
+ String input = "1234,98233,adxyz123,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidBidPriceInput()
+ {
+ String input = "1234,98233,adxyz,3.3,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidStartDateInput()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-30-08 02:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidSecurityCodeInput()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,85,y,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidisActiveInput()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,yo,,CAMP_AD,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidParentCampaignInput()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserValidisOptimized()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes";
+ parser.in.process(input.getBytes());
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
Assert.assertNotNull(obj);
- Assert.assertEquals(EmployeeBean.class, obj.getClass());
- EmployeeBean pojo = (EmployeeBean)obj;
- Assert.assertEquals("john", pojo.getName());
- Assert.assertEquals("cs", pojo.getDept());
- Assert.assertEquals(1, pojo.getEid());
- Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
- pojo.getDateOfJoining()));
- Assert.assertEquals(new DateTime().withDate(2015, 1, 1).withMillisOfDay(0).withTimeAtStartOfDay(), new DateTime(
- pojo.getDateOfBirth()));
+ Assert.assertEquals(Ad.class, obj.getClass());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInValidisOptimized()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP,Y,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInValidWeatherTargeting()
+ {
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,NO,yes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserNullOrBlankInput()
+ {
+ parser.beginWindow(0);
+ parser.in.process(null);
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserHeaderAsInput()
+ {
+ String input = "adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,active,optimized,parentCampaign,weatherTargeted,valid";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserLessFields()
+ {
+ parser.beginWindow(0);
+ parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION".getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
}
- public static class EmployeeBean
+ @Test
+ public void TestParserMoreFields()
{
- private String name;
- private String dept;
- private int eid;
- private Date dateOfJoining;
- private Date dateOfBirth;
+ parser.beginWindow(0);
+ parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField"
+ .getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
- public String getName()
+ @Test
+ public void TestParserValidInputMetricVerification()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ }
+
+ @Test
+ public void TestParserInvalidInputMetricVerification()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ parser.in.process("1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZATION,CAMP_AD,Y,yes,ExtraField"
+ .getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(1, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ }
+
+ @Test
+ public void TestParserValidInputMetricResetCheck()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String input = "1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,,CAMP_AD,Y,yes";
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ parser.beginWindow(1);
+ Assert.assertEquals(0, parser.parsedOutputCount);
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ parser.in.process(input.getBytes());
+ Assert.assertEquals(1, parser.parsedOutputCount);
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ parser.endWindow();
+ }
+
+ public static class Ad
+ {
+
+ private int adId;
+ private int campaignId;
+ private String adName;
+ private double bidPrice;
+ private Date startDate;
+ private Date endDate;
+ private long securityCode;
+ private boolean active;
+ private boolean optimized;
+ private String parentCampaign;
+ private Character weatherTargeted;
+ private String valid;
+
+ public Ad()
+ {
+
+ }
+
+ public int getAdId()
+ {
+ return adId;
+ }
+
+ public void setAdId(int adId)
+ {
+ this.adId = adId;
+ }
+
+ public int getCampaignId()
+ {
+ return campaignId;
+ }
+
+ public void setCampaignId(int campaignId)
+ {
+ this.campaignId = campaignId;
+ }
+
+ public String getAdName()
+ {
+ return adName;
+ }
+
+ public void setAdName(String adName)
+ {
+ this.adName = adName;
+ }
+
+ public double getBidPrice()
+ {
+ return bidPrice;
+ }
+
+ public void setBidPrice(double bidPrice)
+ {
+ this.bidPrice = bidPrice;
+ }
+
+ public Date getStartDate()
+ {
+ return startDate;
+ }
+
+ public void setStartDate(Date startDate)
+ {
+ this.startDate = startDate;
+ }
+
+ public Date getEndDate()
{
- return name;
+ return endDate;
}
- public void setName(String name)
+ public void setEndDate(Date endDate)
{
- this.name = name;
+ this.endDate = endDate;
}
- public String getDept()
+ public long getSecurityCode()
{
- return dept;
+ return securityCode;
}
- public void setDept(String dept)
+ public void setSecurityCode(long securityCode)
{
- this.dept = dept;
+ this.securityCode = securityCode;
}
- public int getEid()
+ public boolean isActive()
{
- return eid;
+ return active;
}
- public void setEid(int eid)
+ public void setActive(boolean active)
{
- this.eid = eid;
+ this.active = active;
}
- public Date getDateOfJoining()
+ public boolean isOptimized()
{
- return dateOfJoining;
+ return optimized;
}
- public void setDateOfJoining(Date dateOfJoining)
+ public void setOptimized(boolean optimized)
{
- this.dateOfJoining = dateOfJoining;
+ this.optimized = optimized;
}
- public Date getDateOfBirth()
+ public String getParentCampaign()
{
- return dateOfBirth;
+ return parentCampaign;
}
- public void setDateOfBirth(Date dateOfBirth)
+ public void setParentCampaign(String parentCampaign)
+ {
+ this.parentCampaign = parentCampaign;
+ }
+
+ public Character getWeatherTargeted()
+ {
+ return weatherTargeted;
+ }
+
+ public void setWeatherTargeted(Character weatherTargeted)
+ {
+ this.weatherTargeted = weatherTargeted;
+ }
+
+ public String getValid()
+ {
+ return valid;
+ }
+
+ public void setValid(String valid)
+ {
+ this.valid = valid;
+ }
+
+ @Override
+ public String toString()
{
- this.dateOfBirth = dateOfBirth;
+ return "Ad [adId=" + adId + ", campaignId=" + campaignId + ", adName=" + adName + ", bidPrice=" + bidPrice
+ + ", startDate=" + startDate + ", endDate=" + endDate + ", securityCode=" + securityCode + ", active="
+ + active + ", optimized=" + optimized + ", parentCampaign=" + parentCampaign + ", weatherTargeted="
+ + weatherTargeted + ", valid=" + valid + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/contrib/src/test/resources/schema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/schema.json b/contrib/src/test/resources/schema.json
new file mode 100644
index 0000000..13e2789
--- /dev/null
+++ b/contrib/src/test/resources/schema.json
@@ -0,0 +1,96 @@
+{
+ "separator": ",",
+ "quoteChar":"\"",
+ "fields": [
+ {
+ "name": "adId",
+ "type": "Integer",
+ "constraints": {
+ "required": "true"
+ }
+ },
+ {
+ "name": "campaignId",
+ "type": "Integer",
+ "constraints": {
+ "equals": "98233"
+ }
+ },
+ {
+ "name": "adName",
+ "type": "String",
+ "constraints": {
+ "required": "true",
+ "pattern": "[a-z].*[a-z]$",
+ "maxLength": "10"
+ }
+ },
+ {
+ "name": "bidPrice",
+ "type": "Double",
+ "constraints": {
+ "required": "true",
+ "minValue": "0.1",
+ "maxValue": "3.2"
+ }
+ },
+ {
+ "name": "startDate",
+ "type": "Date",
+ "constraints": {
+ "format": "yyyy-MM-dd HH:mm:ss",
+ "locale":"IN"
+ }
+ },
+ {
+ "name": "endDate",
+ "type": "Date",
+ "constraints": {
+ "format": "dd/MM/yyyy"
+ }
+ },
+ {
+ "name": "securityCode",
+ "type": "Long",
+ "constraints": {
+ "minValue": "10",
+ "maxValue": "30"
+ }
+ },
+ {
+ "name": "active",
+ "type": "Boolean",
+ "constraints": {
+ "required": "true"
+ }
+ },
+ {
+ "name": "optimized",
+ "type": "Boolean",
+ "constraints": {
+ "trueValue":"OPTIMIZE",
+ "falseValue":"NO_OPTIMIZE"
+ }
+ },
+ {
+ "name": "parentCampaign",
+ "type": "String",
+ "constraints": {
+ "required": "true",
+ "equals": "CAMP_AD"
+ }
+ },
+ {
+ "name": "weatherTargeted",
+ "type": "Character",
+ "constraints": {
+ "required": "true",
+ "equals": "Y"
+ }
+ },
+ {
+ "name": "valid",
+ "type": "String"
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
index 9e8ee0f..4e9800a 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/JsonParser.java
@@ -81,7 +81,7 @@ public class JsonParser extends Parser<String, String>
}
@Override
- public String processErorrTuple(String input)
+ public String processErrorTuple(String input)
{
return input;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
index 5bcc1c5..4f591f1 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/Parser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java
@@ -82,7 +82,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co
Object tuple = convert(inputTuple);
if (tuple == null && err.isConnected()) {
errorTupleCount++;
- err.emit(processErorrTuple(inputTuple));
+ err.emit(processErrorTuple(inputTuple));
return;
}
if (out.isConnected()) {
@@ -91,7 +91,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co
}
}
- public abstract ERROROUT processErorrTuple(INPUT input);
+ public abstract ERROROUT processErrorTuple(INPUT input);
@Override
public void beginWindow(long windowId)
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9e77ef7b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
index 3d416e1..c8eeacc 100644
--- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
+++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java
@@ -123,7 +123,7 @@ public class XmlParser extends Parser<String, String>
}
@Override
- public String processErorrTuple(String input)
+ public String processErrorTuple(String input)
{
return input;
}