Posted to commits@apex.apache.org by ra...@apache.org on 2016/09/20 15:55:42 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2184: Add documentation for File Input Operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0c4b3fce2 -> 36582fff2


APEXMALHAR-2184: Add documentation for File Input Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/17515e30
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/17515e30
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/17515e30

Branch: refs/heads/master
Commit: 17515e308a1ac5a9bc889e8626733e785bc0c060
Parents: b6c48bb
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed Aug 10 18:09:24 2016 +0530
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Mon Sep 19 12:12:46 2016 +0530

----------------------------------------------------------------------
 docs/operators/fsInputOperator.md               | 339 +++++++++++++++++++
 .../images/fsInput/operatorsClassDiagram.png    | Bin 0 -> 71104 bytes
 2 files changed, 339 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/17515e30/docs/operators/fsInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/fsInputOperator.md b/docs/operators/fsInputOperator.md
new file mode 100644
index 0000000..3449216
--- /dev/null
+++ b/docs/operators/fsInputOperator.md
@@ -0,0 +1,339 @@
+File Input Operator
+=============
+
+## Operator Objective
+This operator is designed to scan a directory for files, read and split file content into tuples
+such as lines or a block of bytes, and finally emit them on output ports defined in concrete
+subclasses for further processing by downstream operators.
+It can be used with any filesystem supported by Hadoop, such as HDFS, S3, FTP and NFS.
+
+## Overview
+The operator is **idempotent**, **fault-tolerant** and **partitionable**.
+
+Logic for directory scanning is encapsulated in the `DirectoryScanner` static inner class
+which provides functions such as matching file names against a regular expression, tracking files
+that have already been processed (so that they are not processed again), filtering files based
+on the hashcode of the file names in the presence of partitioning so that each file is
+processed by a unique partition. This class can be extended if necessary to provide
+additional capabilities such as scanning multiple directories.
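The following is a minimal, self-contained sketch of the three scanner duties described above: matching paths against a regular expression, skipping files that were already processed, and assigning each file to one partition by hashcode. `ScannerSketch` and its methods are hypothetical illustrations, not the Malhar `DirectoryScanner` API, and the exact hashing rule used by the real scanner is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical, simplified model of the scanner duties described above;
// this is NOT the Malhar DirectoryScanner API, just the same ideas.
class ScannerSketch
{
  private final Pattern regex;         // null means "accept every file"
  private final int partitionCount;    // total number of partitions
  private final int partitionIndex;    // index of this partition, 0-based

  ScannerSketch(String filePatternRegexp, int partitionCount, int partitionIndex)
  {
    this.regex = (filePatternRegexp == null) ? null : Pattern.compile(filePatternRegexp);
    this.partitionCount = partitionCount;
    this.partitionIndex = partitionIndex;
  }

  // True if this partition owns the file and its path matches the regex.
  boolean acceptFile(String path)
  {
    if (partitionCount > 1) {
      // floorMod keeps the result non-negative for negative hashcodes
      int owner = Math.floorMod(path.hashCode(), partitionCount);
      if (owner != partitionIndex) {
        return false;
      }
    }
    return regex == null || regex.matcher(path).matches();
  }

  // New files for this partition: accepted and not already processed.
  List<String> scan(List<String> listing, Set<String> processedFiles)
  {
    List<String> result = new ArrayList<>();
    for (String path : listing) {
      if (!processedFiles.contains(path) && acceptFile(path)) {
        result.add(path);
      }
    }
    return result;
  }
}
```

With three partitions, every path is accepted by exactly one of the three scanner instances, which is what prevents two partitions from processing the same file.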
+
+It tracks the current file offset as part of its checkpointed state. If it fails and is restarted
+by the platform, it seeks to the saved offset to avoid duplicate processing. Exactly-once processing
+for fault tolerance is handled using the window data manager. For more details, see the blog post [Fault-Tolerant File Processing](https://www.datatorrent.com/blog/fault-tolerant-file-processing/).
+It supports both static and dynamic partitioning.
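The offset-based recovery described above can be sketched in isolation. This standalone example assumes the offset counts entities (here, lines) already emitted rather than bytes; `OffsetRecoverySketch` and `resume()` are hypothetical names, and a `String` stands in for the file that the real operator would reopen from the filesystem.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of offset-based recovery; not Malhar code. The
// checkpointed state is the current file plus the number of entities
// (here, lines) already emitted from it.
class OffsetRecoverySketch
{
  String currentFile;
  int offset;

  // Re-reads the file, skips the first `offset` lines that were emitted
  // before the failure, and emits (returns) only the remaining lines.
  List<String> resume(String fileContent)
  {
    List<String> emitted = new ArrayList<>();
    try (BufferedReader br = new BufferedReader(new StringReader(fileContent))) {
      for (int i = 0; i < offset; i++) {
        if (br.readLine() == null) {
          return emitted; // file is shorter than the saved offset
        }
      }
      String line;
      while ((line = br.readLine()) != null) {
        emitted.add(line);
        offset++; // the offset advances with every emitted tuple
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return emitted;
  }
}
```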
+
+## Use Cases
+This operator is suitable for use in an environment where small- to medium-sized files are
+deposited in a specific directory on a regular basis. For very large files a better alternative
+is the `FileSplitter` and `BlockReader` combination since they allow such files to be processed
+by multiple partitions to achieve higher throughput. Additionally, files which are continually
+modified by other processes are not suitable for processing with this operator since they may
+yield unpredictable results.
+
+## How to Use?
+The tuple type in the abstract class is a generic parameter.
+Concrete subclasses need to choose an appropriate class (such as `String` or `byte[]`) for the
+generic parameter and also implement two abstract methods: `readEntity()`, to read
+the next tuple from the currently open file, and `emit()`, to process the next tuple.
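To illustrate choosing `byte[]` for the generic parameter, the sketch below shows `readEntity()`-style logic that returns fixed-size blocks and `null` at end of file, following the same convention as `BufferedReader.readLine()` in `LineByLineFileInputOperator`. It is a standalone class rather than an `AbstractFileInputOperator` subclass, and the block size is an arbitrary assumption.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical standalone reader: each tuple is a fixed-size block of
// bytes (the last block may be shorter); null signals end of file.
class BlockEntityReader
{
  private final InputStream in;
  private final int blockSize;

  BlockEntityReader(InputStream in, int blockSize)
  {
    this.in = in;
    this.blockSize = blockSize;
  }

  // Returns the next block, or null when the stream is exhausted.
  byte[] readEntity()
  {
    byte[] buf = new byte[blockSize];
    int filled = 0;
    try {
      // read() may return fewer bytes than requested, so loop until full or EOF
      while (filled < blockSize) {
        int n = in.read(buf, filled, blockSize - filled);
        if (n < 0) {
          break;
        }
        filled += n;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    if (filled == 0) {
      return null;
    }
    return filled == blockSize ? buf : Arrays.copyOf(buf, filled);
  }
}
```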
+
+In principle, no ports need be defined in the rare case that the operator simply writes
+tuples to some external sink or merely maintains aggregated statistics. But in most common
+scenarios, the tuples need to be sent to one or more downstream operators for additional
+processing such as parsing, enrichment or aggregation; in such cases, appropriate
+output ports are defined and the `emit()` implementation dispatches tuples to the
+desired output ports.
+
+A simple concrete implementation is provided in Malhar: `LineByLineFileInputOperator`.
+It uses `String` for the generic parameter, defines a single output port and processes each
+line of the input file as a tuple. It is discussed further below.
+
+## Partitioning
+#### Static Partitioning
+Configure parameter `partitionCount` to define the desired number of initial partitions
+(4 in this example).
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.partitionCount</name>
+  <value>4</value>
+</property>
+```
+
+where _{OperatorName}_ is the name of the input operator.
+
+#### Dynamic Partitioning
+Dynamic partitioning -- changing the number of partitions of one or more operators
+in a running application -- can be achieved in multiple ways:
+- Use the command line tool `apex` or the UI console to change the value of the
+  `partitionCount` property of the running operator. This change is detected in
+  `processStats()` (which is invoked periodically by the platform) where, if the
+  current partition count (`currentPartitions`) and the desired partition count
+  (`partitionCount`) differ, the `repartitionRequired` flag in the response is set.
+  This causes the platform to invoke `definePartitions()` to create a new set of
+  partitions with the desired count.
+- Override `processStats()` and within it, based on the statistics in the
+  incoming parameter or any other factors, define a new desired value of
+  `partitionCount` and finally, if this value differs from the current partition
+  count, set the `repartitionRequired` flag in the response.
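The check performed in the first approach reduces to a comparison between the desired and the current partition counts. The standalone sketch below models only that decision; `RepartitionCheckSketch` is hypothetical and does not use the Apex `StatsListener` or `Partitioner` interfaces.

```java
// Hypothetical model of the repartition decision only; it does not use
// the Apex StatsListener or Partitioner interfaces.
class RepartitionCheckSketch
{
  int currentPartitions = 1; // partitions actually deployed
  int partitionCount = 1;    // desired count, e.g. changed from the apex CLI

  // Corresponds to setting repartitionRequired in the stats response.
  boolean repartitionRequired()
  {
    return currentPartitions != partitionCount;
  }

  // Records the new count once definePartitions() has created the partitions.
  void partitionsDefined(int newCount)
  {
    currentPartitions = newCount;
  }
}
```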
+
+The details of actually creating the new set of partitions can be customized by overriding
+the `definePartitions()` method. There are a couple of things to keep in mind when doing this.
+The first is that repartitioning needs some care when the operator has state (as is the
+case here): existing state from current operator partitions needs to be redistributed to the
+new partitions in a logically consistent way. The second is that some or all of the
+current set of partitions, which is an input parameter to `definePartitions()`, can be
+copied over to the new set; such partitions will continue running and will not be
+restarted. Any existing partitions that are not present in the new set will be shut down.
+The current repartitioning logic does not preserve any existing partitions, so upon
+a repartition event, all existing partitions are shut down and the new ones started.
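One logically consistent way to redistribute the state is sketched below with hypothetical types: merge the state of all old partitions, give every new partition the full set of processed files, and assign each pending file to exactly one new partition by hashcode. This illustrates the idea only; it is not the Malhar `definePartitions()` code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical state holder and redistribution logic; not Malhar code.
class RepartitionStateSketch
{
  static class PartitionState
  {
    final Set<String> processedFiles = new HashSet<>();
    final Set<String> pendingFiles = new LinkedHashSet<>();
  }

  static List<PartitionState> redistribute(List<PartitionState> oldStates, int newCount)
  {
    // Merge the state of every old partition.
    Set<String> allProcessed = new HashSet<>();
    Set<String> allPending = new LinkedHashSet<>();
    for (PartitionState s : oldStates) {
      allProcessed.addAll(s.processedFiles);
      allPending.addAll(s.pendingFiles);
    }
    List<PartitionState> newStates = new ArrayList<>();
    for (int i = 0; i < newCount; i++) {
      PartitionState s = new PartitionState();
      // Copying the processed set everywhere prevents re-processing a file
      // that now hashes to a different partition.
      s.processedFiles.addAll(allProcessed);
      newStates.add(s);
    }
    // Each pending file goes to exactly one new partition.
    for (String file : allPending) {
      newStates.get(Math.floorMod(file.hashCode(), newCount)).pendingFiles.add(file);
    }
    return newStates;
  }
}
```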
+
+## Operator Information
+1. Operator location: ***malhar-library***
+2. Available since: ***1.0.2***
+3. Operator state: ***Stable***
+4. Java Packages:
+    * Operator: ***[com.datatorrent.lib.io.fs.AbstractFileInputOperator](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html)***
+
+### AbstractFileInputOperator
+This is the abstract implementation that, as noted above, scans a single directory.
+It can be extended to modify functionality or add new capabilities. For example, the
+directory scanner can be overridden to monitor multiple directories. [This](https://github.com/DataTorrent/examples/tree/master/tutorials/fileIO-multiDir) example demonstrates how to do that.
+As noted above, this class has no ports, so concrete subclasses will need to
+provide them if necessary.
+
+![AbstractFileInputOperator.png](images/fsInput/operatorsClassDiagram.png)
+
+### <a name="AbstractFileInputOperatorProps"></a>Properties of AbstractFileInputOperator
+Several properties are available to configure the behavior of this operator and they are
+summarized in the table below. Of these, only `directory` is required: it specifies
+the path of the monitored directory. It can be set like this:
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.directory</name>
+  <value>/tmp/fileInput</value>
+</property>
+```
+
+The `scanIntervalMillis` property controls how often the directory is scanned. If new files appear
+with high frequency in this directory and they need to be processed as soon as they appear, reduce
+its value; if they appear rarely or if some delay in processing a new file is acceptable, increase it.
+Smaller values result in more I/O activity on the corresponding filesystem.
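A sketch of how a scan interval can gate scanning inside a callback that the platform invokes frequently. The clock is passed in explicitly so the logic is easy to test; `ScanTimerSketch` is a hypothetical name, not part of Malhar.

```java
// Hypothetical helper, not part of Malhar: decides whether a directory
// scan is due, given the current time in milliseconds.
class ScanTimerSketch
{
  private final long scanIntervalMillis;
  private long lastScanMillis;
  private boolean scannedBefore;

  ScanTimerSketch(long scanIntervalMillis)
  {
    this.scanIntervalMillis = scanIntervalMillis;
  }

  // Returns true (and records the time) when the first scan or the next
  // interval is due; callers skip scanning when it returns false.
  boolean shouldScan(long nowMillis)
  {
    if (!scannedBefore || nowMillis - lastScanMillis >= scanIntervalMillis) {
      scannedBefore = true;
      lastScanMillis = nowMillis;
      return true;
    }
    return false;
  }
}
```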
+
+The platform invokes the `emitTuples()` callback multiple times in each streaming window; within
+a single such call, if a large number of tuples are emitted, there is some risk that they
+may overwhelm the downstream operators, especially if they are performing a compute-intensive
+operation. For such cases, output can be throttled by reducing the value of the
+`emitBatchSize` property. Conversely, if the downstream operators can handle the load, increase
+the value to enhance throughput.
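The effect of `emitBatchSize` can be sketched as a loop that stops after a fixed number of tuples and leaves the remainder for the next call. An in-memory deque stands in for the open file, and every name except `emitBatchSize` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of batch-limited emission; not Malhar code.
class BatchEmitterSketch
{
  private final Deque<String> source;   // stands in for the open file
  private final int emitBatchSize;
  final List<String> downstream = new ArrayList<>();

  BatchEmitterSketch(List<String> lines, int emitBatchSize)
  {
    this.source = new ArrayDeque<>(lines);
    this.emitBatchSize = emitBatchSize;
  }

  // Emits at most emitBatchSize tuples; returns how many were emitted.
  int emitTuples()
  {
    int emitted = 0;
    while (emitted < emitBatchSize && !source.isEmpty()) {
      downstream.add(source.poll()); // stands in for emit(tuple)
      emitted++;
    }
    return emitted;
  }
}
```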
+
+The `partitionCount` parameter has already been discussed above.
+
+Occasionally, some files get into a bad state and cause errors when an attempt is made to
+read from them. The causes vary with the filesystem type and range from corrupted
+filesystems to network issues. In such cases, the operator will retry reading from such
+files a limited number of times before blacklisting those files. This retry count is
+defined by the `maxRetryCount` property.
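A sketch of retry-then-blacklist bookkeeping driven by `maxRetryCount`. The map and set used here are hypothetical stand-ins; the real operator's internal structures may differ.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical failure tracking; not the operator's actual fields.
class RetryTrackerSketch
{
  private final int maxRetryCount;
  private final Map<String, Integer> failures = new HashMap<>();
  final Set<String> blacklist = new HashSet<>();

  RetryTrackerSketch(int maxRetryCount)
  {
    this.maxRetryCount = maxRetryCount;
  }

  // Records a failed read; returns true if the file should be retried.
  boolean onFailure(String path)
  {
    int count = failures.merge(path, 1, Integer::sum);
    if (count >= maxRetryCount) {
      blacklist.add(path); // give up: never schedule this file again
      failures.remove(path);
      return false;
    }
    return true;
  }
}
```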
+
+Finally, the specific scanner class used to monitor the input directories can be configured
+by setting the `scanner` property.
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *directory* | absolute path of directory to be scanned | String | Yes | N/A |
+| *scanIntervalMillis* | Interval in milliseconds after which directory should be scanned for new files | int | No | 5000 |
+| *emitBatchSize* | Maximum number of tuples to emit in a single call to the `emitTuples()` callback (see explanation above). | int | No | 1000 |
+| *partitionCount* | Desired number of partitions | int | No | 1 |
+| *maxRetryCount* | Maximum number of times the operator will attempt to process a file | int | No | 5 |
+| *scanner* | Scanner to scan new files in directory | [DirectoryScanner](#DirectoryScanner) | No | DirectoryScanner |
+
+#### <a name="DirectoryScanner"></a>Properties of DirectoryScanner
+The directory scanner has one optional property: a regular expression to filter files
+of interest. If absent, all files in the source directory are processed. It can be
+set like this, for example to select only files whose names end in `.txt`:
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.scanner.filePatternRegexp</name>
+  <value>.*\.txt</value>
+</property>
+```
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *filePatternRegexp* | regex to select files from input directory | String | No | N/A |
+
+
+### Ports
+This operator has no ports.
+
+## Abstract Methods
+As described above, concrete subclasses need to provide implementations for these two
+methods:
+
+```java
+protected abstract void emit(T tuple);
+protected abstract T readEntity() throws IOException;
+```
+
+Examples of implementations are in the `LineByLineFileInputOperator` operator and also in
+the example at the end of this guide.
+
+## Derived Classes
+
+### 1. AbstractFTPInputOperator
+This class reads files from an FTP file system. As with the parent abstract class, concrete
+subclasses need to implement the
+[readEntity](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#readEntity) and
+[emit](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#emit) methods.
+
+#### <a name="AbstractFTPInputOperatorProps"></a>Properties
+This operator defines the following additional properties beyond those defined in the
+[parent class](#AbstractFileInputOperatorProps).
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *host*| Hostname of ftp server.| String | Yes | N/A |
+| *port*| Port of ftp server.| int | No | 21 (default ftp port) |
+| *userName*| Username which is used for login to the server. | String | No | anonymous |
+| *password*| Password which is used for login to the server. | String | No | guest |
+
+#### Ports
+This operator has no ports.
+
+### 2. FTPStringInputOperator
+This class extends `AbstractFTPInputOperator` and implements its abstract methods to read files on an FTP file system line by line.
+
+#### <a name="FTPStringInputOperatorProps"></a>Properties
+This operator defines no additional properties beyond those defined in the
+[parent class](#AbstractFTPInputOperatorProps).
+
+#### Ports
+| **Port** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *output* | Tuples that are read from file are emitted on this port | String | Yes |
+
+### 3. AbstractParquetFileReader
+
+Reads Parquet files from the input directory using `GroupReadSupport`. Derived classes need to implement the [convertGroup(Group)](https://www.datatorrent.com/docs/apidocs/com/datatorrent/contrib/parquet/AbstractParquetFileReader.html#convertGroup(Group)) method to convert a `Group` to another type. They also need to implement the [readEntity()](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#readEntity()) and [emit(T)](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#emit(T)) methods.
+
+#### <a name="AbstractParquetFileReaderProps"></a>Properties of AbstractParquetFileReader
+This operator defines the following additional properties beyond those defined in the
+[parent class](#AbstractFileInputOperatorProps).
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *parquetSchema*| Parquet schema used to parse records. | String | Yes | N/A |
+
+#### Ports
+This operator has no ports.
+
+### 4. AbstractThroughputFileInputOperator
+
+This operator extends `AbstractFileInputOperator` by providing the capability to partition
+dynamically based on the file backlog. The user can set the preferred number of pending files per operator and the maximum number of operators, and can define a repartition interval. If a physical operator runs out of files to process and at least the repartition interval has elapsed since the last repartition, the operator is repartitioned with a new number of partitions sized to the remaining pending files. Derived classes need to implement [readEntity()](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#readEntity()) and [emit(T)](https://www.datatorrent.com/docs/apidocs/com/datatorrent/lib/io/fs/AbstractFileInputOperator.html#emit(T)) methods.
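A rough sketch of backlog-based sizing, assuming the new count is the number of partitions needed to hold about `preferredMaxPendingFilesPerOperator` pending files each, capped at `partitionCount` and never below one, and that a new count is computed only once `repartitionInterval` has elapsed. The trigger condition (a partition running out of files) is omitted, and this is a guess at the policy rather than the Malhar implementation.

```java
// Hypothetical sizing policy; property names mirror the table below,
// but the logic is an illustrative assumption, not Malhar code.
class ThroughputSizingSketch
{
  long repartitionInterval = 5L * 60 * 1000; // 5 minutes, in milliseconds
  int preferredMaxPendingFilesPerOperator = 10;
  int partitionCount = 1;                    // maximum number of partitions
  private long lastRepartitionMillis;

  ThroughputSizingSketch(long startMillis)
  {
    this.lastRepartitionMillis = startMillis;
  }

  // Returns the new partition count, or -1 if it is too soon to repartition.
  int newPartitionCount(int pendingFiles, long nowMillis)
  {
    if (nowMillis - lastRepartitionMillis < repartitionInterval) {
      return -1;
    }
    lastRepartitionMillis = nowMillis;
    // ceiling division: 25 pending files at 10 per operator -> 3 operators
    int wanted = (pendingFiles + preferredMaxPendingFilesPerOperator - 1)
        / preferredMaxPendingFilesPerOperator;
    return Math.max(1, Math.min(wanted, partitionCount));
  }
}
```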
+
+#### <a name="AbstractThroughputFileInputOperatorProps"></a>Properties of AbstractThroughputFileInputOperator
+This operator defines the following additional properties beyond those defined in the
+[parent class](#AbstractFileInputOperatorProps).
+
+| **Property** | **Description** | **Type** | **Mandatory** | **Default Value** |
+| -------- | ----------- | ---- | ------------------ | ------------- |
+| *repartitionInterval*| Minimum time, in milliseconds, that must elapse before the operator can be repartitioned. | long | No | 300000 (5 minutes) |
+| *preferredMaxPendingFilesPerOperator* | the preferred number of pending files per operator. | int | No | 10 |
+| *partitionCount* | the maximum number of partitions for the operator. | int | No | 1 |
+
+#### Ports
+This operator has no ports.
+
+### 5. LineByLineFileInputOperator
+As mentioned in the overview above, this operator defines a single output port; it reads files
+as lines and emits them as Java Strings on the output port. The output port *must* be connected.
+Lines are extracted using the Java `BufferedReader` class and the default character encoding.
+An example illustrating the use of a custom encoding (such as UTF-8) is provided below.
+
+#### Properties
+This operator defines no additional properties beyond those defined in the
+[parent class](#AbstractFileInputOperatorProps).
+
+#### Ports
+| **Port** | **Description** | **Type** | **Mandatory** |
+| -------- | ----------- | ---- | ------------------ |
+| *output* | Tuples that are read from file are emitted on this port | String | Yes |
+
+## Example Implementation Using a Custom Character Encoding
+This example demonstrates how to extend the `AbstractFileInputOperator` to read
+UTF-8 encoded data.
+
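+```java
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+public class EncodedDataReader extends AbstractFileInputOperator<String>
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
+  protected transient BufferedReader br;
+
+  @Override
+  protected InputStream openFile(Path path) throws IOException
+  {
+    InputStream is = super.openFile(path);
+    // decode the raw stream explicitly as UTF-8 instead of the platform default
+    br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+    return is;
+  }
+
+  @Override
+  protected void closeFile(InputStream is) throws IOException
+  {
+    super.closeFile(is);
+    br.close();
+    br = null;
+  }
+
+  @Override
+  protected String readEntity() throws IOException
+  {
+    // returns null at end of file, which tells the operator the file is done
+    return br.readLine();
+  }
+
+  @Override
+  protected void emit(String tuple)
+  {
+    output.emit(tuple);
+  }
+}
+```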
+
+## Common Implementation Scenarios
+Sometimes, downstream operators need to know which file each tuple came from; there are a
+number of ways of achieving this, each with its own tradeoffs. Some alternatives:
+
+- If the generic tuple type is a String, each tuple can be prefixed with the file name
+  with a suitable separator, for example: `foo.txt: first line`. This works but
+  has obvious additional costs in both processing (to parse out the two pieces of each
+  tuple) and network bandwidth utilization.
+- Define a custom tuple class with two fields: one for the file name and one for tuple data.
+  The costs are similar to the previous approach though the code is simpler since
+  parsing is handled behind the scenes by the serialization process.
+- Define the tuple type to be `Object` and emit either a custom `Tuple` object for actual
+  tuple data or **BOF**/**EOF** objects with the name of the file when a new file begins
+  or the current file ends. Here, the additional bandwidth consumed is
+  minimal (just 2 additional tuples at file boundaries) but the type of each tuple needs
+  to be checked using `instanceof` in the downstream operators which has some runtime cost.
+- Similar to the previous approach but define an additional control port dedicated to
+  the BOF/EOF control tuples. This approach eliminates the runtime cost of using `instanceof`
+  but some care is needed because (a) the order of tuples arriving at multiple input ports
+  in downstream operators cannot be guaranteed -- for example, the BOF/EOF control tuples
+  may arrive before some of the actual data tuples; and (b) since the operator may read
+  more than one file in a single streaming window, the downstream operator may not be
+  able to tell which tuples belong to which file. One way of dealing with this is to
+  stop emitting data tuples until the next `endWindow()` callback when an EOF is detected
+  for the current file; that way, if the downstream operator receives an EOF control tuple,
+  it has the guarantee that all the data tuples received in the same window belong to the
+  current file.
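A sketch of the third approach: BOF/EOF marker objects on an `Object`-typed stream, and a downstream loop that uses `instanceof` to attribute each data tuple to its file. `BOF`, `EOF` and `describe()` are hypothetical names; in a real application the list would be replaced by tuples arriving on an input port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical control tuples and downstream handling for the
// BOF/EOF approach; names are illustrative, not a Malhar API.
class FileTupleSketch
{
  static final class BOF
  {
    final String fileName;

    BOF(String fileName)
    {
      this.fileName = fileName;
    }
  }

  static final class EOF
  {
    final String fileName;

    EOF(String fileName)
    {
      this.fileName = fileName;
    }
  }

  // Downstream side: attributes each String data tuple to the current file.
  static List<String> describe(List<Object> stream)
  {
    List<String> out = new ArrayList<>();
    String current = null;
    for (Object t : stream) {
      // the instanceof checks are the per-tuple runtime cost noted above
      if (t instanceof BOF) {
        current = ((BOF) t).fileName;
      } else if (t instanceof EOF) {
        out.add("end " + ((EOF) t).fileName);
        current = null;
      } else {
        out.add(current + ": " + t);
      }
    }
    return out;
  }
}
```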
+
+Of course, other strategies are possible depending on the needs of the particular situation.
+
+When used in a long-running application where a very large number of files are processed
+over time, the internal state (consisting of properties like `processedFiles`) may grow
+correspondingly and this may have some performance impact since each checkpoint saves the
+entire operator state. In such situations, it is useful to explore options such as moving
+processed files to another directory and trimming operator state variables suitably.
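A sketch of the move-and-trim idea, using `java.nio` on a local filesystem for simplicity (the operator itself works through the Hadoop `FileSystem` API). The assumption is that once a file has left the monitored directory the scanner cannot see it again, so its entry in the processed-file set can be dropped; `ArchiveSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not Malhar code: archives a processed file and
// trims the corresponding entry from the in-memory state.
class ArchiveSketch
{
  final Set<String> processedFiles = new HashSet<>();

  void archive(Path file, Path archiveDir)
  {
    try {
      Files.createDirectories(archiveDir);
      Files.move(file, archiveDir.resolve(file.getFileName()),
          StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Only trim state after the move succeeded, so a failed move leaves
    // the file marked as processed and it is not read twice.
    processedFiles.remove(file.toString());
  }
}
```

The archive directory should live outside the monitored directory so archived files are not picked up again.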
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/17515e30/docs/operators/images/fsInput/operatorsClassDiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/fsInput/operatorsClassDiagram.png b/docs/operators/images/fsInput/operatorsClassDiagram.png
new file mode 100644
index 0000000..31c7a0d
Binary files /dev/null and b/docs/operators/images/fsInput/operatorsClassDiagram.png differ


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2184' of https://github.com/DT-Priyanka/incubator-apex-malhar

Posted by ra...@apache.org.
Merge branch 'APEXMALHAR-2184' of https://github.com/DT-Priyanka/incubator-apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/36582fff
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/36582fff
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/36582fff

Branch: refs/heads/master
Commit: 36582fff2413c7f6d4376f9840129b1f20fb34c3
Parents: 0c4b3fc 17515e3
Author: Munagala V. Ramanath <ra...@datatorrent.com>
Authored: Tue Sep 20 08:50:14 2016 -0700
Committer: Munagala V. Ramanath <ra...@datatorrent.com>
Committed: Tue Sep 20 08:50:14 2016 -0700

----------------------------------------------------------------------
 docs/operators/fsInputOperator.md               | 339 +++++++++++++++++++
 .../images/fsInput/operatorsClassDiagram.png    | Bin 0 -> 71104 bytes
 2 files changed, 339 insertions(+)
----------------------------------------------------------------------