You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/05/15 20:19:09 UTC
sqoop git commit: SQOOP-2126: Sqoop2: Implement FTP TO Connector
Support
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 9ee44d4f0 -> bf4ae0b3c
SQOOP-2126: Sqoop2: Implement FTP TO Connector Support
(Jonathan Seidman via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bf4ae0b3
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bf4ae0b3
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bf4ae0b3
Branch: refs/heads/sqoop2
Commit: bf4ae0b3c77f5ffc037193cb12695a450b30d822
Parents: 9ee44d4
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Fri May 15 11:18:35 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Fri May 15 11:18:35 2015 -0700
----------------------------------------------------------------------
connector/connector-ftp/pom.xml | 92 +++++++++
.../sqoop/connector/ftp/FtpConnector.java | 137 ++++++++++++
.../sqoop/connector/ftp/FtpConnectorError.java | 52 +++++
.../connector/ftp/FtpConnectorUpgrader.java | 46 +++++
.../sqoop/connector/ftp/FtpConstants.java | 37 ++++
.../apache/sqoop/connector/ftp/FtpLoader.java | 74 +++++++
.../sqoop/connector/ftp/FtpToDestroyer.java | 41 ++++
.../sqoop/connector/ftp/FtpToInitializer.java | 44 ++++
.../connector/ftp/configuration/LinkConfig.java | 76 +++++++
.../ftp/configuration/LinkConfiguration.java | 34 +++
.../ftp/configuration/ToJobConfig.java | 36 ++++
.../ftp/configuration/ToJobConfiguration.java | 34 +++
.../ftp/ftpclient/FtpConnectorClient.java | 207 +++++++++++++++++++
.../resources/ftp-connector-config.properties | 51 +++++
.../src/main/resources/log4j.properties | 24 +++
.../main/resources/sqoopconnector.properties | 18 ++
.../sqoop/connector/ftp/TestFtpLoader.java | 125 +++++++++++
.../configuration/TestLinkConfiguration.java | 142 +++++++++++++
.../configuration/TestToJobConfiguration.java | 58 ++++++
.../ftp/ftpclient/TestFtpConnectorClient.java | 148 +++++++++++++
.../src/test/resources/log4j.properties | 24 +++
connector/pom.xml | 1 +
docs/src/site/sphinx/Connectors.rst | 63 ++++++
pom.xml | 11 +
server/pom.xml | 5 +
25 files changed, 1580 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/pom.xml b/connector/connector-ftp/pom.xml
new file mode 100644
index 0000000..9a70dfc
--- /dev/null
+++ b/connector/connector-ftp/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.sqoop.connector</groupId>
+ <artifactId>sqoop-connector-ftp</artifactId>
+ <name>Sqoop FTP Connector</name>
+
+ <properties>
+ <slf4j.version>1.6.1</slf4j.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockftpserver</groupId>
+ <artifactId>MockFtpServer</artifactId>
+ <version>2.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>sqoop</finalName>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java
new file mode 100644
index 0000000..ffef1bf
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+/**
+ * Implementation of a Sqoop 2 connector to support data movement to/from an
+ * FTP server. Currently only the TO direction is implemented.
+ */
+public class FtpConnector extends SqoopConnector {
+
+ /**
+ * Define the TO instance.
+ */
+ private static final To TO = new To(FtpToInitializer.class,
+ FtpLoader.class,
+ FtpToDestroyer.class);
+
+ /**
+ * {@inheritDoc}
+ *
+ * Since this is a built-in connector it will return the same version as
+ * the rest of the Sqoop code.
+ */
+ @Override
+ public String getVersion() {
+ return VersionInfo.getBuildVersion();
+ }
+
+ /**
+ * Return the configuration resource bundle for this connector.
+ *
+ * @param locale The Locale object.
+ *
+ * @return The resource bundle associated with the input locale.
+ */
+ @Override
+ public ResourceBundle getBundle(Locale locale) {
+ return ResourceBundle.getBundle(FtpConstants.RESOURCE_BUNDLE_NAME, locale);
+ }
+
+ /**
+ * Get the class encapsulating link configuration for this connector.
+ *
+ * @return The link configuration class for this connector.
+ */
+ @Override
+ public Class getLinkConfigurationClass() {
+ return LinkConfiguration.class;
+ }
+
+ /**
+ * Get the appropriate job configuration class for the input direction.
+ *
+ * @param direction Whether to return TO or FROM configuration class.
+ *
+ * @return Job configuration class for given direction.
+ */
+ @Override
+ public Class getJobConfigurationClass(Direction direction) {
+ switch (direction) {
+ case TO:
+ return ToJobConfiguration.class;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Get the object which defines classes for performing import jobs.
+ *
+ * @return null, since the FROM direction is not supported by this connector.
+ */
+ @Override
+ public From getFrom() {
+ return null;
+ }
+
+ /**
+ * Get the object which defines classes for performing export jobs.
+ *
+ * @return the To object defining classes for performing export.
+ */
+ @Override
+ public To getTo() {
+ return TO;
+ }
+
+ /**
+ * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader}
+ * object that can upgrade the connection and job configs.
+ *
+ * @return configurable upgrader object
+ */
+ @Override
+ public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
+ return new FtpConnectorUpgrader();
+ }
+
+ /**
+ * Return a List of directions supported by this connector.
+ *
+ * @return list of enums representing supported directions.
+ */
+ public List<Direction> getSupportedDirections() {
+ return Arrays.asList(Direction.TO);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java
new file mode 100644
index 0000000..77c36ea
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ * Error messages for FTP connector.
+ */
+public enum FtpConnectorError implements ErrorCode {
+ FTP_CONNECTOR_0000("Unknown error occurred."),
+ FTP_CONNECTOR_0001("Error occurred connecting to FTP server."),
+ FTP_CONNECTOR_0002("Error occurred disconnecting from FTP server."),
+ FTP_CONNECTOR_0003("Error occurred transferring data to FTP server."),
+ FTP_CONNECTOR_0004("Unknown job type")
+ ;
+
+ private final String message;
+
+ private FtpConnectorError(String message) {
+ this.message = message;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getCode() {
+ return name();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java
new file mode 100644
index 0000000..159ba4e
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+//NOTE: All config types have a similar upgrade path at this point
+public class FtpConnectorUpgrader extends ConnectorConfigurableUpgrader {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(),
+ upgradeTarget.getConfigs());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+ ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(),
+ upgradeTarget.getConfigs());
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java
new file mode 100644
index 0000000..8e8b4ad
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.job.Constants;
+
+/**
+ * Constants for FTP connector.
+ */
+public final class FtpConstants extends Constants {
+
+ /**
+ * Name of resource bundle for configuring this connector.
+ */
+ public static final String RESOURCE_BUNDLE_NAME = "ftp-connector-config";
+
+ /**
+ * Default port for FTP.
+ */
+ public static final int DEFAULT_PORT = 21;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java
new file mode 100644
index 0000000..0ed1eee
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+import java.util.UUID;
+
+/**
+ * Class to receive data from a From instance and load to a To instance.
+ */
+public class FtpLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+
+ /**
+ * Number of records written by last call to load() method.
+ */
+ private long rowsWritten = 0;
+
+ /**
+ * Load data to target directory on FTP server.
+ *
+ * @param context Loader context object
+ * @param linkConfiguration Link configuration
+ * @param toJobConfig Job configuration
+ * @throws Exception Re-thrown from FTP client code.
+ */
+ @Override
+ public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+ ToJobConfiguration toJobConfig) throws Exception {
+ DataReader reader = context.getDataReader();
+ String outputDir = toJobConfig.toJobConfig.outputDirectory;
+ // Create a unique filename for writing records, since this method will
+ // likely get called multiple times for a single source file/dataset:
+ String path = outputDir + "/" + UUID.randomUUID() + ".txt";
+
+ FtpConnectorClient client =
+ new FtpConnectorClient(linkConfiguration.linkConfig.server,
+ linkConfiguration.linkConfig.port);
+ client.connect(linkConfiguration.linkConfig.username,
+ linkConfiguration.linkConfig.password);
+ rowsWritten = client.uploadStream(reader, path);
+ client.disconnect();
+ }
+
+ /**
+ * Return the number of rows written by the last call to load() method.
+ *
+ * @return Number of rows written by call to loader.
+ */
+ @Override
+ public long getRowsWritten() {
+ return rowsWritten;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java
new file mode 100644
index 0000000..50208ce
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * Perform any clean up, etc. tasks when the Sqoop execution completes.
+ */
+public class FtpToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+ /**
+ * Callback to clean up after job execution.
+ *
+ * @param context Destroyer context
+ * @param linkConfig link configuration object
+ * @param jobConfig TO job configuration object
+ */
+ @Override
+ public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
+ ToJobConfiguration jobConfig) {
+ // do nothing at this point
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java
new file mode 100644
index 0000000..96a100d
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+
+/**
+ * Perform any required initialization before execution of job.
+ */
+public class FtpToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
+
+ /**
+ * Initialize new submission based on given configuration properties. Any
+ * needed temporary values might be saved to context object and they will be
+ * promoted to all other parts of the workflow automatically.
+ *
+ * @param context Initializer context object
+ * @param linkConfig link configuration object
+ * @param jobConfig TO job configuration object
+ */
+ @Override
+ public void initialize(InitializerContext context, LinkConfiguration linkConfig,
+ ToJobConfiguration jobConfig) {
+ // do nothing at this point
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java
new file mode 100644
index 0000000..ed9c2cc
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient;
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Attributes for FTP connector link configuration.
+ */
+@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
+public class LinkConfig {
+
+ /**
+ * FTP server hostname.
+ */
+ @Input(size = 256, validators = {@Validator(NotEmpty.class)})
+ public String server;
+
+ /**
+ * FTP server port. Default is port 21.
+ */
+ @Input
+ public Integer port;
+
+ /**
+ * Username for server login.
+ */
+ @Input(size = 256, validators = {@Validator(NotEmpty.class)})
+ public String username;
+
+ /**
+ * Password for server login.
+ */
+ @Input(size = 256, sensitive = true)
+ public String password;
+
+ /**
+ * Validate that we can log into the server using the supplied credentials.
+ */
+ public static class ConfigValidator extends AbstractValidator<LinkConfig> {
+ @Override
+ public void validate(LinkConfig linkConfig) {
+ try {
+ FtpConnectorClient client =
+ new FtpConnectorClient(linkConfig.server, linkConfig.port);
+ client.connect(linkConfig.username, linkConfig.password);
+ client.disconnect();
+ } catch (SqoopException e) {
+ addMessage(Status.WARNING, "Can't connect to the FTP server " +
+ linkConfig.server + " error is " + e.getMessage());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..60f1836
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Class to encapsulate link attributes for FTP connector.
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+ @Config
+ public LinkConfig linkConfig;
+
+ public LinkConfiguration() {
+ linkConfig = new LinkConfig();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java
new file mode 100644
index 0000000..dc46946
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Attributes for FTP connector TO configuration.
+ */
+@ConfigClass
+public class ToJobConfig {
+
+ /**
+ * Directory on FTP server to write data to.
+ */
+ @Input(size = 260, validators = {@Validator(NotEmpty.class)})
+ public String outputDirectory;
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..636afbb
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Class to encapsulate TO configuration.
+ *
+ * Holds a single {@link ToJobConfig} in the public field {@code toJobConfig}.
+ * The no-argument constructor creates that instance, so the field is non-null
+ * as soon as a {@code ToJobConfiguration} has been constructed.
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+ @Config
+ public ToJobConfig toJobConfig;
+
+ public ToJobConfiguration() {
+ toJobConfig = new ToJobConfig();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java
new file mode 100644
index 0000000..7a65173
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.ftp.ftpclient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPReply;
+import org.apache.log4j.Logger;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ftp.FtpConstants;
+import org.apache.sqoop.connector.ftp.FtpConnectorError;
+import org.apache.sqoop.etl.io.DataReader;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+/**
+ * Class encapsulating functionality to interact with an FTP server. This class
+ * uses the Apache Commons Net libraries to provide the FTP functionality. See
+ * http://commons.apache.org/proper/commons-net/.
+ */
+public class FtpConnectorClient {
+
+ /**
+ * Apache Commons Net FTP client.
+ */
+ private FTPClient ftpClient = null;
+
+ /**
+ * Hostname for FTP server.
+ */
+ private String ftpServer = null;
+
+ /**
+ * Port for FTP server.
+ */
+ private int ftpPort = FtpConstants.DEFAULT_PORT;
+
+ private static final Logger LOG = Logger.getLogger(FtpConnectorClient.class);
+
+ /**
+ * Constructor to initialize client.
+ *
+ * @param server Hostname of FTP server.
+ * @param port Port number of FTP server. Pass in null to use default port
+ * of 21.
+ */
+ public FtpConnectorClient(String server, Integer port) {
+ ftpClient = new FTPClient();
+ ftpServer = server;
+ if (port != null) {
+ ftpPort = port.intValue();
+ }
+ }
+
+ /**
+ * Connect to the FTP server.
+ *
+ * @param username Username for server login.
+ * @param pass Password for server login.
+ *
+ * @throws SqoopException Thrown if an error occurs while connecting to server.
+ */
+ public void connect(String username, String pass)
+ throws SqoopException {
+
+ try {
+ ftpClient.connect(ftpServer, ftpPort);
+ LOG.info(getServerReplyAsString());
+ int replyCode = ftpClient.getReplyCode();
+ if (!FTPReply.isPositiveCompletion(replyCode)) {
+ ftpClient.disconnect();
+ LOG.error("Operation failed. Server reply code: " + replyCode);
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001,
+ "Server reply code=" + replyCode);
+ }
+
+ boolean success = ftpClient.login(username, pass);
+ LOG.info(getServerReplyAsString());
+ if (!success) {
+ ftpClient.disconnect();
+ LOG.error("Could not login to the server" + ftpServer);
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001);
+ } else {
+ LOG.info("logged into " + ftpServer);
+ }
+ } catch (IOException e) {
+ LOG.error("Caught IOException connecting to FTP server: " +
+ e.getMessage());
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001,
+ "Caught IOException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Log out and disconnect from FTP server.
+ *
+ * @throws SqoopException Thrown if an error occurs while disconnecting from
+ * server.
+ */
+ public void disconnect()
+ throws SqoopException {
+ try {
+ ftpClient.logout();
+ ftpClient.disconnect();
+ } catch (IOException e) {
+ LOG.error("Caught IOException disconnecting from FTP server: " +
+ e.getMessage());
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0002,
+ "Caught IOException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Stream records to a file on the FTP server.
+ *
+ * @param reader a DataReader object containing data passed from source
+ * connector.
+ * @param path directory on FTP server to write files to.
+ *
+ * @return Number of records written in call to this method.
+ *
+ * @throws SqoopException thrown if error occurs during interaction with
+ * FTP server.
+ * @throws Exception thrown if error occurs in DataReader.
+ */
+ public long uploadStream(DataReader reader, String path)
+ throws SqoopException, Exception {
+
+ OutputStream output = null;
+
+ long recordsWritten = 0;
+
+ try {
+ output = ftpClient.storeFileStream(path);
+ if (!FTPReply.isPositivePreliminary(ftpClient.getReplyCode())) {
+ LOG.error("File transfer failed, server reply=" +
+ getServerReplyAsString());
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+ getServerReplyAsString());
+ } else {
+ String record;
+ while ((record = reader.readTextRecord()) != null) {
+ LOG.info("Writing record to FTP server:" + record);
+ output.write(record.getBytes());
+ output.write(("\n").getBytes());
+ recordsWritten++;
+ }
+
+ output.close();
+ if (!ftpClient.completePendingCommand()) {
+ LOG.error("File transfer failed, server reply=" +
+ getServerReplyAsString());
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+ getServerReplyAsString());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Caught IOException: " + e.getMessage());
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+ "Caught IOException: " + e.getMessage(), e);
+ } finally {
+ try {
+ if (output != null) {
+ output.close();
+ }
+ } catch (IOException e) {
+ LOG.error("Caught IOException closing FTP output stream: " +
+ e.getMessage());
+ // Throw this in case there was an underlying issue with closing the stream:
+ throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+ "Caught IOException closing output stream to FTP server: "
+ + e.getMessage(), e);
+ }
+ }
+
+ return recordsWritten;
+ }
+
+ /**
+ * Turn a collection of reply strings from the FTP server into a single string.
+ * @return String containing a concatenation of replies from the server.
+ */
+ private String getServerReplyAsString() {
+ String[] replies = ftpClient.getReplyStrings();
+ return StringUtils.join(replies);
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/ftp-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/resources/ftp-connector-config.properties b/connector/connector-ftp/src/main/resources/ftp-connector-config.properties
new file mode 100644
index 0000000..d84157f
--- /dev/null
+++ b/connector/connector-ftp/src/main/resources/ftp-connector-config.properties
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# FTP Connector Resources
+
+############################
+
+# Link Config
+linkConfig.label = Link configuration
+linkConfig.help = Parameters required to connect to an FTP server.
+
+# FTP server hostname
+linkConfig.server.label = FTP server hostname
+linkConfig.server.help = Hostname for the FTP server.
+
+# FTP server port
+linkConfig.port.label = FTP server port (21)
+linkConfig.port.help = Port for the FTP server. 21 by default.
+
+# username string
+linkConfig.username.label = Username
+linkConfig.username.help = Enter the username to be used for connecting to the \
+ FTP server.
+
+# password string
+linkConfig.password.label = Password
+linkConfig.password.help = Enter the password to be used for connecting to the \
+ FTP server.
+
+# To Job Config
+#
+toJobConfig.label = To FTP configuration
+toJobConfig.help = Parameters required to store data on the FTP server.
+
+toJobConfig.outputDirectory.label = Output directory
+toJobConfig.outputDirectory.help = Directory on the FTP server to write data to.
+
+toJobConfig.ignored.label = Ignored
+toJobConfig.ignored.help = This value is ignored.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/resources/log4j.properties b/connector/connector-ftp/src/main/resources/log4j.properties
new file mode 100644
index 0000000..c62f102
--- /dev/null
+++ b/connector/connector-ftp/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/sqoopconnector.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/resources/sqoopconnector.properties b/connector/connector-ftp/src/main/resources/sqoopconnector.properties
new file mode 100644
index 0000000..0864f9f
--- /dev/null
+++ b/connector/connector-ftp/src/main/resources/sqoopconnector.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# FTP Connector Properties
+org.apache.sqoop.connector.class = org.apache.sqoop.connector.ftp.FtpConnector
+org.apache.sqoop.connector.name = ftp-connector
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
new file mode 100644
index 0000000..33c808a
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.mockftpserver.fake.filesystem.DirectoryEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.Loader} class.
+ *
+ * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide
+ * a mock FTP server implementation.
+ */
+public class TestFtpLoader {
+
+ private FakeFtpServer fakeFtpServer;
+ private int port;
+ private String username = "user";
+ private String password = "pass";
+ private FtpLoader loader;
+
+ @Test
+ public void testLoader() {
+
+ final int NUMBER_OF_ROWS = 1000;
+
+ DataReader reader = new DataReader() {
+ private long index = 0L;
+ @Override
+ public Object[] readArrayRecord() {
+ return null;
+ }
+
+ @Override
+ public String readTextRecord() {
+ if (index++ < NUMBER_OF_ROWS) {
+ return index + "," + (double)index + ",'" + index + "'";
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Object readContent() {
+ return null;
+ }
+ };
+
+ try {
+ LoaderContext context = new LoaderContext(null, reader, null);
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ linkConfig.linkConfig.username = username;
+ linkConfig.linkConfig.password = password;
+ linkConfig.linkConfig.server = "localhost";
+ linkConfig.linkConfig.port = port;
+ ToJobConfiguration jobConfig = new ToJobConfiguration();
+ jobConfig.toJobConfig.outputDirectory = "uploads";
+ loader.load(context, linkConfig, jobConfig);
+ Long rowsWritten = loader.getRowsWritten();
+ Assert.assertTrue(rowsWritten == NUMBER_OF_ROWS,
+ ("actual rows written=" + rowsWritten + " instead of " +
+ NUMBER_OF_ROWS));
+ } catch(Exception e) {
+ Assert.fail("caught exception: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Create mock FTP server for testing, and add a user account for testing.
+ */
+ @BeforeClass(alwaysRun = true)
+ public void setUp() throws Exception {
+
+ loader = new FtpLoader();
+
+ fakeFtpServer = new FakeFtpServer();
+ fakeFtpServer.setServerControlPort(0);
+
+ FileSystem fileSystem = new UnixFakeFileSystem();
+ fileSystem.add(new DirectoryEntry("/uploads"));
+ fakeFtpServer.setFileSystem(fileSystem);
+
+ UserAccount userAccount = new UserAccount(username, password, "/");
+ fakeFtpServer.addUserAccount(userAccount);
+
+ fakeFtpServer.start();
+ port = fakeFtpServer.getServerControlPort();
+ }
+
+ /**
+ * Stop mock FTP server.
+ */
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ fakeFtpServer.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java
new file mode 100644
index 0000000..b926a1d
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.validation.ConfigValidationResult;
+import org.apache.sqoop.validation.ConfigValidationRunner;
+import org.apache.sqoop.validation.Status;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.mockftpserver.fake.filesystem.FileEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.configuration.LinkConfiguration}.
+ *
+ * A MockFtpServer instance (http://mockftpserver.sourceforge.net/) stands in
+ * for a real FTP server while the configuration is validated.
+ */
+public class TestLinkConfiguration {
+
+  private FakeFtpServer fakeFtpServer;
+  private int port;
+  private String username = "user";
+  private String password = "pass";
+
+  /**
+   * A configuration with the known account and server must validate.
+   */
+  @Test
+  public void testValidConfig() {
+    Status status = validationStatus(username, "localhost");
+    Assert.assertTrue(status == Status.OK,
+                      "Test of valid configuration failed");
+  }
+
+  /**
+   * An empty username must not validate.
+   */
+  @Test
+  public void testEmptyUsername() {
+    Status status = validationStatus("", "localhost");
+    Assert.assertFalse(status == Status.OK,
+                       "Test of empty username failed");
+  }
+
+  /**
+   * A username unknown to the server must not validate.
+   */
+  @Test
+  public void testInvalidUsername() {
+    Status status = validationStatus("baduser", "localhost");
+    Assert.assertFalse(status == Status.OK,
+                       "Test of invalid username failed");
+  }
+
+  /**
+   * An empty server name must not validate.
+   */
+  @Test
+  public void TestEmptyServer() {
+    Status status = validationStatus(username, "");
+    Assert.assertFalse(status == Status.OK,
+                       "Test of empty server name failed");
+  }
+
+  /**
+   * Build a link configuration from the given username and server, using the
+   * shared test password and mock server port, and return the validation
+   * status reported for it.
+   */
+  private Status validationStatus(String user, String server) {
+    LinkConfiguration config = new LinkConfiguration();
+    config.linkConfig.username = user;
+    config.linkConfig.password = password;
+    config.linkConfig.server = server;
+    config.linkConfig.port = port;
+    ConfigValidationResult result = new ConfigValidationRunner().validate(config);
+    return result.getStatus();
+  }
+
+  /**
+   * Start the mock FTP server with a small fake file system and one user
+   * account, and remember the port it is listening on.
+   */
+  @BeforeClass(alwaysRun = true)
+  public void setUp() throws Exception {
+    FileSystem fakeFiles = new UnixFakeFileSystem();
+    fakeFiles.add(new FileEntry("/home/user/file.txt", "abcdef"));
+
+    fakeFtpServer = new FakeFtpServer();
+    fakeFtpServer.setServerControlPort(0);
+    fakeFtpServer.setFileSystem(fakeFiles);
+    fakeFtpServer.addUserAccount(new UserAccount(username, password, "/"));
+
+    fakeFtpServer.start();
+    port = fakeFtpServer.getServerControlPort();
+  }
+
+  /**
+   * Shut the mock FTP server down.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    fakeFtpServer.stop();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java
new file mode 100644
index 0000000..2870a16
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.validation.ConfigValidationResult;
+import org.apache.sqoop.validation.ConfigValidationRunner;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration} class.
+ */
+public class TestToJobConfiguration {
+
+  /**
+   * A non-empty directory name must pass validation.
+   */
+  @Test
+  public void testValidDirectory() {
+    Assert.assertTrue(canProceedWith("testdir"),
+                      "Test of valid directory failed");
+  }
+
+  /**
+   * An empty directory name must be rejected by validation.
+   */
+  @Test
+  public void testEmptyDirectory() {
+    Assert.assertFalse(canProceedWith(""),
+                       "Test of empty directory failed");
+  }
+
+  /**
+   * Validate a TO job configuration whose output directory is set to the
+   * given value and report whether the resulting status allows proceeding.
+   */
+  private boolean canProceedWith(String directory) {
+    ToJobConfiguration config = new ToJobConfiguration();
+    config.toJobConfig.outputDirectory = directory;
+    ConfigValidationResult result = new ConfigValidationRunner().validate(config);
+    return result.getStatus().canProceed();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java
new file mode 100644
index 0000000..b9430b3
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.ftpclient;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.etl.io.DataReader;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.mockftpserver.fake.filesystem.DirectoryEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient} class.
+ *
+ * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide
+ * a mock FTP server implementation.
+ *
+ * Note that this duplicates other tests currently, but leaving for now in case
+ * additional functionality is added to FtpConnectorClient.
+ */
+public class TestFtpConnectorClient {
+
+  private FakeFtpServer fakeFtpServer;
+  private int port;
+  private String username = "user";
+  private String password = "pass";
+
+  /**
+   * Test connect and login to FTP server. Any exception propagates so the
+   * failure is reported with its full stack trace.
+   */
+  @Test
+  public void testValidLogin() throws Exception {
+    FtpConnectorClient client = new FtpConnectorClient("localhost", port);
+    client.connect(username, password);
+    client.disconnect();
+  }
+
+  /**
+   * Test invalid login to FTP server.
+   */
+  @Test
+  public void testInvalidLogin() {
+    try {
+      FtpConnectorClient client = new FtpConnectorClient("localhost", port);
+      client.connect("baduser", "badpass");
+      client.disconnect();
+      Assert.fail("expected exception for invalid login");
+    } catch (SqoopException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Test streaming upload: the number of records reported as written must
+   * match the number of records the reader produced.
+   */
+  @Test
+  public void testUploadStream() throws Exception {
+
+    final int NUMBER_OF_ROWS = 1000;
+
+    // Produces NUMBER_OF_ROWS text records, then null to signal end of data.
+    DataReader reader = new DataReader() {
+      private long index = 0L;
+      @Override
+      public Object[] readArrayRecord() {
+        return null;
+      }
+
+      @Override
+      public String readTextRecord() {
+        if (index++ < NUMBER_OF_ROWS) {
+          return index + "," + (double)index + ",'" + index + "'";
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public Object readContent() {
+        return null;
+      }
+    };
+
+    FtpConnectorClient client = new FtpConnectorClient("localhost", port);
+    client.connect(username, password);
+    long rowsWritten;
+    boolean uploaded = false;
+    try {
+      rowsWritten = client.uploadStream(reader, "/uploads/test.txt");
+      uploaded = true;
+    } finally {
+      if (!uploaded) {
+        // Release the connection without masking the upload failure.
+        try {
+          client.disconnect();
+        } catch (SqoopException ignored) {
+          // The upload exception is the one worth reporting.
+        }
+      }
+    }
+    client.disconnect();
+    Assert.assertEquals(rowsWritten, (long) NUMBER_OF_ROWS,
+                        "unexpected number of rows written");
+  }
+
+  /**
+   * Create mock FTP server for testing, and add a user account for testing.
+   */
+  @BeforeClass(alwaysRun = true)
+  public void setUp() throws Exception {
+    fakeFtpServer = new FakeFtpServer();
+    // Port 0 lets the mock server choose a free port; it is read back below.
+    fakeFtpServer.setServerControlPort(0);
+
+    FileSystem fileSystem = new UnixFakeFileSystem();
+    fileSystem.add(new DirectoryEntry("/uploads"));
+    fakeFtpServer.setFileSystem(fileSystem);
+
+    UserAccount userAccount = new UserAccount(username, password, "/");
+    fakeFtpServer.addUserAccount(userAccount);
+
+    fakeFtpServer.start();
+    port = fakeFtpServer.getServerControlPort();
+  }
+
+  /**
+   * Stop mock FTP server.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    fakeFtpServer.stop();
+  }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/resources/log4j.properties b/connector/connector-ftp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..44ffced
--- /dev/null
+++ b/connector/connector-ftp/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index c999061..1b69180 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -38,6 +38,7 @@ limitations under the License.
<module>connector-hdfs</module>
<module>connector-kite</module>
<module>connector-kafka</module>
+ <module>connector-ftp</module>
<module>connector-sftp</module>
<!-- Uncomment and finish connectors after sqoop framework will become stable
<module>connector-mysql-jdbc</module>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/docs/src/site/sphinx/Connectors.rst
----------------------------------------------------------------------
diff --git a/docs/src/site/sphinx/Connectors.rst b/docs/src/site/sphinx/Connectors.rst
index 4e24793..721e92a 100644
--- a/docs/src/site/sphinx/Connectors.rst
+++ b/docs/src/site/sphinx/Connectors.rst
@@ -459,3 +459,66 @@ Loader
------
During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector.
+
+++++++++++++++
+FTP Connector
+++++++++++++++
+
+The FTP connector supports moving data between an FTP server and other supported Sqoop2 connectors.
+
+Currently only the TO direction is supported, which writes records to an FTP server. Support for the FROM direction is pending (SQOOP-2127).
+
+-----
+Usage
+-----
+
+To use the FTP Connector, create a link for the connector and a job that uses the link.
+
+**Link Configuration**
+++++++++++++++++++++++
+
+Inputs associated with the link configuration include:
+
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| Input | Type | Description | Example |
++=============================+=========+=======================================================================+============================+
+| FTP server hostname | String | Hostname for the FTP server. | ftp.example.com |
+| | | *Required*. | |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| FTP server port | Integer | Port number for the FTP server. Defaults to 21. | 2100 |
+| | | *Optional*. | |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| Username | String | The username to provide when connecting to the FTP server. | sqoop |
+| | | *Required*. | |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| Password | String | The password to provide when connecting to the FTP server. | sqoop |
+| | | *Required*. | |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+
+**Notes**
+=========
+
+1. The FTP connector will attempt to connect to the FTP server as part of the link validation process. If a connection cannot be established, you'll see a corresponding warning message.
+
+**TO Job Configuration**
+++++++++++++++++++++++++
+
+Inputs associated with the Job configuration for the TO direction include:
+
++-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+
+| Input | Type | Description | Example |
++=============================+=========+=========================================================================+===================================+
+| Output directory | String | The location on the FTP server that the connector will write files to. | uploads |
+| | | *Required*. | |
++-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+
+
+**Notes**
+=========
+
+1. The *output directory* value needs to be an existing directory on the FTP server.
+
+------
+Loader
+------
+
+During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector.
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 955276a..531009a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -319,6 +319,17 @@ limitations under the License.
</dependency>
<dependency>
<groupId>org.apache.sqoop.connector</groupId>
+ <artifactId>sqoop-connector-ftp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sqoop.connector</groupId>
+ <artifactId>sqoop-connector-ftp</artifactId>
+ <type>test-jar</type>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sqoop.connector</groupId>
<artifactId>sqoop-connector-kite</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 416b5c0..aabefc0 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -99,6 +99,11 @@ limitations under the License.
</dependency>
<dependency>
+ <groupId>org.apache.sqoop.connector</groupId>
+ <artifactId>sqoop-connector-ftp</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>