Posted to commits@apex.apache.org by sh...@apache.org on 2017/05/17 10:27:19 UTC
apex-malhar git commit: APEXMALHAR-2463 FTP input operator sample app, and documentation
Repository: apex-malhar
Updated Branches:
refs/heads/master 3a3629841 -> c3f86f237
APEXMALHAR-2463 FTP input operator sample app, and documentation
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c3f86f23
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c3f86f23
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c3f86f23
Branch: refs/heads/master
Commit: c3f86f237f6c1253964065a2a309947585a206e4
Parents: 3a36298
Author: francisf <fr...@gmail.com>
Authored: Thu Mar 30 09:57:00 2017 +0530
Committer: francisf <fr...@gmail.com>
Committed: Tue May 16 11:48:39 2017 +0530
----------------------------------------------------------------------
docs/operators/ftpInputOperator.md | 72 +++++++++++++++++++
.../images/ftpInputOperator/classdiagram.png | Bin 0 -> 27557 bytes
examples/ftp/README.md | 37 ++++++++++
examples/ftp/pom.xml | 37 ++++++++++
examples/ftp/src/assemble/appPackage.xml | 59 +++++++++++++++
.../apache/apex/examples/ftp/Application.java | 58 +++++++++++++++
.../apache/apex/examples/ftp/package-info.java | 19 +++++
.../src/main/resources/META-INF/properties.xml | 48 +++++++++++++
.../ftp/src/test/resources/log4j.properties | 43 +++++++++++
examples/pom.xml | 1 +
mkdocs.yml | 1 +
11 files changed, 375 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/docs/operators/ftpInputOperator.md
----------------------------------------------------------------------
diff --git a/docs/operators/ftpInputOperator.md b/docs/operators/ftpInputOperator.md
new file mode 100644
index 0000000..e6f39f7
--- /dev/null
+++ b/docs/operators/ftpInputOperator.md
@@ -0,0 +1,72 @@
+FTP Input Operator
+=============
+
+## Operator Objective
+This operator (`AbstractFTPInputOperator`) is designed to scan a directory on an FTP server for files, read
+and split the file content into tuples such as lines or blocks of bytes, and finally
+emit them on the output port for further processing by downstream operators.
+The operator extends `AbstractFileInputOperator`. It overrides the
+`getFSInstance()` method and returns an instance of `FTPFileSystem`
+(`org.apache.hadoop.fs.ftp.FTPFileSystem`).
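The line-oriented reading described above can be pictured with a small, stdlib-only sketch. It assumes, as we understand `FTPStringInputOperator` to behave, that every line returned by `BufferedReader.readLine()` becomes one tuple. `LineTupleReader` and `readLines` are hypothetical names, and a byte array stands in for the stream that `FTPFileSystem` would return for a remote file.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a line-oriented file reader; not the Malhar class.
class LineTupleReader {
  // Reads the stream one line at a time and hands each line to `emit`
  // (one tuple per line). Returns the number of tuples emitted and
  // closes the stream when done.
  static int readLines(InputStream in, Consumer<String> emit) throws IOException {
    int count = 0;
    try (BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      String line;
      // readLine() returns null at end of stream, which ends this file
      while ((line = br.readLine()) != null) {
        emit.accept(line);
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) throws IOException {
    // A byte array plays the role of a file fetched from the FTP server
    InputStream file = new ByteArrayInputStream("first\nsecond\nthird\n".getBytes(StandardCharsets.UTF_8));
    List<String> tuples = new ArrayList<>();
    int n = readLines(file, tuples::add);
    System.out.println(n + " tuples: " + tuples);
  }
}
```

Because `readLine()` strips `\n` and `\r\n` terminators, the tuples carry no line endings; a block-of-bytes variant would instead read fixed-size chunks into a `byte[]`.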
+
+## Class Diagram
+![FTPInputOperator class diagram](images/ftpInputOperator/classdiagram.png)
+
+## Operator Information
+1. Operator location : **_malhar-lib_**
+2. Available since : **_2.0.0_**
+3. Java Package : [com.datatorrent.lib.io](https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java)
+
+### Ports
+Because this is an input operator, there are no input ports.
+
+
+| Port | Description | Type | Mandatory |
+| ------- | -------- | ----- | ------- |
+| *output* | output port on which data is emitted | String | Yes |
+
+
+### Configuration
+
+| Property | Description | Type | Mandatory | Default Value |
+| ------- | ------- | ------- | ------- | ------- |
+| *host* | the hostname of the FTP server | String | Yes | N/A |
+| *directory* | the directory path on the FTP server from which files are scanned and read | String | Yes | N/A |
+| *userName* | the username for authenticating against the FTP server; can be omitted when anonymous FTP is enabled | String | No | N/A |
+| *password* | the password to be used in conjunction with *userName* | String | No | N/A |
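`getFSInstance()` has to pass the settings above to `FTPFileSystem`. One plausible way, and an assumption on our part rather than a quote of the Malhar source, is to fold them into an `ftp://userName:password@host:port` URI. The sketch below shows that mapping with `java.net.URI`; `FtpUriSketch`, `ftpUri`, the explicit port (21 is the standard FTP control port) and the `anonymous`/`guest` fallback used when *userName* is omitted are all illustrative assumptions.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: folds host/port/userName/password into an ftp:// URI
// of the kind an FTP file system client can be initialized with.
class FtpUriSketch {
  static final int DEFAULT_FTP_PORT = 21;

  static URI ftpUri(String host, int port, String userName, String password) throws URISyntaxException {
    if (host == null || host.isEmpty()) {
      throw new IllegalArgumentException("host is mandatory");
    }
    // Assumption: fall back to the conventional anonymous-FTP login
    // when no user is configured.
    String user = (userName == null) ? "anonymous" : userName;
    String pass = (password == null) ? "guest" : password;
    // The multi-argument URI constructor percent-encodes characters that are
    // illegal in the user-info part, e.g. an '@' in the password becomes %40.
    return new URI("ftp", user + ":" + pass, host, port, null, null, null);
  }

  public static void main(String[] args) throws URISyntaxException {
    System.out.println(ftpUri("localhost", DEFAULT_FTP_PORT, "ftp", "p@ss"));
    System.out.println(ftpUri("localhost", DEFAULT_FTP_PORT, null, null));
  }
}
```

Using the multi-argument constructor rather than string concatenation means reserved characters in the credentials are encoded safely; a `:` in the password would still be ambiguous, because `:` separates the user from the password.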
+
+## Partitioning
+### Static Partitioning
+Configure the `partitionCount` parameter to define the desired number of initial partitions
+(4 in this example).
+
+```xml
+<property>
+ <name>dt.operator.{OperatorName}.prop.partitionCount</name>
+ <value>4</value>
+</property>
+```
+Alternatively, this can be changed in the application code by setting the operator property `partitionCount` to the desired number of partitions:
+
+```java
+FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator());
+reader.setPartitionCount(4);
+```
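The text does not say how files are shared among the `partitionCount` partitions. Our understanding, which should be checked against the `AbstractFileInputOperator` source, is that each partition's directory scanner keeps only the files whose path hash, taken modulo the partition count, equals its own partition index, so every file is read by exactly one partition. The hypothetical `FilePartitionSketch.accepts` below illustrates that rule:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of hash-based file-to-partition assignment; names are hypothetical.
class FilePartitionSketch {
  // True if the partition with 0-based index `partitionIndex` out of
  // `partitionCount` partitions should read `filePath`.
  static boolean accepts(String filePath, int partitionIndex, int partitionCount) {
    if (partitionCount <= 1) {
      return true; // a single partition reads every file
    }
    // Math.floorMod keeps the result in [0, partitionCount) even for negative hash codes
    return Math.floorMod(filePath.hashCode(), partitionCount) == partitionIndex;
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("sourceDir/a.txt", "sourceDir/b.txt", "sourceDir/c.txt", "sourceDir/d.txt");
    int partitionCount = 4;
    for (int p = 0; p < partitionCount; p++) {
      List<String> mine = new ArrayList<>();
      for (String f : files) {
        if (accepts(f, p, partitionCount)) {
          mine.add(f);
        }
      }
      System.out.println("partition " + p + " -> " + mine);
    }
  }
}
```

With only a handful of files the hash can leave some partitions idle, so a higher `partitionCount` pays off mainly when the directory holds many files.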
+
+### Dynamic Partitioning
+Dynamic partitioning -- changing the number of partitions of one or more operators
+in a running application -- can be achieved in multiple ways:
+
+- Use the command line tool `apex` or the UI console to change the value of the
+ `partitionCount` property of the running operator. This change is detected in
+ `processStats()` (which is invoked periodically by the platform) where, if the
+ current partition count (`currentPartitions`) and the desired partition count
+ (`partitionCount`) differ, the `repartitionRequired` flag in the response is set.
+ This causes the platform to invoke `definePartitions()` to create a new set of
+ partitions with the desired count.
+- Override `processStats()` and within it, based on the statistics in the
+ incoming parameter or any other factors, define a new desired value of
+ `partitionCount` and finally, if this value differs from the current partition
+ count, set the `repartitionRequired` flag in the response.
+
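Both approaches come down to the same comparison between the current and the desired partition count. The stdlib-only model below is a sketch, not the platform API: its `Response` class only mimics the `repartitionRequired` flag of the response returned from `processStats()`, and the backlog-based `desiredPartitionCount` policy (with its hypothetical `pendingFiles`, `filesPerPartition` and `maxPartitions` inputs) is one made-up example of the "other factors" a custom override might use.

```java
// Hypothetical, stdlib-only model of the repartition decision.
// `Response` mimics the shape of the platform's stats response; it is not the Apex class.
class RepartitionSketch {
  static final class Response {
    boolean repartitionRequired;
  }

  // First approach: request repartitioning only when the configured
  // partitionCount differs from the number of partitions currently running.
  static Response processStats(int currentPartitions, int partitionCount) {
    Response response = new Response();
    response.repartitionRequired = currentPartitions != partitionCount;
    return response;
  }

  // Second approach: an example policy (an assumption, not Malhar's) that derives
  // the desired count from a backlog of pending files, clamped to [1, maxPartitions].
  static int desiredPartitionCount(int pendingFiles, int filesPerPartition, int maxPartitions) {
    if (filesPerPartition <= 0) {
      throw new IllegalArgumentException("filesPerPartition must be positive");
    }
    int wanted = (pendingFiles + filesPerPartition - 1) / filesPerPartition; // ceiling division
    return Math.max(1, Math.min(maxPartitions, wanted));
  }

  public static void main(String[] args) {
    int current = 2;
    int desired = desiredPartitionCount(250, 100, 8);
    System.out.println("desired=" + desired
        + " repartitionRequired=" + processStats(current, desired).repartitionRequired);
  }
}
```

Keeping the decision a pure function of the current and desired counts makes it easy to unit-test separately from the operator.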
+## Example application
+An example application for the FTP input operator can be found at [https://github.com/apache/apex-malhar/tree/master/examples/ftp](https://github.com/apache/apex-malhar/tree/master/examples/ftp)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/docs/operators/images/ftpInputOperator/classdiagram.png
----------------------------------------------------------------------
diff --git a/docs/operators/images/ftpInputOperator/classdiagram.png b/docs/operators/images/ftpInputOperator/classdiagram.png
new file mode 100644
index 0000000..8e173a8
Binary files /dev/null and b/docs/operators/images/ftpInputOperator/classdiagram.png differ
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/README.md
----------------------------------------------------------------------
diff --git a/examples/ftp/README.md b/examples/ftp/README.md
new file mode 100644
index 0000000..d4aca6a
--- /dev/null
+++ b/examples/ftp/README.md
@@ -0,0 +1,37 @@
+## FTP Input Operator Example
+
+This example shows how to use `FTPStringInputOperator`, which extends `AbstractFTPInputOperator`. The `FTPStringInputOperator` scans a directory on an FTP server for files, reads lines from the files and
+emits them on the output port for further processing. The tuples emitted by the `FTPStringInputOperator` are processed by the downstream operator `StringFileOutputOperator`, which writes them to HDFS.
+
+The properties file `META-INF/properties.xml` shows how to configure the respective operators.
+
+
+Users can choose the application and an additional configuration file at launch time. In this example, the properties file mentioned above is used to configure the operator properties.
+
+
+#### **Update properties in properties.xml (required to run the example):**
+
+- Update these common properties in the file `src/main/resources/META-INF/properties.xml`:
+
+| Property Name | Description |
+| ------------- | ----------- |
+| dt.application.FTPInputExample.operator.Reader.host | address of the FTP server |
+| dt.application.FTPInputExample.operator.Reader.userName | user for the FTP server if anonymous FTP is disabled |
+| dt.application.FTPInputExample.operator.Reader.password | password associated with the above user |
+| dt.application.FTPInputExample.operator.Reader.directory | directory on the FTP server to scan for input files |
+| dt.application.FTPInputExample.operator.Writer.filePath | output directory for the records |
+| dt.application.FTPInputExample.operator.Writer.outputFileName | name of the output file the records are written to |
+
+### How to compile
+`shell> mvn clean package`
+
+This will generate the application package `malhar-examples-ftp-3.8.0-SNAPSHOT.apa` inside the `target` directory.
+
+### How to run
+Use the application package generated above to launch the application from the UI console (if available) or the apex command line interface:
+
+`apex> launch target/malhar-examples-ftp-3.8.0-SNAPSHOT.apa`
+
+
+
+
+If you have issues configuring the operator or running the application, please send an email to users@apex.apache.org.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/pom.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/pom.xml b/examples/ftp/pom.xml
new file mode 100644
index 0000000..a54ec33
--- /dev/null
+++ b/examples/ftp/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>malhar-examples-ftp</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar FTP Example</name>
+ <description>Apex example application that reads from an FTP server and writes to HDFS.</description>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ </parent>
+
+</project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/src/assemble/appPackage.xml b/examples/ftp/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/ftp/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>appPackage</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/</directory>
+ <outputDirectory>/app</outputDirectory>
+ <includes>
+ <include>${project.artifactId}-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/target/deps</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/site/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>*.xml</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/META-INF</directory>
+ <outputDirectory>/META-INF</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>${basedir}/src/main/resources/app</directory>
+ <outputDirectory>/app</outputDirectory>
+ </fileSet>
+ </fileSets>
+
+</assembly>
+
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
new file mode 100644
index 0000000..5488970
--- /dev/null
+++ b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/Application.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.ftp;
+
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.AbstractFTPInputOperator.FTPStringInputOperator;
+
+/**
+ * This application demonstrates the FTPStringInputOperator. It uses the FTPStringInputOperator which reads
+ * data from a directory on an FTP server, and then writes it to a file using the StringFileOutputOperator.
+ */
+@ApplicationAnnotation(name = "FTPInputExample")
+public class Application implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // FTP read operator. Values set below can be overridden in resources/META-INF/properties.xml
+ FTPStringInputOperator reader = dag.addOperator("Reader", new FTPStringInputOperator());
+ //Set properties for the FTP input operator
+ reader.setHost("localhost");
+ reader.setUserName("ftp");
+ reader.setDirectory("sourceDir");
+ reader.setPartitionCount(2);
+
+ // writer that writes strings to a file on hdfs
+ StringFileOutputOperator writer = dag.addOperator("Writer", new StringFileOutputOperator());
+ //Set properties for the output operator
+ writer.setFilePath("malhar_examples/ftp");
+ writer.setOutputFileName("destination");
+
+ //Connect reader output to writer
+ dag.addStream("data", reader.output, writer.input);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
new file mode 100644
index 0000000..3a2339c
--- /dev/null
+++ b/examples/ftp/src/main/java/org/apache/apex/examples/ftp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.ftp;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/ftp/src/main/resources/META-INF/properties.xml b/examples/ftp/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..8b3ac89
--- /dev/null
+++ b/examples/ftp/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration>
+ <!-- FTP Input Example -->
+ <property>
+ <name>dt.application.FTPInputExample.operator.Reader.partitionCount</name>
+ <value>2</value>
+ </property>
+ <property>
+ <name>dt.application.FTPInputExample.operator.Reader.host</name>
+ <value>localhost</value>
+ </property>
+ <property>
+ <name>dt.application.FTPInputExample.operator.Reader.userName</name>
+ <value>ftp</value>
+ </property>
+ <property>
+ <name>dt.application.FTPInputExample.operator.Reader.directory</name>
+ <value>sourceDir</value>
+ </property>
+ <property>
+ <name>dt.application.FTPInputExample.operator.Writer.filePath</name>
+ <value>malhar_examples/ftp</value>
+ </property>
+ <property>
+ <name>dt.application.FTPInputExample.operator.Writer.outputFileName</name>
+ <value>destination</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/ftp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/ftp/src/test/resources/log4j.properties b/examples/ftp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/ftp/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 16cfe26..cff85a7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -198,6 +198,7 @@
<module>throttle</module>
<module>transform</module>
<module>kafka</module>
+ <module>ftp</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c3f86f23/mkdocs.yml
----------------------------------------------------------------------
diff --git a/mkdocs.yml b/mkdocs.yml
index e23abe6..75a862a 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -16,6 +16,7 @@ pages:
- File Splitter: operators/file_splitter.md
- Filter: operators/filter.md
- Fixed Width Parser: operators/fixedWidthParserOperator.md
+ - FTP Input Operator: operators/ftpInputOperator.md
- Jdbc Output Operator: operators/AbstractJdbcTransactionableOutputOperator.md
- JDBC Poller Input: operators/jdbcPollInputOperator.md
- JMS Input: operators/jmsInputOperator.md