You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/01/10 12:02:29 UTC
apex-malhar git commit: APEXMALHAR-2259 Code changes to add fixed
width parser
Repository: apex-malhar
Updated Branches:
refs/heads/master 16b15c21b -> 0852c6594
APEXMALHAR-2259 Code changes to add fixed width parser
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0852c659
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0852c659
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0852c659
Branch: refs/heads/master
Commit: 0852c659400a2bd484a72d4539cdbaced1c5239f
Parents: 16b15c2
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Fri Sep 23 11:43:16 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Tue Jan 10 16:30:50 2017 +0530
----------------------------------------------------------------------
.../contrib/formatter/CsvFormatter.java | 2 +-
.../contrib/parser/CellProcessorBuilder.java | 2 +-
.../contrib/parser/DelimitedSchema.java | 131 +----
.../contrib/parser/FixedWidthParser.java | 493 +++++++++++++++++
.../contrib/parser/FixedWidthSchema.java | 379 +++++++++++++
.../com/datatorrent/contrib/parser/Schema.java | 176 +++++++
.../contrib/parser/FixedWidthTest.java | 526 +++++++++++++++++++
.../src/test/resources/FixedWidthSchema.json | 78 +++
8 files changed, 1670 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
index 2979b44..2bd0e67 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -40,7 +40,7 @@ import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.contrib.parser.DelimitedSchema;
import com.datatorrent.contrib.parser.DelimitedSchema.Field;
-import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
+import com.datatorrent.contrib.parser.Schema.FieldType;
import com.datatorrent.lib.formatter.Formatter;
import com.datatorrent.netlet.util.DTThrowable;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
index e7840aa..292031e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
@@ -40,7 +40,7 @@ import org.supercsv.util.CsvContext;
import org.apache.commons.lang3.StringUtils;
-import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
+import com.datatorrent.contrib.parser.Schema.FieldType;
/**
* Helper class with methods to generate CellProcessor objects. Cell processors
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
index 29b2c92..a47e138 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
@@ -52,40 +52,9 @@ import org.slf4j.LoggerFactory;
* "dd/MM/yyyy" } }, { "name": "securityCode", "type": "Long", "constraints": {
* "minValue": "10", "maxValue": "30" } }, { "name": "active", "type":
* "Boolean", "constraints": { "required": "true" } } ] }}
- *
- * @since 3.4.0
*/
-public class DelimitedSchema
+public class DelimitedSchema extends Schema
{
-
- /**
- * JSON key string for separator
- */
- private static final String SEPARATOR = "separator";
- /**
- * JSON key string for quote character
- */
- private static final String QUOTE_CHAR = "quoteChar";
- /**
- * JSON key string for line delimiter
- */
- private static final String LINE_DELIMITER = "lineDelimiter";
- /**
- * JSON key string for fields array
- */
- private static final String FIELDS = "fields";
- /**
- * JSON key string for name of the field within fields array
- */
- private static final String NAME = "name";
- /**
- * JSON key string for type of the field within fields array
- */
- private static final String TYPE = "type";
- /**
- * JSON key string for constraints for each field
- */
- private static final String CONSTRAINTS = "constraints";
/**
* JSON key string for required constraint
*/
@@ -119,21 +88,26 @@ public class DelimitedSchema
*/
public static final String REGEX_PATTERN = "pattern";
/**
- * JSON key string for date format constraint
- */
- public static final String DATE_FORMAT = "format";
- /**
* JSON key string for locale constraint
*/
public static final String LOCALE = "locale";
/**
- * JSON key string for true value constraint
+ * JSON key string for separator
+ */
+ private static final String SEPARATOR = "separator";
+ /**
+ * JSON key string for quote character
+ */
+ private static final String QUOTE_CHAR = "quoteChar";
+ /**
+ * JSON key string for line delimiter
*/
- public static final String TRUE_VALUE = "trueValue";
+ private static final String LINE_DELIMITER = "lineDelimiter";
/**
- * JSON key string for false value constraint
+ * JSON key string for constraints for each field
*/
- public static final String FALSE_VALUE = "falseValue";
+ private static final String CONSTRAINTS = "constraints";
+ private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class);
/**
* delimiter character provided in schema. Default is ,
*/
@@ -147,22 +121,10 @@ public class DelimitedSchema
*/
private String lineDelimiter = "\r\n";
/**
- * This holds the list of field names in the same order as in the schema
- */
- private List<String> fieldNames = new LinkedList<String>();
- /**
* This holds list of {@link Field}
*/
private List<Field> fields = new LinkedList<Field>();
- /**
- * Supported data types
- */
- public enum FieldType
- {
- BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
- };
-
public DelimitedSchema(String json)
{
try {
@@ -207,16 +169,6 @@ public class DelimitedSchema
}
/**
- * Get the list of field names mentioned in schema
- *
- * @return fieldNames
- */
- public List<String> getFieldNames()
- {
- return Collections.unmodifiableList(fieldNames);
- }
-
- /**
* Get the delimiter character
*
* @return delimiterChar
@@ -268,65 +220,16 @@ public class DelimitedSchema
* field has a name, type and a set of associated constraints.
*
*/
- public class Field
+ public class Field extends Schema.Field
{
/**
- * name of the field
- */
- String name;
- /**
- * Data type of the field
- */
- FieldType type;
- /**
* constraints associated with the field
*/
Map<String, Object> constraints = new HashMap<String, Object>();
public Field(String name, String type)
{
- this.name = name;
- this.type = FieldType.valueOf(type.toUpperCase());
- }
-
- /**
- * Get the name of the field
- *
- * @return name
- */
- public String getName()
- {
- return name;
- }
-
- /**
- * Set the name of the field
- *
- * @param name
- */
- public void setName(String name)
- {
- this.name = name;
- }
-
- /**
- * Get {@link FieldType}
- *
- * @return type
- */
- public FieldType getType()
- {
- return type;
- }
-
- /**
- * Set {@link FieldType}
- *
- * @param type
- */
- public void setType(FieldType type)
- {
- this.type = type;
+ super(name, type);
}
/**
@@ -356,6 +259,4 @@ public class DelimitedSchema
}
}
- private static final Logger logger = LoggerFactory.getLogger(DelimitedSchema.class);
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java
new file mode 100644
index 0000000..9ee556e
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthParser.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.lang.reflect.Field;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.ClassUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.univocity.parsers.fixed.FieldAlignment;
+import com.univocity.parsers.fixed.FixedWidthFields;
+import com.univocity.parsers.fixed.FixedWidthParserSettings;
+
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.parser.Parser;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.lib.util.PojoUtils;
+
+/**
+ * Operator that parses a fixed width record against a specified schema <br>
+ * Schema is specified in a json format as per {@link FixedWidthSchema} that
+ * contains field information for each field.<br>
+ * Assumption is that each field in the data should map to a simple
+ * java type.<br>
+ * <br>
+ * <b>Properties</b> <br>
+ * <b>schema</b>:schema as a string<br>
+ * <b>clazz</b>:Pojo class <br>
+ * <b>Ports</b> <br>
+ * <b>in</b>:input tuple as a byte array. Each tuple represents a record<br>
+ * <b>parsedOutput</b>:tuples that are validated against the schema are emitted
+ * as HashMap<String,Object> on this port<br>
+ * Key being the name of the field and Val being the value of the field.
+ * <b>out</b>:tuples that are validated against the schema are emitted as pojo
+ * on this port<br>
+ * <b>err</b>:tuples that do not confine to schema are emitted on this port as
+ * KeyValPair<String,String><br>
+ * Key being the tuple and Val being the reason.
+ *
+ * @displayName FixedWidthParser
+ * @category Parsers
+ * @tags fixedwidth pojo parser
+ */
+public class FixedWidthParser extends Parser<byte[], KeyValPair<String, String>> implements Operator.ActivationListener<Context>
+{
+ private static final Logger logger = LoggerFactory.getLogger(FixedWidthParser.class);
+ public final transient DefaultOutputPort<HashMap<String, Object>> parsedOutput = new DefaultOutputPort<HashMap<String, Object>>();
+ /**
+ * Metric to keep count of number of tuples emitted on {@link #parsedOutput}
+ * port
+ */
+ @AutoMetric
+ private long parsedOutputCount;
+ /**
+ * Contents of the schema.Schema is specified in a json format as per
+ * {@link FixedWidthSchema}
+ */
+ @NotNull
+ private String jsonSchema;
+ /**
+ * Total length of the record
+ */
+ private int recordLength;
+ /**
+ * Schema is read into this object to access fields
+ */
+ private transient FixedWidthSchema schema;
+ /**
+ * List of setters to set the value in POJO to be emitted
+ */
+ private transient List<FixedWidthParser.TypeInfo> setters;
+ /**
+ * header- This will be string of field names, padded with padding character (if required)
+ */
+ private transient String header;
+ /**
+ * Univocity Parser to parse the input tuples
+ */
+ private com.univocity.parsers.fixed.FixedWidthParser univocityParser;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ parsedOutputCount = 0;
+ }
+
+ @Override
+ public void processTuple(byte[] tuple)
+ {
+ if (tuple == null) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
+ logger.error("Tuple could not be parsed. Reason Blank/null tuple");
+ }
+ errorTupleCount++;
+ return;
+ }
+ String incomingString = new String(tuple);
+ if (StringUtils.isBlank(incomingString) || StringUtils.equals(incomingString, getHeader())) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<>(incomingString, "Blank/header tuple"));
+ logger.error("Tuple could not be parsed. Reason Blank/header tuple");
+ }
+ errorTupleCount++;
+ return;
+ }
+ if (incomingString.length() < recordLength) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<>(incomingString, "Record length mis-match/shorter tuple"));
+ }
+ logger.error("Tuple could not be parsed. Reason Record length mis-match/shorter tuple. " +
+ "Expected length " + recordLength + " Actual length " + incomingString.length());
+ errorTupleCount++;
+ return;
+ }
+ if (incomingString.length() > recordLength) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<>(incomingString, "Record length mis-match/longer tuple"));
+ }
+ logger.error("Tuple could not be parsed. Reason Record length mis-match/longer tuple. " +
+ "Expected length " + recordLength + " Actual length " + incomingString.length());
+ errorTupleCount++;
+ return;
+ }
+ try {
+ String[] values = univocityParser.parseLine(incomingString);
+ HashMap<String, Object> toEmit = new HashMap();
+ Object pojo = validateAndSet(values, toEmit);
+ if (parsedOutput.isConnected()) {
+ parsedOutput.emit(toEmit);
+ parsedOutputCount++;
+ }
+ if (out.isConnected() && clazz != null) {
+ out.emit(pojo);
+ emittedObjectCount++;
+ }
+ } catch (Exception e) {
+ if (err.isConnected()) {
+ err.emit(new KeyValPair<>(incomingString, e.getMessage()));
+ }
+ errorTupleCount++;
+ logger.error("Tuple could not be parsed. Reason {}", e.getMessage());
+ }
+ }
+
+ @Override
+ public KeyValPair<String, String> processErrorTuple(byte[] input)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public Object convert(byte[] tuple)
+ {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ try {
+ schema = new FixedWidthSchema(jsonSchema);
+ recordLength = 0;
+ List<FixedWidthSchema.Field> fields = schema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ recordLength += fields.get(i).getFieldLength();
+ }
+ createUnivocityParser();
+ } catch (Exception e) {
+ logger.error("Cannot setup Parser Reason {}", e.getMessage());
+ throw e;
+ }
+ }
+
+ /**
+ * Activate the Parser
+ */
+ @Override
+ public void activate(Context context)
+ {
+ try {
+ if (clazz != null) {
+ setters = new ArrayList<>();
+ List<String> fieldNames = schema.getFieldNames();
+ if (fieldNames != null) {
+ for (String fieldName : fieldNames) {
+ addSetter(fieldName);
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Cannot activate Parser Reason {}", e.getMessage());
+ throw e;
+ }
+ }
+
+ /**
+ * Function to create a univocity Parser
+ */
+ private void createUnivocityParser()
+ {
+ List<FixedWidthSchema.Field> fields = schema.getFields();
+ FixedWidthFields fieldWidthFields = new FixedWidthFields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ FixedWidthSchema.Field currentField = fields.get(i);
+ int fieldLength = currentField.getFieldLength();
+ FieldAlignment currentFieldAlignment;
+
+ if (currentField.getAlignment().equalsIgnoreCase("centre")) {
+ currentFieldAlignment = FieldAlignment.CENTER;
+ } else if (currentField.getAlignment().equalsIgnoreCase("left")) {
+ currentFieldAlignment = FieldAlignment.LEFT;
+ } else {
+ currentFieldAlignment = FieldAlignment.RIGHT;
+ }
+ fieldWidthFields.addField(currentField.getName(), fieldLength, currentFieldAlignment, currentField.getPadding());
+ }
+ FixedWidthParserSettings settings = new FixedWidthParserSettings(fieldWidthFields);
+ univocityParser = new com.univocity.parsers.fixed.FixedWidthParser(settings);
+ }
+
+ @Override
+ public void deactivate()
+ {
+
+ }
+
+ /**
+ * Function to add a setter for a field and add it
+ * to the List of setters
+ *
+ * @param fieldName name of the field for which setter is to be added
+ */
+ private void addSetter(String fieldName)
+ {
+ try {
+ Field f = clazz.getDeclaredField(fieldName);
+ FixedWidthParser.TypeInfo t = new FixedWidthParser.TypeInfo(f.getName(),
+ ClassUtils.primitiveToWrapper(f.getType()));
+ t.setter = PojoUtils.createSetter(clazz, t.name, t.type);
+ setters.add(t);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException("Field " + fieldName + " not found in class " + clazz, e);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while adding a setter" + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Function to validate individual parsed values and set the objects to be emitted
+ * @param values array of String containing individual parsed values
+ * @param toEmit the map to be emitted
+ * @return POJO the object to be returned (if the tuple class is set)
+ */
+ private Object validateAndSet(String[] values, HashMap toEmit)
+ {
+ Object pojoObject = null;
+ try {
+ List<FixedWidthSchema.Field> fields = schema.getFields();
+ try {
+ if (clazz != null) {
+ pojoObject = clazz.newInstance();
+ }
+ } catch (InstantiationException ie) {
+ throw new RuntimeException("Exception in instantiating", ie);
+ }
+ for (int i = 0; i < fields.size(); i++) {
+ FixedWidthSchema.Field currentField = fields.get(i);
+ FixedWidthParser.TypeInfo typeInfo = setters.get(i);
+ validateAndSetCurrentField(currentField,
+ values[i], typeInfo, pojoObject, toEmit);
+ }
+ } catch (StringIndexOutOfBoundsException e) {
+ throw new RuntimeException("Record length and tuple length mismatch ", e);
+ } catch (IllegalAccessException ie) {
+ throw new RuntimeException("Illegal Access ", ie);
+ } catch (Exception e) {
+ throw new RuntimeException("Exception in validation", e);
+ }
+ return pojoObject;
+ }
+
+ /**
+ * Function to validate and set the current field.
+ * @param currentField the field which is to be validated and set
+ * @param value the parsed value of the field
+ * @param typeInfo information about the field in POJO
+ * @param pojoObject POJO which is to be set
+ * @param toEmit the map to be emitted
+ */
+ private void validateAndSetCurrentField(FixedWidthSchema.Field currentField,
+ String value, FixedWidthParser.TypeInfo typeInfo, Object pojoObject, HashMap toEmit)
+ {
+ try {
+ String fieldName = currentField.getName();
+ if (value != null && !value.isEmpty()) {
+ Object result;
+ switch (currentField.getType()) {
+ case INTEGER:
+ result = Integer.parseInt(value);
+ break;
+ case DOUBLE:
+ result = Double.parseDouble(value);
+ break;
+ case STRING:
+ result = value;
+ break;
+ case CHARACTER:
+ result = value.charAt(0);
+ break;
+ case FLOAT:
+ result = Float.parseFloat(value);
+ break;
+ case LONG:
+ result = Long.parseLong(value);
+ break;
+ case SHORT:
+ result = Short.parseShort(value);
+ break;
+ case BOOLEAN:
+ if (value.compareToIgnoreCase(currentField.getTrueValue()) == 0) {
+ result = Boolean.parseBoolean("true");
+ } else if (value.compareToIgnoreCase(currentField.getFalseValue()) == 0) {
+ result = Boolean.parseBoolean("false");
+ } else {
+ throw new NumberFormatException();
+ }
+ break;
+ case DATE:
+ DateFormat df = new SimpleDateFormat(currentField.getDateFormat());
+ df.setLenient(false);
+ result = df.parse(value);
+ break;
+ default:
+ throw new RuntimeException("Invalid Type in Field", new Exception());
+ }
+ toEmit.put(fieldName,result);
+ if (typeInfo != null && pojoObject != null) {
+ typeInfo.setter.set(pojoObject, result);
+ }
+ } else {
+ toEmit.put(fieldName,value);
+ }
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Error parsing" + value + " to Integer type", e);
+ } catch (ParseException e) {
+ throw new RuntimeException("Error parsing" + value, e);
+ }catch (Exception e) {
+ throw new RuntimeException("Error setting " + value + " in the given class" + typeInfo.toString(), e);
+ }
+ }
+
+ /**
+ * Get the schema
+ *
+ * @return the Json schema
+ */
+ public String getJsonSchema()
+ {
+ return jsonSchema;
+ }
+
+ /**
+ * Set the jsonSchema
+ *
+ * @param jsonSchema schema to be set.
+ */
+ public void setJsonSchema(String jsonSchema)
+ {
+ this.jsonSchema = jsonSchema;
+ }
+
+ /**
+ * Get the header
+ *
+ * @return header- This will be string of field names, padded with padding character (if required)
+ */
+ public String getHeader()
+ {
+ return header;
+ }
+
+ /**
+ * Set the header
+ *
+ * @param header- This will be string of field names, padded with padding character (if required)
+ */
+ public void setHeader(String header)
+ {
+ this.header = header;
+ }
+
+ /**
+ * Get errorTupleCount
+ *
+ * @return errorTupleCount number of erroneous tuples.
+ */
+ @VisibleForTesting
+ public long getErrorTupleCount()
+ {
+ return errorTupleCount;
+ }
+
+ /**
+ * Get emittedObjectCount
+ *
+ * @return emittedObjectCount count of objects emitted.
+ */
+ @VisibleForTesting
+ public long getEmittedObjectCount()
+ {
+ return emittedObjectCount;
+ }
+
+ /**
+ * Get incomingTuplesCount
+ *
+ * @return incomingTuplesCount number of incoming tuples.
+ */
+ @VisibleForTesting
+ public long getIncomingTuplesCount()
+ {
+ return incomingTuplesCount;
+ }
+
+ /**
+ * Get parsedOutputCount
+ *
+ * @return parsedOutPutCount count of well parsed tuples.
+ */
+ @VisibleForTesting
+ public long getParsedOutputCount()
+ {
+ return parsedOutputCount;
+ }
+
+ /**
+ * Objects of this class represents a particular data member of the Class to be emitted.
+ * Each data member has a name, type and a accessor(setter) function associated with it.
+ */
+ static class TypeInfo
+ {
+ String name;
+ Class type;
+ PojoUtils.Setter setter;
+
+ public TypeInfo(String name, Class<?> type)
+ {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String toString()
+ {
+ return "'name': " + name + " 'type': " + type;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java
new file mode 100644
index 0000000..e64125b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/FixedWidthSchema.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * This is schema that defines fields and their constraints for fixed width files
+ * The operators use this information to validate the incoming tuples.
+ * Information from JSON schema is saved in this object and is used by the
+ * operators
+ * <p>
+ * <br>
+ * <br>
+ * Example schema <br>
+ * <br>
+ * {{ "padding": " ","alignment"="left","fields": [ {"name": "adId",
+ * "type": "Integer","padding":"0", "length": 3} , { "name": "adName",
+ * "type": "String", "alignment": "right","fieldLength": 20}, { "name": "bidPrice", "type":
+ * "Double", "length": 5}, { "name": "startDate", "type": "Date", "length": 10,
+ * "format":"dd/MM/yyyy" }, { "name": "securityCode", "type": "Long","length": 10 },
+ * { "name": "active", "type":"Boolean","length": 2} ] }}
+ */
+public class FixedWidthSchema extends Schema
+{
+ /**
+ * JSON key string for record length
+ */
+ public static final String FIELD_LENGTH = "length";
+ /**
+ * JSON key string for Padding Character
+ */
+ public static final String FIELD_PADDING_CHARACTER = "padding";
+ /**
+ * Default Padding Character
+ */
+ public static final char DEFAULT_PADDING_CHARACTER = ' ';
+ /**
+ * Default Alignment
+ */
+ public static final String DEFAULT_ALIGNMENT= "left";
+ /**
+ * JSON key string for Field Alignment
+ */
+ public static final String FIELD_ALIGNMENT ="alignment";
+
+ public static final Logger logger = LoggerFactory.getLogger(FixedWidthSchema.class);
+ /**
+ * This holds list of {@link Field}
+ */
+ private List<Field> fields = new LinkedList<>();
+ /**
+ * This holds the padding character for the entire file
+ */
+ private char globalPadding;
+ /**
+ * This holds the global alignment
+ */
+ private String globalAlignment;
+
+ /**
+ * Constructor for FixedWidthSchema
+ */
+ public FixedWidthSchema(String json)
+ {
+ try {
+ initialize(json);
+ } catch (JSONException | IOException e) {
+ logger.error("{}", e);
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Get the Padding character
+ * @return the padding character for the entire file
+ */
+ public char getGlobalPadding()
+ {
+ return globalPadding;
+ }
+
+ /**
+ * Set the padding character
+ * @param padding the padding character for the entire file
+ */
+ public void setGlobalPadding(char padding)
+ {
+ this.globalPadding = padding;
+ }
+
+ /**
+ * Get the global alignment
+ * @return globalAlignment the global alignment for the entire file.
+ */
+ public String getGlobalAlignment()
+ {
+ return globalAlignment;
+ }
+
+ /**
+ * Set the global alignment
+ * @param globalAlignment the global alignment for the entire file
+ */
+ public void setGlobalAlignment(String globalAlignment)
+ {
+ this.globalAlignment = globalAlignment;
+ }
+
+ /**
+ * For a given json string, this method sets the field members
+ *
+ * @param json schema as provided by the user.
+ */
+ private void initialize(String json) throws JSONException, IOException
+ {
+
+ JSONObject jo = new JSONObject(json);
+ JSONArray fieldArray = jo.getJSONArray(FIELDS);
+ if (jo.has(FIELD_PADDING_CHARACTER)) {
+ globalPadding = jo.getString(FIELD_PADDING_CHARACTER).charAt(0);
+ } else {
+ globalPadding = DEFAULT_PADDING_CHARACTER;
+ }
+ if (jo.has(FIELD_ALIGNMENT)) {
+ globalAlignment = jo.getString(FIELD_ALIGNMENT);
+ } else {
+ globalAlignment = DEFAULT_ALIGNMENT;
+ }
+
+ for (int i = 0; i < fieldArray.length(); i++) {
+ JSONObject obj = fieldArray.getJSONObject(i);
+ Field field = new Field(obj.getString(NAME),
+ obj.getString(TYPE).toUpperCase(), obj.getInt(FIELD_LENGTH));
+ if(obj.has(FIELD_PADDING_CHARACTER)) {
+ field.setPadding(obj.getString(FIELD_PADDING_CHARACTER).charAt(0));
+ } else {
+ field.setPadding(globalPadding);
+ }
+ if(obj.has(FIELD_ALIGNMENT)) {
+ field.setAlignment(obj.getString(FIELD_ALIGNMENT));
+ } else {
+ field.setAlignment(globalAlignment);
+ }
+ //Get the format if the given data type is Date
+ if (field.getType() == FieldType.DATE) {
+ if (obj.has(DATE_FORMAT)) {
+ field.setDateFormat(obj.getString(DATE_FORMAT));
+ } else {
+ field.setDateFormat(DEFAULT_DATE_FORMAT);
+ }
+
+ }
+ //Get the trueValue and falseValue if the data type is Boolean
+ if (field.getType() == FieldType.BOOLEAN) {
+ if (obj.has(TRUE_VALUE)) {
+ field.setTrueValue(obj.getString(TRUE_VALUE));
+ } else {
+ field.setTrueValue(DEFAULT_TRUE_VALUE);
+ }
+ if (obj.has(FALSE_VALUE)) {
+ field.setFalseValue(obj.getString(FALSE_VALUE));
+ } else {
+ field.setFalseValue(DEFAULT_FALSE_VALUE);
+ }
+
+ }
+ fields.add(field);
+ fieldNames.add(field.name);
+ }
+ }
+
+ /**
+ * Get the list of Fields.
+ *
+ * @return fields list of {@link Field}
+ */
+ public List<Field> getFields()
+ {
+ return Collections.unmodifiableList(fields);
+ }
+
+ /**
+ * Objects of this class represents a particular field in the schema. Each
+ * field has a name, type and a fieldLength.
+ * In case of type Date we need a dateFormat.
+ *
+ */
+ public class Field extends Schema.Field
+ {
+ /**
+ * Length of the field
+ */
+ private int fieldLength;
+ /**
+ * Parameter to specify format of date
+ */
+ private String dateFormat;
+ /**
+ * Parameter to specify true value of Boolean
+ */
+ private String trueValue;
+ /**
+ * Parameter to specify false value of Boolean
+ */
+ private String falseValue;
+ /**
+ * Parameter to specify padding
+ */
+ private char padding;
+ /**
+ * Parameter to specify alignment
+ */
+ private String alignment;
+
+ /**
+ * Constructor for Field
+ * @param name - name of the field.
+ * @param type - type of the field.
+ * @param fieldLength - length of the field.
+ */
+ public Field(String name, String type, Integer fieldLength)
+ {
+ super(name, type);
+ this.fieldLength = fieldLength;
+ this.dateFormat = DEFAULT_DATE_FORMAT;
+ this.trueValue = DEFAULT_TRUE_VALUE;
+ this.falseValue = DEFAULT_FALSE_VALUE;
+ this.padding=' ';
+ this.alignment=DEFAULT_ALIGNMENT;
+ }
+
+ /**
+ * Get the Length of the Field
+ * @return fieldLength length of the field.
+ */
+ public int getFieldLength()
+ {
+ return fieldLength;
+ }
+
+ /**
+ * Set the end pointer of the field
+ *
+ * @param fieldLength length of the field.
+ */
+ public void setFieldLength(Integer fieldLength)
+ {
+ this.fieldLength = fieldLength;
+ }
+
+ /**
+ * Get the dateFormat of the field
+ *
+ * @return dateFormat format of date given.
+ */
+ public String getDateFormat()
+ {
+ return dateFormat;
+ }
+
+ /**
+ * Set the the dateFormat of the field
+ *
+ * @param dateFormat sets the format of date.
+ */
+ public void setDateFormat(String dateFormat)
+ {
+ this.dateFormat = dateFormat;
+ }
+
+ /**
+ * Get the trueValue of the Boolean field
+ * @return trueValue gets the equivalent true value.
+ */
+ public String getTrueValue()
+ {
+ return trueValue;
+ }
+
+ /**
+ * Set the trueValue of the Boolean field
+ *
+ * @param trueValue sets the equivalent true value.
+ */
+ public void setTrueValue(String trueValue)
+ {
+ this.trueValue = trueValue;
+ }
+
+ /**
+ * Get the falseValue of the Boolean field
+ * @return falseValue gets the equivalent false value.
+ */
+ public String getFalseValue()
+ {
+ return falseValue;
+ }
+
+ /**
+ * Set the end pointer of the field
+ *
+ * @param falseValue sets the equivalent false value.
+ */
+ public void setFalseValue(String falseValue)
+ {
+ this.falseValue = falseValue;
+ }
+ /**
+ * Get the field padding
+ * @return padding gets the padding for the individual field.
+ */
+ public char getPadding()
+ {
+ return padding;
+ }
+
+ /**
+ * Set the field padding
+ * @param padding sets the padding for the individual field.
+ */
+ public void setPadding(char padding)
+ {
+ this.padding = padding;
+ }
+
+ /**
+ * Get the field alignment
+ * @return alignment gets the alignment for the individual field.
+ */
+ public String getAlignment()
+ {
+ return alignment;
+ }
+
+ /**
+ * Set the field alignment
+ * @param alignment sets the alignment for the individual field.
+ */
+ public void setAlignment(String alignment)
+ {
+ this.alignment = alignment;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Fields [name=" + name + ", type=" + type + " fieldLength= " + fieldLength + "]";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java
new file mode 100644
index 0000000..c09ff92
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/Schema.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datatorrent.contrib.parser;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * <p>
+ * Schema that defines the fields, and the constraints on those fields, for delimited and
+ * fixed-width files. Information from the JSON schema is saved in this object and is used by
+ * the operators to validate the incoming tuples.
+ */
+public class Schema
+{
+  /**
+   * JSON key string for fields array
+   */
+  public static final String FIELDS = "fields";
+  /**
+   * JSON key string for name of the field within fields array
+   */
+  public static final String NAME = "name";
+  /**
+   * JSON key string for type of the field within fields array
+   */
+  public static final String TYPE = "type";
+  /**
+   * JSON key string for date format constraint
+   */
+  public static final String DATE_FORMAT = "format";
+  /**
+   * JSON key string for true value constraint
+   */
+  public static final String TRUE_VALUE = "trueValue";
+  /**
+   * JSON key string for false value constraint
+   */
+  public static final String FALSE_VALUE = "falseValue";
+  /**
+   * Default string value that represents {@code true} for a boolean field
+   */
+  public static final String DEFAULT_TRUE_VALUE = "true";
+  /**
+   * Default string value that represents {@code false} for a boolean field
+   */
+  public static final String DEFAULT_FALSE_VALUE = "false";
+  /**
+   * Default date format: day/month/two-digit year. In
+   * {@link java.text.SimpleDateFormat}-style patterns {@code MM} is the month
+   * while {@code mm} is minutes, so the month letters must be upper case.
+   */
+  public static final String DEFAULT_DATE_FORMAT = "dd/MM/yy";
+  /**
+   * This holds the list of field names in the same order as in the schema
+   */
+  protected List<String> fieldNames = new LinkedList<String>();
+
+  /**
+   * Get the list of field names mentioned in schema
+   *
+   * @return an unmodifiable view of the field names, in schema order
+   */
+  public List<String> getFieldNames()
+  {
+    return Collections.unmodifiableList(fieldNames);
+  }
+
+  /**
+   * Supported data types
+   */
+  public enum FieldType
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE
+  }
+
+  /**
+   * Objects of this class represent a particular field in the schema. Each
+   * field has a name, a type and a set of associated constraints.
+   */
+  public class Field
+  {
+    /**
+     * name of the field
+     */
+    String name;
+    /**
+     * Data type of the field
+     */
+    FieldType type;
+
+    /**
+     * Parameterized Constructor
+     *
+     * @param name name of the field.
+     * @param type data type of the field, matched case-insensitively against the
+     *          {@link FieldType} constant names (e.g. {@code "Integer"} gives
+     *          {@link FieldType#INTEGER}).
+     * @throws IllegalArgumentException if {@code type} does not name a {@link FieldType}
+     * @throws NullPointerException if {@code type} is null
+     */
+    public Field(String name, String type)
+    {
+      this.name = name;
+      // Locale.ROOT keeps the mapping independent of the JVM default locale: under a
+      // Turkish locale, the 'i' in "integer" would otherwise upper-case to a dotted
+      // capital I and FieldType.valueOf would reject it.
+      this.type = FieldType.valueOf(type.toUpperCase(Locale.ROOT));
+    }
+
+    /**
+     * Default Constructor
+     */
+    public Field()
+    {
+    }
+
+    /**
+     * Get the name of the field
+     *
+     * @return name of the field
+     */
+    public String getName()
+    {
+      return name;
+    }
+
+    /**
+     * Set the name of the field
+     *
+     * @param name name of the field
+     */
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    /**
+     * Get {@link FieldType}
+     *
+     * @return data type of the field
+     */
+    public FieldType getType()
+    {
+      return type;
+    }
+
+    /**
+     * Set {@link FieldType}
+     *
+     * @param type data type of the field
+     */
+    public void setType(FieldType type)
+    {
+      this.type = type;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Fields [name=" + name + ", type=" + type + "]";
+    }
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java b/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
new file mode 100644
index 0000000..bc72efd
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/parser/FixedWidthTest.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.parser;
+
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+public class FixedWidthTest
+{
+
+ private static final String filename = "FixedWidthSchema.json";
+ @Rule
+ public Watcher watcher = new Watcher();
+ CollectorTestSink<Object> error = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> objectPort = new CollectorTestSink<Object>();
+ CollectorTestSink<Object> pojoPort = new CollectorTestSink<Object>();
+ FixedWidthParser parser = new FixedWidthParser();
+
+ /*
+ * adId,campaignId,adName,bidPrice,startDate,endDate,securityCode,isActive,isOptimized,parentCampaign,weatherTargeted,valid
+ * e.g. in csv 1234,98233,adxyz,0.2,2015-03-08 03:37:12,11/12/2012,12,y,OPTIMIZE,CAMP_AD,Y,yes
+ * 120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes
+ * Constraints are defined in FixedWidthSchema.json
+ */
+ @Test
+ public void TestParserValidInput()
+ {
+ String input = "120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Ad adPojo = (Ad)obj;
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Ad.class, obj.getClass());
+
+ Assert.assertEquals(12, adPojo.getAdId());
+ Assert.assertTrue("adxyz".equals(adPojo.getAdName()));
+ Assert.assertEquals(0.2, adPojo.getBidPrice(), 0.0);
+ Assert.assertEquals(Date.class, adPojo.getStartDate().getClass());
+ Assert.assertEquals(Date.class, adPojo.getEndDate().getClass());
+ Assert.assertEquals(12, adPojo.getSecurityCode());
+ Assert.assertTrue("CAMP_AD123".equals(adPojo.getParentCampaign()));
+ Assert.assertTrue("yes".equals(adPojo.getValid()));
+ Assert.assertTrue(adPojo.isActive());
+ Assert.assertFalse(adPojo.isOptimized());
+ }
+
+ @Test
+ public void TestParserValidInputPojoPortNotConnected()
+ {
+ parser.out.setSink(null);
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserValidInputClassNameNotProvided()
+ {
+ parser.setClazz(null);
+ String input = "120982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidAdIdInput()
+ {
+ String input = "1c2982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserNoCampaignIdInput()
+ {
+ String input = "123 _____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, objectPort.collectedTuples.size());
+ Assert.assertEquals(1, pojoPort.collectedTuples.size());
+ Object obj = pojoPort.collectedTuples.get(0);
+ Assert.assertNotNull(obj);
+ Assert.assertEquals(Ad.class, obj.getClass());
+ Assert.assertEquals(0, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserInvalidCampaignIdInput()
+ {
+ String input = "1239c2_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserInvalidBidPriceInput()
+ {
+ String input = "123982_____adxyz0..2015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserInvalidStartDateInput()
+ {
+ String input = "123982_____adxyz0.22015-31-13 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserInvalidSecurityCodeInput()
+ {
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212b__y_CAMP_AD123Yyes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserInvalidisActiveInput()
+ {
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___p_CAMP_AD123Yyes";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserNullOrBlankInput()
+ {
+ parser.beginWindow(0);
+ parser.in.process(null);
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ }
+
+ @Test
+ public void TestParserHeaderAsInput()
+ {
+ parser.setHeader("aIdcIdadName bidstartDate endDate sCodeBBparentCampWvalid ");
+ String input = "aIdcIdadName bidstartDate endDate sCodeBBparentCampWvalid ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserShorterRecord()
+ {
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserShorterRecordOnlyPOJOPortConnected()
+ {
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.parsedOutput.setSink(null);
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserLongerRecord()
+ {
+
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes Extra_stuff";
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserLongerRecordOnlyPOJOPortConnected()
+ {
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes Extra_stuff";
+ parser.parsedOutput.setSink(null);
+ parser.beginWindow(0);
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, objectPort.collectedTuples.size());
+ Assert.assertEquals(0, pojoPort.collectedTuples.size());
+ Assert.assertEquals(1, error.collectedTuples.size());
+ KeyValPair<String, String> errorTuple = (KeyValPair<String, String>)error.collectedTuples.get(0);
+ Assert.assertEquals(input, errorTuple.getKey());
+ }
+
+ @Test
+ public void TestParserValidInputMetricVerification()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.getParsedOutputCount());
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, parser.getParsedOutputCount());
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ }
+
+ @Test
+ public void TestParserInvalidInputMetricVerification()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.getParsedOutputCount());
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ parser.in.process("123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes Extra_stuff"
+ .getBytes());
+ parser.endWindow();
+ Assert.assertEquals(0, parser.getParsedOutputCount());
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(1, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ }
+
+ @Test
+ public void TestParserValidInputMetricResetCheck()
+ {
+ parser.beginWindow(0);
+ Assert.assertEquals(0, parser.getParsedOutputCount());
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ String input = "123982_____adxyz0.22015-03-08 03:37:1211/12/201212___y_CAMP_AD123Yyes ";
+ parser.in.process(input.getBytes());
+ parser.endWindow();
+ Assert.assertEquals(1, parser.getParsedOutputCount());
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ parser.beginWindow(1);
+ Assert.assertEquals(0, parser.getParsedOutputCount());
+ Assert.assertEquals(0, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(0, parser.getEmittedObjectCount());
+ parser.in.process(input.getBytes());
+ Assert.assertEquals(1, parser.getParsedOutputCount());
+ Assert.assertEquals(1, parser.getIncomingTuplesCount());
+ Assert.assertEquals(0, parser.getErrorTupleCount());
+ Assert.assertEquals(1, parser.getEmittedObjectCount());
+ parser.endWindow();
+ }
+
+ public static class Ad
+ {
+
+ private int adId;
+ private int campaignId;
+ private String adName;
+ private double bidPrice;
+ private Date startDate;
+ private Date endDate;
+ private long securityCode;
+ private boolean active;
+ private boolean optimized;
+ private String parentCampaign;
+ private Character weatherTargeted;
+ private String valid;
+
+ public Ad()
+ {
+
+ }
+
+ public int getAdId()
+ {
+ return adId;
+ }
+
+ public void setAdId(int adId)
+ {
+ this.adId = adId;
+ }
+
+ public int getCampaignId()
+ {
+ return campaignId;
+ }
+
+ public void setCampaignId(int campaignId)
+ {
+ this.campaignId = campaignId;
+ }
+
+ public String getAdName()
+ {
+ return adName;
+ }
+
+ public void setAdName(String adName)
+ {
+ this.adName = adName;
+ }
+
+ public double getBidPrice()
+ {
+ return bidPrice;
+ }
+
+ public void setBidPrice(double bidPrice)
+ {
+ this.bidPrice = bidPrice;
+ }
+
+ public Date getStartDate()
+ {
+ return startDate;
+ }
+
+ public void setStartDate(Date startDate)
+ {
+ this.startDate = startDate;
+ }
+
+ public Date getEndDate()
+ {
+ return endDate;
+ }
+
+ public void setEndDate(Date endDate)
+ {
+ this.endDate = endDate;
+ }
+
+ public long getSecurityCode()
+ {
+ return securityCode;
+ }
+
+ public void setSecurityCode(long securityCode)
+ {
+ this.securityCode = securityCode;
+ }
+
+ public boolean isActive()
+ {
+ return active;
+ }
+
+ public void setActive(boolean active)
+ {
+ this.active = active;
+ }
+
+ public boolean isOptimized()
+ {
+ return optimized;
+ }
+
+ public void setOptimized(boolean optimized)
+ {
+ this.optimized = optimized;
+ }
+
+ public String getParentCampaign()
+ {
+ return parentCampaign;
+ }
+
+ public void setParentCampaign(String parentCampaign)
+ {
+ this.parentCampaign = parentCampaign;
+ }
+
+ public Character getWeatherTargeted()
+ {
+ return weatherTargeted;
+ }
+
+ public void setWeatherTargeted(Character weatherTargeted)
+ {
+ this.weatherTargeted = weatherTargeted;
+ }
+
+ public String getValid()
+ {
+ return valid;
+ }
+
+ public void setValid(String valid)
+ {
+ this.valid = valid;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Ad [adId=" + adId + ", campaignId=" + campaignId + ", adName=" + adName + ", bidPrice=" + bidPrice
+ + ", startDate=" + startDate + ", endDate=" + endDate + ", securityCode=" + securityCode + ", active="
+ + active + ", optimized=" + optimized + ", parentCampaign=" + parentCampaign + ", weatherTargeted="
+ + weatherTargeted + ", valid=" + valid + "]";
+ }
+ }
+
+ public class Watcher extends TestWatcher
+ {
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ parser.setClazz(Ad.class);
+ parser.setJsonSchema(SchemaUtils.jarResourceFileToString(filename));
+ parser.setup(null);
+ parser.activate(null);
+ parser.err.setSink(error);
+ parser.parsedOutput.setSink(objectPort);
+ parser.out.setSink(pojoPort);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ error.clear();
+ objectPort.clear();
+ pojoPort.clear();
+ parser.teardown();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0852c659/contrib/src/test/resources/FixedWidthSchema.json
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/FixedWidthSchema.json b/contrib/src/test/resources/FixedWidthSchema.json
new file mode 100644
index 0000000..38d7cc7
--- /dev/null
+++ b/contrib/src/test/resources/FixedWidthSchema.json
@@ -0,0 +1,78 @@
+{
+ "padding": "_",
+ "alignment": "left",
+ "fields": [
+ {
+ "name": "adId",
+ "type": "Integer",
+ "length": "3",
+ "padding": "0"
+ },
+ {
+ "name": "campaignId",
+ "type": "Integer",
+ "length": "3",
+ "padding": " "
+
+ },
+ {
+ "name": "adName",
+ "type": "String",
+ "length": "10",
+ "alignment":"right"
+ },
+ {
+ "name": "bidPrice",
+ "type": "Double",
+ "length": "3"
+ },
+ {
+ "name": "startDate",
+ "type": "Date",
+ "format": "yyyy-MM-dd HH:mm:ss",
+ "length": "19"
+
+ },
+ {
+ "name": "endDate",
+ "type": "Date",
+ "format": "dd/MM/yyyy",
+ "length": "10"
+ },
+ {
+ "name": "securityCode",
+ "type": "Long",
+ "length": "5"
+ },
+ {
+ "name": "active",
+ "type": "Boolean",
+ "length": "1",
+ "trueValue": "y",
+ "falseValue": "n"
+ },
+ {
+ "name": "optimized",
+ "type": "Boolean",
+ "length": "1",
+ "trueValue": "y",
+ "falseValue": "n"
+ },
+ {
+ "name": "parentCampaign",
+ "type": "String",
+ "length": "10"
+ },
+ {
+ "name": "weatherTargeted",
+ "type": "Character",
+ "length": "1"
+ },
+ {
+ "name": "valid",
+ "type": "String",
+ "length": "10",
+ "padding":" "
+ }
+ ]
+}