Posted to commits@apex.apache.org by sh...@apache.org on 2017/05/17 10:27:19 UTC

apex-malhar git commit: APEXMALHAR-2463 FTP input operator sample app, and documentation

Repository: apex-malhar
Updated Branches:
  refs/heads/master 3a3629841 -> c3f86f237


APEXMALHAR-2463 FTP input operator sample app, and documentation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c3f86f23
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c3f86f23
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c3f86f23

Branch: refs/heads/master
Commit: c3f86f237f6c1253964065a2a309947585a206e4
Parents: 3a36298
Author: francisf <fr...@gmail.com>
Authored: Thu Mar 30 09:57:00 2017 +0530
Committer: francisf <fr...@gmail.com>
Committed: Tue May 16 11:48:39 2017 +0530

----------------------------------------------------------------------
 docs/operators/ftpInputOperator.md              |  72 +++++++++++++++++++
 .../images/ftpInputOperator/classdiagram.png    | Bin 0 -> 27557 bytes
 examples/ftp/README.md                          |  37 ++++++++++
 examples/ftp/pom.xml                            |  37 ++++++++++
 examples/ftp/src/assemble/appPackage.xml        |  59 +++++++++++++++
 .../apache/apex/examples/ftp/Application.java   |  58 +++++++++++++++
 .../apache/apex/examples/ftp/package-info.java  |  19 +++++
 .../src/main/resources/META-INF/properties.xml  |  48 +++++++++++++
 .../ftp/src/test/resources/log4j.properties     |  43 +++++++++++
 examples/pom.xml                                |   1 +
 mkdocs.yml                                      |   1 +
 11 files changed, 375 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/docs/operators/ftpInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/ftpInputOperator.md b/docs/operators/ftpInputOperator.md
new file mode 100644
index 0000000..e6f39f7
--- /dev/null
+++ b/docs/operators/ftpInputOperator.md
@@ -0,0 +1,72 @@
+FTP Input Operator
+=============
+
+## Operator Objective
+This operator (`AbstractFTPInputOperator`) is designed to scan a directory on an FTP server for files, read
+and split file content into tuples such as lines or blocks of bytes, and finally
+emit them on the output port for further processing by downstream operators.
+The operator extends `AbstractFileInputOperator`. It overrides the
+`getFSInstance()` method to return an instance of `FTPFileSystem`
+(`org.apache.hadoop.fs.ftp.FTPFileSystem`).
+
+## Class Diagram
+![FTPInputOperator class diagram](images/ftpInputOperator/classdiagram.png)
+
+## Operator Information
+1. Operator location : **_malhar-lib_**
+2. Available since : **_2.0.0_**
+3. Java Package : [com.datatorrent.lib.io](https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java)
+
+### Ports
+Because this is an input operator, there are no input ports.
+
+
+| Port  | Description | Type | Mandatory |
+| -------  | -------- | ----- | ------- |
+| *output* | output port on which data is emitted | String | Yes |
+
+
+### Configuration
+
+| Property | Description | Type | Mandatory | Default Value |
+| -------  | -------  | -------  | -------  | -------  |
+| *host* | the hostname of the FTP server | String | Yes | N/A |
+| *directory* | the directory path from which to scan and read files | String | Yes | N/A |
+| *userName* | the username for authenticating against the FTP server; can be skipped when anonymous FTP is enabled | String | No | N/A |
+| *password* | the password to be used in conjunction with the above username | String | No | N/A |
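
Hadoop's `FTPFileSystem.initialize(URI, Configuration)` reads the host, port and `user:password` pair from the URI it is given, so one natural way to hand the properties above to it is as an `ftp://` URI. The sketch below assembles such a URI with `java.net.URI`. The class `FtpUriSketch`, its `port` parameter and the default of 21 (the standard FTP control port) are illustrative assumptions, not part of Malhar, and this sketch does not claim that `getFSInstance()` builds its URI exactly this way.

```java
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Hypothetical helper (not part of Malhar) that turns host/userName/password-style
 * settings into an ftp:// URI of the kind Hadoop's FTPFileSystem accepts.
 */
final class FtpUriSketch
{
  /** Standard FTP control port; an assumption, since the table above lists no port. */
  static final int DEFAULT_FTP_PORT = 21;

  static URI toUri(String host, int port, String userName, String password)
  {
    if (host == null || host.isEmpty()) {
      throw new IllegalArgumentException("host is mandatory");
    }
    // Anonymous FTP: omit the user-info part when no userName is configured.
    String userInfo = null;
    if (userName != null && !userName.isEmpty()) {
      userInfo = (password == null) ? userName : userName + ":" + password;
    }
    try {
      // The multi-argument constructor percent-encodes characters such as '@'
      // that are not allowed in the user-info component.
      return new URI("ftp", userInfo, host, port, null, null, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("invalid FTP settings", e);
    }
  }
}
```

For example, `FtpUriSketch.toUri("localhost", 21, "ftp", "secret")` yields `ftp://ftp:secret@localhost:21`, while passing a `null` user name yields `ftp://localhost:21` for anonymous access.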
+
+## Partitioning
+### Static Partitioning
+Configure parameter `partitionCount` to define the desired number of initial partitions
+(4 in this example).
+
+```xml
+<property>
+  <name>dt.operator.{OperatorName}.prop.partitionCount</name>
+  <value>4</value>
+</property>
+```
+Alternatively, this can be changed in the application code by setting the operator property `partitionCount` to the desired number of partitions:
+
+```java
+FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator());
+reader.setPartitionCount(4);
+```
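
How the files in the scanned directory are divided among the configured partitions is not spelled out above. One plausible scheme, and the assumption behind this sketch, is that every partition scans the same directory and keeps only the files whose path hash maps to its own partition index, so each file is read by exactly one partition. The class and method names are hypothetical; consult the directory scanner in `AbstractFileInputOperator` for the actual rule.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of hash-based file assignment across static partitions (assumed scheme):
 * each partition keeps only the paths whose hash maps to its own index.
 */
final class PartitionAssignmentSketch
{
  /** Returns true if the partition with the given index should read this file. */
  static boolean accepts(String filePath, int partitionIndex, int partitionCount)
  {
    if (partitionCount <= 1) {
      return true; // a single partition reads everything
    }
    // floorMod keeps the result in [0, partitionCount) even for negative hash codes.
    return Math.floorMod(filePath.hashCode(), partitionCount) == partitionIndex;
  }

  /** Lists the files a given partition would pick up from one directory scan. */
  static List<String> filesFor(List<String> scanned, int partitionIndex, int partitionCount)
  {
    List<String> mine = new ArrayList<>();
    for (String path : scanned) {
      if (accepts(path, partitionIndex, partitionCount)) {
        mine.add(path);
      }
    }
    return mine;
  }
}
```

Hashing the path needs no coordination between partitions, which is why a scheme like this suits independently running scanners; the trade-off is that a few large files can still leave partitions unevenly loaded.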
+
+### Dynamic Partitioning
+Dynamic partitioning -- changing the number of partitions of one or more operators
+in a running application -- can be achieved in two ways:
+
+- Use the command line tool `apex` or the UI console to change the value of the
+  `partitionCount` property of the running operator. This change is detected in
+  `processStats()` (which is invoked periodically by the platform) where, if the
+  current partition count (`currentPartitions`) and the desired partition count
+  (`partitionCount`) differ, the `repartitionRequired` flag in the response is set.
+  This causes the platform to invoke `definePartitions()` to create a new set of
+  partitions with the desired count.
+- Override `processStats()` and within it, based on the statistics in the
+  incoming parameter or any other factors, define a new desired value of
+  `partitionCount` and finally, if this value differs from the current partition
+  count, set the `repartitionRequired` flag in the response.
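
Both approaches above reduce to comparing a desired partition count with the current one and setting a flag when they differ. The sketch below models that decision in plain Java. `Response` is a hypothetical stand-in for the platform's stats response, and the backlog-based policy in `desiredPartitionCount` is an invented example of "other factors", not something the operator does by default.

```java
/**
 * Sketch of the repartition decision made in processStats(). The Response class
 * and the backlog policy are hypothetical, not Apex APIs.
 */
final class RepartitionSketch
{
  /** Minimal stand-in for the response object returned from processStats(). */
  static final class Response
  {
    boolean repartitionRequired;
  }

  /** Flags a repartition whenever the desired count differs from the current one. */
  static Response processStats(int currentPartitions, int partitionCount)
  {
    Response response = new Response();
    response.repartitionRequired = currentPartitions != partitionCount;
    return response;
  }

  /**
   * Hypothetical policy for the second approach: one partition per
   * filesPerPartition pending files, clamped to [1, maxPartitions].
   */
  static int desiredPartitionCount(int pendingFiles, int filesPerPartition, int maxPartitions)
  {
    int wanted = (pendingFiles + filesPerPartition - 1) / filesPerPartition; // ceiling division
    return Math.max(1, Math.min(maxPartitions, wanted));
  }
}
```

Combining the two, an overridden `processStats()` could compute `desiredPartitionCount(pending, 10, 8)` from its statistics and pass the result as `partitionCount`; with 25 pending files and 2 current partitions that asks for 3 partitions and sets the flag.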
+
+## Example application
+An example application for the FTP input operator can be found at [https://github.com/apache/apex-malhar/tree/master/examples/ftp](https://github.com/apache/apex-malhar/tree/master/examples/ftp)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/docs/operators/images/ftpInputOperator/classdiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/ftpInputOperator/classdiagram.png b/docs/operators/images/ftpInputOperator/classdiagram.png
new file mode 100644
index 0000000..8e173a8
Binary files /dev/null and b/docs/operators/images/ftpInputOperator/classdiagram.png differ

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/README.md
----------------------------------------------------------------------
diff --git a/examples/ftp/README.md b/examples/ftp/README.md
new file mode 100644
index 0000000..d4aca6a
--- /dev/null
+++ b/examples/ftp/README.md
@@ -0,0 +1,37 @@
+## FTP Input Operator Example
+
+This example shows how to use `FTPStringInputOperator`, which extends `AbstractFTPInputOperator`. The `FTPStringInputOperator` scans a directory on an FTP server for files, reads lines from the files and
+emits them on the output port for further processing. The tuples emitted by the `FTPStringInputOperator` are processed by the downstream operator `StringFileOutputOperator`, which writes them to HDFS.
+
+The properties file `META-INF/properties.xml` shows how to configure the respective operators.
+
+
+Users can choose the application and additional configuration files to use at launch time. In this example, we use the file mentioned above to configure the operator properties.
+
+
+#### **Update properties in properties.xml (required to run the example):**
+
+- Update these common properties in the file `src/main/resources/META-INF/properties.xml`:
+
+| Property Name  | Description |
+| -------------  | ----------- |
+| dt.application.FTPInputExample.operator.Reader.host | address of the FTP server |
+| dt.application.FTPInputExample.operator.Reader.directory | directory on the FTP server from which files are read |
+| dt.application.FTPInputExample.operator.Reader.userName | user for the FTP server, if anonymous FTP is disabled |
+| dt.application.FTPInputExample.operator.Reader.password | password associated with the above user |
+| dt.application.FTPInputExample.operator.Writer.filePath | output directory path for the records |
+| dt.application.FTPInputExample.operator.Writer.outputFileName | output file name for the records |
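
Before launching, it can help to check that the reader properties listed above are actually present in `properties.xml`. The sketch below parses the file's `<configuration>`/`<property>` layout with the JDK's built-in DOM parser and reports missing reader keys. `PropertiesXmlSketch` and its methods are hypothetical helpers, not part of the example or of Apex, which reads this file itself at launch.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical pre-launch check for the example's META-INF/properties.xml. */
final class PropertiesXmlSketch
{
  static final String READER_PREFIX = "dt.application.FTPInputExample.operator.Reader.";

  /** Collects every <property><name>..</name><value>..</value></property> pair. */
  static Map<String, String> parse(String xml)
  {
    try {
      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
          .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
      Map<String, String> props = new LinkedHashMap<>();
      NodeList nodes = doc.getElementsByTagName("property");
      for (int i = 0; i < nodes.getLength(); i++) {
        Element property = (Element)nodes.item(i);
        String name = property.getElementsByTagName("name").item(0).getTextContent().trim();
        String value = property.getElementsByTagName("value").item(0).getTextContent().trim();
        props.put(name, value);
      }
      return props;
    } catch (Exception e) {
      throw new IllegalArgumentException("cannot parse properties.xml", e);
    }
  }

  /** Returns the full keys of the given Reader properties that are absent or empty. */
  static List<String> missingReaderProperties(Map<String, String> props, String... names)
  {
    List<String> missing = new ArrayList<>();
    for (String name : names) {
      String value = props.get(READER_PREFIX + name);
      if (value == null || value.isEmpty()) {
        missing.add(READER_PREFIX + name);
      }
    }
    return missing;
  }
}
```

For the file as committed, `missingReaderProperties(parse(xml), "host", "directory")` should return an empty list, since both keys are set there.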
+
+### How to compile
+`shell> mvn clean package`
+
+This generates the application package `malhar-examples-ftp-3.8.0-SNAPSHOT.apa` in the `target` directory.
+
+### How to run
+Use the application package generated above to launch the application from the UI console (if available) or the apex command line interface.
+
+`apex> launch target/malhar-examples-ftp-3.8.0-SNAPSHOT.apa`
+
+If you have issues configuring the operator or running the application, please send an email to users@apex.apache.org.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/pom.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/pom.xml b/examples/ftp/pom.xml
new file mode 100644
index 0000000..a54ec33
--- /dev/null
+++ b/examples/ftp/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-ftp</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar FTP Example</name>
+  <description>Apex example application that reads from an FTP server and writes to HDFS.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.8.0-SNAPSHOT</version>
+  </parent>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/src/assemble/appPackage.xml b/examples/ftp/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/ftp/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
new file mode 100644
index 0000000..5488970
--- /dev/null
+++ b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.ftp;
+
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.AbstractFTPInputOperator.FTPStringInputOperator;
+
+/**
+ * This application demonstrates the FTPStringInputOperator, which reads data from a directory on an FTP server;
+ * the data is then written to a file using the StringFileOutputOperator.
+ */
+@ApplicationAnnotation(name = "FTPInputExample")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // FTP reader; the same properties are also set in resources/META-INF/properties.xml
+    FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator());
+    // Set properties for the FTP input operator
+    reader.setHost("localhost");
+    reader.setUserName("ftp");
+    reader.setDirectory("sourceDir");
+    reader.setPartitionCount(2);
+
+    // Writer that writes strings to a file on HDFS
+    StringFileOutputOperator writer = dag.addOperator("Writer", new StringFileOutputOperator());
+    // Set the output directory and the output file name for the writer
+    writer.setFilePath("malhar_examples/ftp");
+    writer.setOutputFileName("destination");
+
+    //Connect reader output to writer
+    dag.addStream("data", reader.output, writer.input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
new file mode 100644
index 0000000..3a2339c
--- /dev/null
+++ b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.ftp;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/resources/META-INF/properties.xml b/examples/ftp/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..8b3ac89
--- /dev/null
+++ b/examples/ftp/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- FTP Input Example -->
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.partitionCount</name>
+    <value>2</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.host</name>
+    <value>localhost</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.userName</name>
+    <value>ftp</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Reader.directory</name>
+    <value>sourceDir</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Writer.filePath</name>
+    <value>malhar_examples/ftp</value>
+  </property>
+  <property>
+    <name>dt.application.FTPInputExample.operator.Writer.outputFileName</name>
+    <value>destination</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/ftp/src/test/resources/log4j.properties b/examples/ftp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/ftp/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 16cfe26..cff85a7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -198,6 +198,7 @@
     <module>throttle</module>
     <module>transform</module>
     <module>kafka</module>
+    <module>ftp</module>
   </modules>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index e23abe6..75a862a 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -16,6 +16,7 @@ pages:
     - File Splitter: operators/file_splitter.md
     - Filter: operators/filter.md
     - Fixed Width Parser: operators/fixedWidthParserOperator.md
+    - FTP Input Operator: operators/ftpInputOperator.md
     - Jdbc Output Operator: operators/AbstractJdbcTransactionableOutputOperator.md
     - JDBC Poller Input: operators/jdbcPollInputOperator.md
     - JMS Input: operators/jmsInputOperator.md