Posted to commits@camel.apache.org by ac...@apache.org on 2020/11/11 08:22:32 UTC

[camel-kafka-connector-examples] branch master updated: Added FTP Source Connector Example

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new efc2dec  Added FTP Source Connector Example
efc2dec is described below

commit efc2decfe27776f9445dc7917397dabf9ea3267e
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Nov 11 09:21:34 2020 +0100

    Added FTP Source Connector Example
---
 ftp/ftp-source/README.adoc                         | 220 +++++++++++++++++++++
 .../config/CamelFtpSourceConnector.properties      |  34 ++++
 2 files changed, 254 insertions(+)

diff --git a/ftp/ftp-source/README.adoc b/ftp/ftp-source/README.adoc
new file mode 100644
index 0000000..29c6c8e
--- /dev/null
+++ b/ftp/ftp-source/README.adoc
@@ -0,0 +1,220 @@
+# Camel-Kafka-connector FTP Source
+
+This is an example of the Camel-Kafka-connector FTP Source.
+
+## Standalone
+
+### What is needed
+
+- An FTP server
+
+### Setting up FTP Server
+
+We'll use the fauria/vsftpd Docker image.
+
+Run the following command:
+
+```
+docker run -d -v /my/data/directory:/home/vsftpd -p 20:20 -p 21:21 -p 21100-21110:21100-21110 -e FTP_USER=admin -e FTP_PASS=password -e PASV_ADDRESS=127.0.0.1 -e PASV_MIN_PORT=21100 -e PASV_MAX_PORT=21110 --name vsftpd --restart=always fauria/vsftpd
+9534a8d7a87c5f0525079824f692552fe6306fcea2e0e2a0fe6039b038370a12
+```
+We are exposing the FTP control port (21), the data port (20) and the passive port range 21100-21110 on the host.
+
+Take note of the container id. In our case it is 9534a8d7a87c5f0525079824f692552fe6306fcea2e0e2a0fe6039b038370a12.
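+
+To sanity-check that the server is reachable from the host, you can log in and list the FTP user's home directory (empty at this point) with any FTP-capable client; for example with curl, assuming your curl build has FTP support:
+
+```
+> curl ftp://127.0.0.1/ --user admin:password
+```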
+
+### Running Kafka
+
+```
+$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
+$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
+$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
+```
+
+### Setting up the needed bits and running the example
+
+You'll need to set up the `plugin.path` property in your Kafka installation.
+
+Open `$KAFKA_HOME/config/connect-standalone.properties`
+
+and set the `plugin.path` property to your chosen location.
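+
+For example, assuming you'll use `/home/oscerd/connectors/` as in the rest of this guide, the relevant line would look like:
+
+```
+plugin.path=/home/oscerd/connectors
+```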
+
+You'll need to build your connector starting from an archetype:
+
+```
+> mvn archetype:generate  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype  -DarchetypeVersion=0.6.0
+[INFO] Scanning for projects...
+[INFO] 
+[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
+[INFO] Building Maven Stub Project (No POM) 1
+[INFO] --------------------------------[ pom ]---------------------------------
+[INFO] 
+[INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+[INFO] 
+[INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+[INFO] 
+[INFO] 
+[INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
+[INFO] Generating project in Interactive mode
+[INFO] Archetype repository not defined. Using the one from [org.apache.camel.kafkaconnector.archetypes:camel-kafka-connector-extensible-archetype:0.6.0] found in catalog remote
+Define value for property 'groupId': org.apache.camel.kafkaconnector
+Define value for property 'artifactId': ftp-extended
+Define value for property 'version' 1.0-SNAPSHOT: : 0.6.0
+Define value for property 'package' org.apache.camel.kafkaconnector: : 
+Define value for property 'camel-kafka-connector-name': camel-ftp-kafka-connector
+[INFO] Using property: camel-kafka-connector-version = 0.6.0
+Confirm properties configuration:
+groupId: org.apache.camel.kafkaconnector
+artifactId: ftp-extended
+version: 0.6.0
+package: org.apache.camel.kafkaconnector
+camel-kafka-connector-name: camel-ftp-kafka-connector
+camel-kafka-connector-version: 0.6.0
+ Y: : Y
+[INFO] ----------------------------------------------------------------------------
+[INFO] Using following parameters for creating project from Archetype: camel-kafka-connector-extensible-archetype:0.6.0
+[INFO] ----------------------------------------------------------------------------
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: artifactId, Value: ftp-extended
+[INFO] Parameter: version, Value: 0.6.0
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: packageInPathFormat, Value: org/apache/camel/kafkaconnector
+[INFO] Parameter: package, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: version, Value: 0.6.0
+[INFO] Parameter: groupId, Value: org.apache.camel.kafkaconnector
+[INFO] Parameter: camel-kafka-connector-name, Value: camel-ftp-kafka-connector
+[INFO] Parameter: camel-kafka-connector-version, Value: 0.6.0
+[INFO] Parameter: artifactId, Value: ftp-extended
+[INFO] Project created from Archetype in dir: /home/workspace/miscellanea/ftp-extended
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD SUCCESS
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time:  24.590 s
+[INFO] Finished at: 2020-11-05T07:45:43+01:00
+[INFO] ------------------------------------------------------------------------
+> cd /home/workspace/miscellanea/ftp-extended
+```
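+
+If you prefer a non-interactive run, the same project can be generated with Maven in batch mode by passing all the archetype properties up front (a sketch; adjust the values to your environment):
+
+```
+> mvn archetype:generate -B \
+  -DarchetypeGroupId=org.apache.camel.kafkaconnector.archetypes \
+  -DarchetypeArtifactId=camel-kafka-connector-extensible-archetype \
+  -DarchetypeVersion=0.6.0 \
+  -DgroupId=org.apache.camel.kafkaconnector \
+  -DartifactId=ftp-extended \
+  -Dversion=0.6.0 \
+  -Dpackage=org.apache.camel.kafkaconnector \
+  -Dcamel-kafka-connector-name=camel-ftp-kafka-connector
+```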
+
+We'll need to add a small transform for this example, so import the ftp-extended project into your IDE and create the following class in the only package there (`org.apache.camel.kafkaconnector`):
+
+```
+package org.apache.camel.kafkaconnector;
+
+import java.util.Map;
+
+import org.apache.camel.component.file.remote.RemoteFile;
+import org.apache.camel.kafkaconnector.utils.SchemaHelper;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteFileTransforms <R extends ConnectRecord<R>> implements Transformation<R> {
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELD_KEY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
+                    "Transforms Remote File to String");
+
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteFileTransforms.class);
+
+    @Override
+    public R apply(R r) {
+        // Unwrap the Camel RemoteFile payload so the record value becomes the plain file body
+        if (r.value() instanceof RemoteFile) {
+            LOG.debug("Converting record from RemoteFile to text");
+            RemoteFile message = (RemoteFile) r.value();
+
+            LOG.debug("Received text: {}", message.getBody());
+
+            return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(),
+                    SchemaHelper.buildSchemaBuilderForType(message.getBody()), message.getBody(), r.timestamp());
+
+        } else {
+            LOG.debug("Unexpected message type: {}", r.value().getClass());
+
+            return r;
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
+```
+
+Now we need to build the connector:
+
+```
+> mvn clean package
+```
+
+In this example we'll use `/home/oscerd/connectors/` as `plugin.path`, and we'll need the zip generated by the previous build:
+
+```
+> cd /home/oscerd/connectors/
+> cp /home/workspace/miscellanea/ftp-extended/target/ftp-extended-0.6.0-package.zip .
+> unzip ftp-extended-0.6.0-package.zip
+```
+
+Now it's time to set up the connector.
+
+Open the FTP source configuration file `config/CamelFtpSourceConnector.properties`:
+
+```
+name=CamelFtpSourceConnector
+connector.class=org.apache.camel.kafkaconnector.ftp.CamelFtpSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+transforms=RemoteTransformer
+transforms.RemoteTransformer.type=org.apache.camel.kafkaconnector.RemoteFileTransforms
+
+topics=mytopic
+
+camel.source.path.host=127.0.0.1
+camel.source.path.port=21
+camel.source.endpoint.passiveMode=true
+camel.source.endpoint.recursive=true
+camel.source.endpoint.noop=false
+camel.source.endpoint.username=admin
+camel.source.endpoint.password=password
+camel.source.endpoint.move=.done
+```
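+
+For illustration only: the `camel.source.path.*` and `camel.source.endpoint.*` options above are assembled by the connector into a Camel FTP endpoint URI roughly equivalent to this one:
+
+```
+ftp://127.0.0.1:21?username=admin&password=password&passiveMode=true&recursive=true&noop=false&move=.done
+```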
+
+Now you can run the example:
+
+```
+$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/CamelFtpSourceConnector.properties
+```
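+
+Once Connect is up, you can check that the connector was registered through the Connect REST API (standalone mode listens on port 8083 by default); the returned list should contain `CamelFtpSourceConnector`:
+
+```
+> curl localhost:8083/connectors
+```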
+
+Now we need to connect to the FTP server and add a file to the admin user's home folder:
+
+```
+> docker exec -it 9534a8d7a87c5f0525079824f692552fe6306fcea2e0e2a0fe6039b038370a12 bash
+[root@9534a8d7a87c /]# cd /home/vsftpd
+[root@9534a8d7a87c vsftpd]# touch test.txt
+[root@9534a8d7a87c vsftpd]# echo "Ckc rocks" > test.txt 
+[root@9534a8d7a87c vsftpd]# mv test.txt admin/
+```
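+
+Once the connector has consumed the file, the `move=.done` option should move it into a `.done` subfolder of the admin directory; you can verify that from the same shell:
+
+```
+[root@9534a8d7a87c vsftpd]# ls admin/.done
+```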
+
+In another terminal, using kafkacat, you should be able to see the message and its headers:
+
+```
+> ./kafkacat -b localhost:9092 -t mytopic -f 'Headers: %h: Message value: %s\n'
+% Auto-selecting Consumer mode (use -P or -C to override)
+Headers: CamelHeader.CamelFileAbsolute=false,CamelHeader.CamelFileAbsolutePath=test.txt,CamelHeader.CamelFileHost=127.0.0.1,CamelHeader.CamelFileLastModified=1605078600000,CamelHeader.CamelFileLength=10,CamelHeader.CamelFileName=test.txt,CamelHeader.CamelFileNameConsumed=test.txt,CamelHeader.CamelFileNameOnly=test.txt,CamelHeader.CamelFileParent=/,CamelHeader.CamelFilePath=/test.txt,CamelHeader.CamelFileRelativePath=test.txt,CamelHeader.CamelFtpReplyCode=226,CamelHeader.CamelFtpReplyStri [...]
+,CamelProperty.CamelBatchSize=1,CamelProperty.CamelUnitOfWorkProcessSync=true,CamelProperty.CamelBatchComplete=true,CamelProperty.CamelBatchIndex=0,CamelProperty.CamelToEndpoint=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000: Message value: Ckc rocks
+% Reached end of topic mytopic [0] at offset 1
+```
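+
+If kafkacat is not available, the plain Kafka console consumer can be used instead; it shows only the message value (the file content), not the headers:
+
+```
+> $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
+Ckc rocks
+```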
+
diff --git a/ftp/ftp-source/config/CamelFtpSourceConnector.properties b/ftp/ftp-source/config/CamelFtpSourceConnector.properties
new file mode 100644
index 0000000..1ab5260
--- /dev/null
+++ b/ftp/ftp-source/config/CamelFtpSourceConnector.properties
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name=CamelFtpSourceConnector
+connector.class=org.apache.camel.kafkaconnector.ftp.CamelFtpSourceConnector
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+transforms=RemoteTransformer
+transforms.RemoteTransformer.type=org.apache.camel.kafkaconnector.RemoteFileTransforms
+
+topics=mytopic
+
+camel.source.path.host=127.0.0.1
+camel.source.path.port=21
+camel.source.endpoint.passiveMode=true
+camel.source.endpoint.recursive=true
+camel.source.endpoint.noop=false
+camel.source.endpoint.username=admin
+camel.source.endpoint.password=password
+camel.source.endpoint.move=.done