You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sh...@apache.org on 2016/11/09 19:41:07 UTC

apex-malhar git commit: APEXMALHAR-2232: Add documentation for csv parser

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0870480a4 -> 18e49df62


APEXMALHAR-2232: Add documentation for csv parser


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/18e49df6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/18e49df6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/18e49df6

Branch: refs/heads/master
Commit: 18e49df622c35547944bf1a5d9a14315661d6515
Parents: 0870480
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Fri Sep 9 16:09:31 2016 +0530
Committer: Priyanka Gugale <pr...@apache.org>
Committed: Wed Nov 9 17:26:09 2016 +0530

----------------------------------------------------------------------
 docs/operators/csvParserOperator.md           | 165 +++++++++++++++++++++
 docs/operators/images/csvParser/CSVParser.png | Bin 0 -> 47140 bytes
 mkdocs.yml                                    |   1 +
 3 files changed, 166 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/18e49df6/docs/operators/csvParserOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/csvParserOperator.md b/docs/operators/csvParserOperator.md
new file mode 100644
index 0000000..311c6bb
--- /dev/null
+++ b/docs/operators/csvParserOperator.md
@@ -0,0 +1,165 @@
+Csv Parser Operator
+=============
+
+## Operator Objective
+This operator is designed to parse delimited records and construct a map or concrete java class also known as ["POJO"](https://en.wikipedia.org/wiki/Plain_Old_Java_Object) out of it. User need to provide the schema to describe the delimited data. Based on schema definition the operator will parse the incoming record to object map and POJO.  User can also provide constraints if any, in the schema. The supported constraints are listed in [constraints table](#constraints). The incoming record will be validated against those constraints. Valid records will be emitted as POJO / map while invalid ones are emitted on error port with error message.
+
+**Note**: field names of POJO must match field names in schema and in the same order as it appears in the incoming data.
+
+## Overview
+The operator is **idempotent**, **fault-tolerant** and **partitionable**.
+
+## Class Diagram
+![](images/csvParser/CSVParser.png)
+
+## Operator Information
+1. Operator location:**_malhar-contrib_**
+2. Available since:**_3.2.0_**
+3. Operator state:**_Evolving_**
+4. Java Package:[com.datatorrent.contrib.parser.CsvParser](https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java)
+
+
+## <a name="props"></a>Properties of Csv Parser
+User need to set the schema which describes delimited data as well as specifies constraints on values if any.
+e.g.
+
+```xml
+{
+  "separator":",",
+  "quoteChar":"\"",
+  "fields":[
+    {
+      "name":"adId",
+      "type":"Integer",
+      "constraints":{
+         "required":"true"
+      }
+    },
+    {
+      "name":"adName",
+      "type":"String",
+      "constraints":{
+         "required":"true",
+         "pattern":"[a-z].*[a-z]$",
+         "maxLength":"10"
+      }
+    },
+    {
+      "name":"bidPrice",
+      "type":"Double",
+      "constraints":{
+         "required":"true",
+         "minValue":"0.1",
+         "maxValue":"3.2"
+      }
+    },
+    {
+      "name":"startDate",
+      "type":"Date",
+      "constraints":{
+         "format":"yyyy-MM-dd HH:mm:ss"
+      }
+    }
+  ]
+}
+```
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *schema* | [Schema](https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java)  describing delimited data | String | YES | N/A |
+ 
+## Platform Attributes that influences operator behavior
+
+| **Attribute** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *out.TUPLE_CLASS* | TUPLE_CLASS attribute on output port which tells operator the class of POJO which need to be emitted | Class| Yes |
+ 
+## <a name="dataTypes"></a>Supported DataTypes in Schema
+  * Integer
+  * Long
+  * Double
+  * Character
+  * String
+  * Boolean
+  * Date
+  * Float
+
+## <a name="constraints"></a>Schema Constraints 
+|**DataType**| **Constraints** | **Description** |
+| -------- | -------- | -------- |
+| *All data Types* | required | If mentioned, indicates that the data type constraints that follow are required. It cannot be blank/null. It may or may not satisfy other constraints ( like equals/minVal/maxVal etc ) |
+| *All data Types* | equals | If mentioned, indicates that the data string or value declared in the data type constraints must be an exact match with the specified value. `Note: This constraints is not applicable for data type boolean and date` |
+| *String* | Length | The string must be of the length that is specified. |
+| *String* | minLength | The string is at least the length specified as minLength value. |
+| *String* | maxLength | The string can be at the most the length specified as maxLength value. |
+| *String* | pattern | The string must match the specified regular expression. |
+| *Long* | maxValue | The numeric can be at the most the value specified as maxValue. |
+| *Long* | minValue | The numeric is at least the value specified as minValue. |
+| *Double* | maxValue | The numeric can be at the most the value specified as maxValue. |
+| *Double* | minValue | The numeric is at least the value specified as minValue. |
+| *Float* | maxValue | The numeric can be at the most the value specified as maxValue. |
+| *Float* | minValue | The numeric is at least the value specified as minValue. |
+| *Integer* | maxValue | The numeric can be at the most the value specified as maxValue. |
+| *Integer* | minValue | The numeric is at least the value specified as minValue. |
+| *Date* | format | A simple date format as specified in the SimpleDateFormat class: http://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html?is-external=true |
+| *Boolean* | trueValue | String for which boolean value is true. The default values are: true, 1, y, and t. `Note: If you specify trueValue, you must also specify falseValue.` |
+| *Boolean* | falseValue | String for which boolean value is false. The default values are: false, 0, n, and f. `Note: If you specify falseValue, you must also specify trueValue.` |
+
+ 
+## Ports
+
+| **Port** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *in* | Tuples that needs to be parsed are recieved on this port | byte[] | Yes |
+| *out* | Valid Tuples that are emitted as pojo | Object (POJO) | No |
+| *parsedOutput* | Valid Tuples that are emitted as map | Map | No |
+| *err* | Invalid Tuples are emitted with error message | KeyValPair <String, String\> | No |
+
+## Partitioning
+CSV Parser is both statically and dynamically partitionable.
+### Static Partitioning
+This can be achieved in 2 ways
+
+1. Specifying the partitioner and number of partitions in the populateDAG() method
+  ```java
+  CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
+  StatelessPartitioner<CsvParser> partitioner1 = new StatelessPartitioner<CsvParser>(2);
+  dag.setAttribute(csvParser, Context.OperatorContext.PARTITIONER, partitioner1);
+  ```
+
+2. Specifying the partitioner in properties file.
+  ```xml
+   <property>
+     <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
+     <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
+   </property>
+  ```
+
+where {OperatorName} is the name of the CsvParser operator.
+ Above lines will partition CsvParser statically 2 times. Above value can be changed accordingly to change the number of static partitions.
+
+
+### Dynamic Paritioning
+
+CsvParser can be dynamically partitioned using out-of-the-box partitioner:
+
+#### Throughput based
+Following code can be added to populateDAG method of application to dynamically partition CsvParser:
+```java
+CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
+StatelessThroughputBasedPartitioner<CsvParser> partitioner = new StatelessThroughputBasedPartitioner<>();
+partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+dag.setAttribute(csvParser, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+dag.setAttribute(csvParser, OperatorContext.PARTITIONER, partitioner);
+```
+
+Above code will dynamically partition csvParser when the throughput changes.
+If the overall throughput of csvParser goes beyond 30000 or less than 10000, the platform will repartition CsvParser
+to balance throughput of a single partition to be between 10000 and 30000.
+CooldownMillis of 10000 will be used as the threshold time for which the throughput change is observed.
+
+## Example
+Example for Csv Parser can be found at: [https://github.com/DataTorrent/examples/tree/master/tutorials/parser](https://github.com/DataTorrent/examples/tree/master/tutorials/parser)
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/18e49df6/docs/operators/images/csvParser/CSVParser.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/csvParser/CSVParser.png b/docs/operators/images/csvParser/CSVParser.png
new file mode 100644
index 0000000..523ba0b
Binary files /dev/null and b/docs/operators/images/csvParser/CSVParser.png differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/18e49df6/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index 9226703..3dd0137 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -9,6 +9,7 @@ pages:
     - File Splitter: operators/file_splitter.md
     - Block Reader: operators/block_reader.md
     - File Input: operators/fsInputOperator.md
+    - Csv Parser: operators/csvParserOperator.md
     - File Output: operators/file_output.md
     - Enricher: operators/enricher.md
     - Filter: operators/filter.md