Posted to commits@apex.apache.org by sh...@apache.org on 2017/01/16 09:48:39 UTC
apex-malhar git commit: APEXMALHAR-2183 Csv formatter documetation
Repository: apex-malhar
Updated Branches:
refs/heads/master 4ab457f18 -> 0885bfad2
APEXMALHAR-2183 Csv formatter documetation
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0885bfad
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0885bfad
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0885bfad
Branch: refs/heads/master
Commit: 0885bfad2d1f35287b8a319948cb897f371b9a2e
Parents: 4ab457f
Author: venkateshDT <ve...@datatorrent.com>
Authored: Tue Jan 10 00:43:44 2017 -0800
Committer: venkateshDT <ve...@datatorrent.com>
Committed: Mon Jan 16 01:24:53 2017 -0800
----------------------------------------------------------------------
docs/operators/csvformatter.md | 115 ++++++++++++++++++++++++++++++++++++
mkdocs.yml | 2 +-
2 files changed, 116 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0885bfad/docs/operators/csvformatter.md
----------------------------------------------------------------------
diff --git a/docs/operators/csvformatter.md b/docs/operators/csvformatter.md
new file mode 100644
index 0000000..6b35d8c
--- /dev/null
+++ b/docs/operators/csvformatter.md
@@ -0,0 +1,115 @@
+CsvFormatter
+============
+
+## Operator Objective
+This operator receives a POJO ([Plain Old Java Object](https://en.wikipedia.org/wiki/Plain_Old_Java_Object)) as an incoming tuple, converts the data in
+the incoming POJO to a custom delimited string and emits the delimited string.
+
+CsvFormatter supports schema definition as a JSON string.
+
+CsvFormatter does not hold any state and is **idempotent**, **fault-tolerant** and **statically/dynamically partitionable**.
+
+## Operator Information
+1. Operator location: ***malhar-contrib***
+2. Available since: ***3.2.0***
+3. Operator state: ***Evolving***
+4. Java Packages:
+ * Operator: ***[com.datatorrent.contrib.formatter.CsvFormatter](https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/formatter/CsvFormatter.html)***
+
+## Properties, Attributes and Ports
+### <a name="props"></a>Properties of CsvFormatter
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *schema* | Contents of the schema. The schema is specified in JSON format. | String | Yes | N/A |
+
+
+### Platform Attributes that influence operator behavior
+| **Attribute** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *in.TUPLE_CLASS* | TUPLE_CLASS attribute on the input port, which tells the operator the class of the incoming POJO | Class or FQCN | Yes |
+
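+The property and attribute above are typically set in properties.xml. The fragment below is a minimal sketch, not taken from the operator's own documentation: `{OperatorName}` stands for the name given to the operator in the DAG, `com.example.Campaign` is a hypothetical POJO class, and the schema value is only illustrative.
+
+```xml
+  <!-- schema property: the JSON schema passed as a single string (illustrative value) -->
+  <property>
+    <name>dt.operator.{OperatorName}.prop.schema</name>
+    <value>{"separator": ",", "quoteChar": "\"", "lineDelimiter": "\n", "fields": [{"name": "campaignId", "type": "Integer"}]}</value>
+  </property>
+  <!-- TUPLE_CLASS attribute on the "in" port; the class name is hypothetical -->
+  <property>
+    <name>dt.operator.{OperatorName}.port.in.attr.TUPLE_CLASS</name>
+    <value>com.example.Campaign</value>
+  </property>
+```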
+
+### Ports
+| **Port** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *in* | Tuples which need to be formatted are received on this port | Object (POJO) | Yes |
+| *out* | Tuples that are formatted are emitted from this port | String | No |
+| *err* | Tuples that could not be converted are emitted on this port | Object | No |
+
+## Limitations
+The current CsvFormatter has the following limitations:
+
+1. The field names in the schema and the POJO field names should match. For example, if the name of the schema field is "customerName", then the POJO should contain a field with the same name.
+2. Field-wise validation/formatting is not yet supported.
+3. The fields will be written to the file in the same order as specified in schema.json.
+
+## Example
+An example application for CsvFormatter can be found at: [https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter](https://github.com/DataTorrent/examples/tree/master/tutorials/csvformatter)
+
+## Advanced
+
+### <a name="JSONFileFormat"></a> Schema format for CsvFormatter
+CsvFormatter expects the schema to be a String in JSON format.
+
+Example schema:
+```json
+{
+ "separator": ",",
+ "quoteChar": "\"",
+ "lineDelimiter": "\n",
+ "fields": [
+ {
+ "name": "campaignId",
+ "type": "Integer"
+ },
+ {
+ "name": "startDate",
+ "type": "Date",
+ "constraints": {
+ "format": "yyyy-MM-dd"
+ }
+ }
+ ]
+}
+```
+
+
+### Partitioning of CsvFormatter
+Because CsvFormatter is a stateless operator, the built-in partitioners in the Malhar library can be used directly by setting properties as follows:
+
+#### Stateless partitioning of CsvFormatter
+Stateless partitioning ensures that CsvFormatter is partitioned right at the start of the application and remains partitioned throughout the lifetime of the DAG.
+CsvFormatter can be statelessly partitioned by adding the following lines to properties.xml:
+
+```xml
+ <property>
+ <name>dt.operator.{OperatorName}.attr.PARTITIONER</name>
+ <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
+ </property>
+```
+
+where {OperatorName} is the name of the CsvFormatter operator.
+The lines above statically partition CsvFormatter into 2 partitions. Change the value after the colon to change the number of static partitions.
+
+
+#### Dynamic Partitioning of CsvFormatter
+Dynamic partitioning is a feature of the Apex platform which changes the partitioning of the operator based on certain conditions.
+CsvFormatter can be dynamically partitioned using the following out-of-the-box partitioner:
+
+##### Throughput based
+The following code can be added to the populateDAG method of the application to dynamically partition CsvFormatter:
+```java
+ // COOL_DOWN_MILLIS, MAX_THROUGHPUT and MIN_THROUGHPUT are configuration keys defined by the application
+ StatelessThroughputBasedPartitioner<CsvFormatter> partitioner = new StatelessThroughputBasedPartitioner<>();
+ partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+ partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+ partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+ // The same object is registered both as the stats listener and as the partitioner
+ dag.setAttribute(csvFormatter, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+ dag.setAttribute(csvFormatter, OperatorContext.PARTITIONER, partitioner);
+```
+
+The code above dynamically partitions CsvFormatter when its throughput changes.
+If the overall throughput of CsvFormatter rises above 30000 or falls below 10000, the platform will repartition CsvFormatter
+to keep the throughput of a single partition between 10000 and 30000.
+The CooldownMillis value of 10000 is used as the threshold time over which a throughput change is observed.
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0885bfad/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index d19cb7c..6a17cc5 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -19,4 +19,4 @@ pages:
- Json Formatter: operators/jsonFormatter.md
- Transform Operator: operators/transform.md
- Xml Parser: operators/xmlParserOperator.md
-
+ - Csv Formatter: operators/csvformatter.md