You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/05/15 20:19:09 UTC

sqoop git commit: SQOOP-2126: Sqoop2: Implement FTP TO Connector Support

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 9ee44d4f0 -> bf4ae0b3c


SQOOP-2126: Sqoop2: Implement FTP TO Connector Support

(Jonathan Seidman via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bf4ae0b3
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bf4ae0b3
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bf4ae0b3

Branch: refs/heads/sqoop2
Commit: bf4ae0b3c77f5ffc037193cb12695a450b30d822
Parents: 9ee44d4
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Fri May 15 11:18:35 2015 -0700
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Fri May 15 11:18:35 2015 -0700

----------------------------------------------------------------------
 connector/connector-ftp/pom.xml                 |  92 +++++++++
 .../sqoop/connector/ftp/FtpConnector.java       | 137 ++++++++++++
 .../sqoop/connector/ftp/FtpConnectorError.java  |  52 +++++
 .../connector/ftp/FtpConnectorUpgrader.java     |  46 +++++
 .../sqoop/connector/ftp/FtpConstants.java       |  37 ++++
 .../apache/sqoop/connector/ftp/FtpLoader.java   |  74 +++++++
 .../sqoop/connector/ftp/FtpToDestroyer.java     |  41 ++++
 .../sqoop/connector/ftp/FtpToInitializer.java   |  44 ++++
 .../connector/ftp/configuration/LinkConfig.java |  76 +++++++
 .../ftp/configuration/LinkConfiguration.java    |  34 +++
 .../ftp/configuration/ToJobConfig.java          |  36 ++++
 .../ftp/configuration/ToJobConfiguration.java   |  34 +++
 .../ftp/ftpclient/FtpConnectorClient.java       | 207 +++++++++++++++++++
 .../resources/ftp-connector-config.properties   |  51 +++++
 .../src/main/resources/log4j.properties         |  24 +++
 .../main/resources/sqoopconnector.properties    |  18 ++
 .../sqoop/connector/ftp/TestFtpLoader.java      | 125 +++++++++++
 .../configuration/TestLinkConfiguration.java    | 142 +++++++++++++
 .../configuration/TestToJobConfiguration.java   |  58 ++++++
 .../ftp/ftpclient/TestFtpConnectorClient.java   | 148 +++++++++++++
 .../src/test/resources/log4j.properties         |  24 +++
 connector/pom.xml                               |   1 +
 docs/src/site/sphinx/Connectors.rst             |  63 ++++++
 pom.xml                                         |  11 +
 server/pom.xml                                  |   5 +
 25 files changed, 1580 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/pom.xml b/connector/connector-ftp/pom.xml
new file mode 100644
index 0000000..9a70dfc
--- /dev/null
+++ b/connector/connector-ftp/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>connector</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.connector</groupId>
+  <artifactId>sqoop-connector-ftp</artifactId>
+  <name>Sqoop FTP Connector</name>
+
+  <properties>
+    <slf4j.version>1.6.1</slf4j.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-net</groupId>
+      <artifactId>commons-net</artifactId>
+      <version>3.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockftpserver</groupId>
+      <artifactId>MockFtpServer</artifactId>
+      <version>2.4</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
+        <groupId>org.slf4j</groupId>
+        <artifactId>slf4j-log4j12</artifactId>
+        <version>${slf4j.version}</version>
+      </dependency>
+  </dependencies>
+
+  <build>
+    <finalName>sqoop</finalName>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java
new file mode 100644
index 0000000..ffef1bf
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnector.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+/**
+ * Implementation of a Sqoop 2 connector to support data movement to/from an
+ * FTP server.
+ */
+public class FtpConnector extends SqoopConnector {
+
+  /**
+   * Define the TO instance.
+   */
+  private static final To TO = new To(FtpToInitializer.class,
+                                      FtpLoader.class,
+                                      FtpToDestroyer.class);
+
+  /**
+   * {@inheritDoc}
+   *
+   * Since this is a built-in connector it will return the same version as
+   * the rest of the Sqoop code.
+   */
+  @Override
+  public String getVersion() {
+     return VersionInfo.getBuildVersion();
+  }
+
+  /**
+   * Return the configuration resource bundle for this connector.
+   *
+   * @param locale The Locale object.
+   *
+   * @return The resource bundle associated with the input locale.
+   */
+  @Override
+  public ResourceBundle getBundle(Locale locale) {
+    return ResourceBundle.getBundle(FtpConstants.RESOURCE_BUNDLE_NAME, locale);
+  }
+
+  /**
+   * Get the class encapsulating link configuration for this connector.
+   *
+   * @return The link configuration class for this connector.
+   */
+  @Override
+  public Class getLinkConfigurationClass() {
+    return LinkConfiguration.class;
+  }
+
+  /**
+   * Get the appropriate job configuration class for the input direction.
+   *
+   * @param direction Whether to return TO or FROM configuration class.
+   *
+   * @return Job configuration class for given direction.
+   */
+  @Override
+  public Class getJobConfigurationClass(Direction direction) {
+    switch (direction) {
+      case TO:
+        return ToJobConfiguration.class;
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Get the object which defines classes for performing import jobs.
+   *
+   * @return the From object defining classes for performing import.
+   */
+  @Override
+  public From getFrom() {
+    return null;
+  }
+
+  /**
+   * Get the object which defines classes for performing export jobs.
+   *
+   * @return the To object defining classes for performing export.
+   */
+  @Override
+  public To getTo() {
+    return TO;
+  }
+
+  /**
+   * Returns an {@linkplain org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader}
+   * object that can upgrade the connection and job configs.
+   *
+   * @return configurable upgrader object
+   */
+  @Override
+  public ConnectorConfigurableUpgrader getConfigurableUpgrader() {
+    return new FtpConnectorUpgrader();
+  }
+
+  /**
+   * Return a List of directions supported by this connector.
+   *
+   * @return list of enums representing supported directions.
+   */
+  public List<Direction> getSupportedDirections() {
+    return Arrays.asList(Direction.TO);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java
new file mode 100644
index 0000000..77c36ea
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorError.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ * Error messages for FTP connector.
+ */
+public enum FtpConnectorError implements ErrorCode {
+  FTP_CONNECTOR_0000("Unknown error occurred."),
+  FTP_CONNECTOR_0001("Error occurred connecting to FTP server."),
+  FTP_CONNECTOR_0002("Error occurred disconnecting from FTP server."),
+  FTP_CONNECTOR_0003("Error occurred transferring data to FTP server."),
+  FTP_CONNECTOR_0004("Unknown job type")
+  ;
+
+  private final String message;
+
+  private FtpConnectorError(String message) {
+    this.message = message;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public String getCode() {
+    return name();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java
new file mode 100644
index 0000000..159ba4e
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConnectorUpgrader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
+import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MToConfig;
+
+//NOTE: All config types have the similar upgrade path at this point
+public class FtpConnectorUpgrader extends ConnectorConfigurableUpgrader {
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upgradeLinkConfig(MLinkConfig original, MLinkConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(),
+                                      upgradeTarget.getConfigs());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upgradeToJobConfig(MToConfig original, MToConfig upgradeTarget) {
+    ConfigurableUpgradeUtil.doUpgrade(original.getConfigs(),
+                                      upgradeTarget.getConfigs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java
new file mode 100644
index 0000000..8e8b4ad
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpConstants.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.job.Constants;
+
+/**
+ * Constants for FTP connector.
+ */
+public final class FtpConstants extends Constants {
+
+  /**
+   * Name of resource bundle for configuring this connector.
+   */
+  public static final String RESOURCE_BUNDLE_NAME = "ftp-connector-config";
+
+  /**
+   * Default port for FTP.
+   */
+  public static final int DEFAULT_PORT = 21;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java
new file mode 100644
index 0000000..0ed1eee
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpLoader.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+import java.util.UUID;
+
+/**
+ * Class to receive data from a From instance and load to a To instance.
+ */
+public class FtpLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
+
+  /**
+   * Number of records written by last call to load() method.
+   */
+  private long rowsWritten = 0;
+
+  /**
+   * Load data to target directory on FTP server.
+   *
+   * @param context Loader context object
+   * @param linkConfiguration Link configuration
+   * @param toJobConfig Job configuration
+   * @throws Exception Re-thrown from FTP client code.
+   */
+  @Override
+  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+                   ToJobConfiguration toJobConfig) throws Exception {
+    DataReader reader = context.getDataReader();
+    String outputDir = toJobConfig.toJobConfig.outputDirectory;
+    // Create a unique filename for writing records, since this method will
+    // likely get called multiple times for a single source file/dataset:
+    String path = outputDir + "/" + UUID.randomUUID() + ".txt";
+
+    FtpConnectorClient client =
+      new FtpConnectorClient(linkConfiguration.linkConfig.server,
+                             linkConfiguration.linkConfig.port);
+    client.connect(linkConfiguration.linkConfig.username,
+                   linkConfiguration.linkConfig.password);
+    rowsWritten = client.uploadStream(reader, path);
+    client.disconnect();
+  }
+
+  /**
+   * Return the number of rows witten by the last call to load() method.
+   *
+   * @return Number of rows written by call to loader.
+   */
+  @Override
+  public long getRowsWritten() {
+    return rowsWritten;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java
new file mode 100644
index 0000000..50208ce
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToDestroyer.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * Perform any clean up, etc. tasks when the Sqoop execution completes.
+ */
+public class FtpToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
+  /**
+   * Callback to clean up after job execution.
+   *
+   * @param context Destroyer context
+   * @param linkConfig link configuration object
+   * @param jobConfig TO job configuration object
+   */
+  @Override
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
+      ToJobConfiguration jobConfig) {
+    // do nothing at this point
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java
new file mode 100644
index 0000000..96a100d
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/FtpToInitializer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+
+/**
+ * Perform any required initialization before execution of job.
+ */
+public class FtpToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
+
+  /**
+   * Initialize new submission based on given configuration properties. Any
+   * needed temporary values might be saved to context object and they will be
+   * promoted to all other part of the workflow automatically.
+   *
+   * @param context Initializer context object
+   * @param linkConfig link configuration object
+   * @param jobConfig TO job configuration object
+   */
+  @Override
+  public void initialize(InitializerContext context, LinkConfiguration linkConfig,
+      ToJobConfiguration jobConfig) {
+    // do nothing at this point
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java
new file mode 100644
index 0000000..ed9c2cc
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfig.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient;
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Attributes for FTP connector link configuration.
+ */
+@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
+public class LinkConfig {
+
+  /**
+   * FTP server hostname.
+   */
+  @Input(size = 256, validators = {@Validator(NotEmpty.class)})
+  public String server;
+
+  /**
+   * FTP server port. Default is port 21.
+   */
+  @Input
+  public Integer port;
+
+  /**
+   * Username for server login.
+   */
+  @Input(size = 256, validators = {@Validator(NotEmpty.class)})
+  public String username;
+
+  /**
+   * Password for server login.
+   */
+  @Input(size = 256, sensitive = true)
+  public String password;
+
+  /**
+   * Validate that we can log into the server using the supplied credentials.
+   */
+  public static class ConfigValidator extends AbstractValidator<LinkConfig> {
+    @Override
+    public void validate(LinkConfig linkConfig) {
+      try {
+        FtpConnectorClient client =
+          new FtpConnectorClient(linkConfig.server, linkConfig.port);
+        client.connect(linkConfig.username, linkConfig.password);
+        client.disconnect();
+      } catch (SqoopException e) {
+        addMessage(Status.WARNING, "Can't connect to the FTP server " +
+                   linkConfig.server + " error is " + e.getMessage());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..60f1836
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/LinkConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+/**
+ * Class to encapsulate link attributes for FTP connector.
+ */
+@ConfigurationClass
+public class LinkConfiguration {
+  @Config
+  public LinkConfig linkConfig;
+
+  public LinkConfiguration() {
+    linkConfig = new LinkConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java
new file mode 100644
index 0000000..dc46946
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfig.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.NotEmpty;
+
+/**
+ * Attributes for FTP connector TO configuration.
+ */
+@ConfigClass
+public class ToJobConfig {
+
+  /**
+   * Directory on FTP server to write data to.
+   */
+  @Input(size = 260, validators = {@Validator(NotEmpty.class)})
+  public String outputDirectory;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..636afbb
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/configuration/ToJobConfiguration.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+
+/**
+ * Class to encapsulate TO configuration.
+ */
+@ConfigurationClass
+public class ToJobConfiguration {
+    @Config
+    public ToJobConfig toJobConfig;
+
+    public ToJobConfiguration() {
+      toJobConfig = new ToJobConfig();
+    }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java
new file mode 100644
index 0000000..7a65173
--- /dev/null
+++ b/connector/connector-ftp/src/main/java/org/apache/sqoop/connector/ftp/ftpclient/FtpConnectorClient.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.ftp.ftpclient;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPReply;
+import org.apache.log4j.Logger;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ftp.FtpConstants;
+import org.apache.sqoop.connector.ftp.FtpConnectorError;
+import org.apache.sqoop.etl.io.DataReader;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+
+/**
+ * Class encapsulating functionality to interact with an FTP server. This class
+ * uses the Apache Commons Net libraries to provide the FTP functionality. See
+ * http://commons.apache.org/proper/commons-net/.
+ */
+public class FtpConnectorClient {
+
+  /**
+   * Apache Commons Net FTP client.
+   */
+  private FTPClient ftpClient = null;
+
+  /**
+   * Hostname for FTP server.
+   */
+  private String ftpServer = null;
+
+  /**
+   * Port for FTP server.
+   */
+  private int ftpPort = FtpConstants.DEFAULT_PORT;
+
+  private static final Logger LOG = Logger.getLogger(FtpConnectorClient.class);
+
+  /**
+   * Constructor to initialize client.
+   *
+   * @param server Hostname of FTP server.
+   * @param port Port number of FTP server. Pass in null to use default port
+   * of 21.
+   */
+  public FtpConnectorClient(String server, Integer port) {
+    ftpClient = new FTPClient();
+    ftpServer = server;
+    if (port != null) {
+      ftpPort = port.intValue();
+    }
+  }
+
+  /**
+   * Connect to the FTP server.
+   *
+   * @param username Username for server login.
+   * @param pass Password for server login.
+   *
+   * @throws SqoopException Thrown if an error occurs while connecting to server.
+   */
+  public void connect(String username, String pass)
+    throws SqoopException {
+
+    try {
+      ftpClient.connect(ftpServer, ftpPort);
+      LOG.info(getServerReplyAsString());
+      int replyCode = ftpClient.getReplyCode();
+      if (!FTPReply.isPositiveCompletion(replyCode)) {
+        ftpClient.disconnect();
+        LOG.error("Operation failed. Server reply code: " + replyCode);
+        throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001,
+                                 "Server reply code=" + replyCode);
+      }
+
+      boolean success = ftpClient.login(username, pass);
+      LOG.info(getServerReplyAsString());
+      if (!success) {
+        ftpClient.disconnect();
+        LOG.error("Could not login to the server" + ftpServer);
+        throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001);
+      } else {
+        LOG.info("logged into " + ftpServer);
+      }
+    } catch (IOException e) {
+      LOG.error("Caught IOException connecting to FTP server: " +
+                e.getMessage());
+      throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0001,
+                               "Caught IOException: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Log out and disconnect from FTP server.
+   *
+   * @throws SqoopException Thrown if an error occurs while disconnecting from
+   * server.
+   */
+  public void disconnect()
+    throws SqoopException {
+    try {
+      ftpClient.logout();
+      ftpClient.disconnect();
+    } catch (IOException e) {
+      LOG.error("Caught IOException disconnecting from FTP server: " +
+                e.getMessage());
+      throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0002,
+                               "Caught IOException: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Stream records to a file on the FTP server.
+   *
+   * @param reader a DataReader object containing data passed from source
+   * connector.
+   * @param path directory on FTP server to write files to.
+   *
+   * @return Number of records written in call to this method.
+   *
+   * @throws SqoopException thrown if error occurs during interaction with
+   * FTP server.
+   * @throws Exception thrown if error occurs in DataReader.
+   */
+  public long uploadStream(DataReader reader, String path)
+    throws SqoopException, Exception {
+
+    OutputStream output = null;
+
+    long recordsWritten = 0;
+
+    try {
+      output = ftpClient.storeFileStream(path);
+      if (!FTPReply.isPositivePreliminary(ftpClient.getReplyCode())) {
+        LOG.error("File transfer failed, server reply=" +
+                  getServerReplyAsString());
+        throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+                                 getServerReplyAsString());
+      } else {
+        String record;
+        while ((record = reader.readTextRecord()) != null) {
+          LOG.info("Writing record to FTP server:" + record);
+          output.write(record.getBytes());
+          output.write(("\n").getBytes());
+          recordsWritten++;
+        }
+
+        output.close();
+        if (!ftpClient.completePendingCommand()) {
+          LOG.error("File transfer failed, server reply=" +
+                    getServerReplyAsString());
+          throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+                                   getServerReplyAsString());
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Caught IOException: " + e.getMessage());
+      throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+                               "Caught IOException: " + e.getMessage(), e);
+    } finally {
+      try {
+        if (output != null) {
+          output.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Caught IOException closing FTP output stream: " +
+                  e.getMessage());
+        // Throw this in case there was an underlying issue with closing the stream:
+        throw new SqoopException(FtpConnectorError.FTP_CONNECTOR_0003,
+                                 "Caught IOException closing output stream to FTP server: "
+                                 + e.getMessage(), e);
+      }
+    }
+
+    return recordsWritten;
+ }
+
+  /**
+   * Turn a collection of reply strings from the FTP server into a single string.
+   * @return String containing a concatenation of replies from the server.
+   */
+  private String getServerReplyAsString() {
+    String[] replies = ftpClient.getReplyStrings();
+    return StringUtils.join(replies);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/ftp-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/resources/ftp-connector-config.properties b/connector/connector-ftp/src/main/resources/ftp-connector-config.properties
new file mode 100644
index 0000000..d84157f
--- /dev/null
+++ b/connector/connector-ftp/src/main/resources/ftp-connector-config.properties
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# FTP Connector Resources
+
+############################
+
+# Link Config
+linkConfig.label = Link configuration
+linkConfig.help = Parameters required to connect to an FTP server.
+
+# FTP server hostname
+linkConfig.server.label = FTP server hostname
+linkConfig.server.help = Hostname for the FTP server.
+
+# FTP server port
+linkConfig.port.label = FTP server port (21)
+linkConfig.port.help = Port for the FTP server. 21 by default.
+
+# username string
+linkConfig.username.label = Username
+linkConfig.username.help = Enter the username to be used for connecting to the \
+                 FTP server.
+
+# password string
+linkConfig.password.label = Password
+linkConfig.password.help = Enter the password to be used for connecting to the \
+                   FTP server.
+
+# To Job Config
+#
+toJobConfig.label = To FTP configuration
+toJobConfig.help = Parameters required to store data on the FTP server.
+
+toJobConfig.outputDirectory.label = Output directory
+toJobConfig.outputDirectory.help = Directory on the FTP server to write data to.
+
+toJobConfig.ignored.label = Ignored
+toJobConfig.ignored.help = This value is ignored.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/resources/log4j.properties b/connector/connector-ftp/src/main/resources/log4j.properties
new file mode 100644
index 0000000..c62f102
--- /dev/null
+++ b/connector/connector-ftp/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to INFO and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/main/resources/sqoopconnector.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/main/resources/sqoopconnector.properties b/connector/connector-ftp/src/main/resources/sqoopconnector.properties
new file mode 100644
index 0000000..0864f9f
--- /dev/null
+++ b/connector/connector-ftp/src/main/resources/sqoopconnector.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# FTP Connector Properties
+org.apache.sqoop.connector.class = org.apache.sqoop.connector.ftp.FtpConnector
+org.apache.sqoop.connector.name = ftp-connector

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
new file mode 100644
index 0000000..33c808a
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp;
+
+import org.apache.sqoop.connector.ftp.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.LoaderContext;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.mockftpserver.fake.filesystem.DirectoryEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.Loader} class.
+ *
+ * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide
+ * a mock FTP server implementation.
+ */
+public class TestFtpLoader {
+
+  private FakeFtpServer fakeFtpServer;
+  private int port;
+  private String username = "user";
+  private String password = "pass";
+  private FtpLoader loader;
+
+  @Test
+  public void testLoader() {
+
+    final int NUMBER_OF_ROWS = 1000;
+
+    DataReader reader = new DataReader() {
+      private long index = 0L;
+      @Override
+      public Object[] readArrayRecord() {
+        return null;
+     }
+
+      @Override
+      public String readTextRecord() {
+        if (index++ < NUMBER_OF_ROWS) {
+          return index + "," + (double)index + ",'" + index + "'";
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public Object readContent() {
+        return null;
+      }
+    };
+
+    try {
+      LoaderContext context = new LoaderContext(null, reader, null);
+      LinkConfiguration linkConfig = new LinkConfiguration();
+      linkConfig.linkConfig.username = username;
+      linkConfig.linkConfig.password = password;
+      linkConfig.linkConfig.server = "localhost";
+      linkConfig.linkConfig.port = port;
+      ToJobConfiguration jobConfig = new ToJobConfiguration();
+      jobConfig.toJobConfig.outputDirectory = "uploads";
+      loader.load(context, linkConfig, jobConfig);
+      Long rowsWritten = loader.getRowsWritten();
+      Assert.assertTrue(rowsWritten == NUMBER_OF_ROWS,
+                        ("actual rows written=" + rowsWritten + " instead of " +
+                         NUMBER_OF_ROWS));
+    } catch(Exception e) {
+      Assert.fail("caught exception: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Create mock FTP server for testing, and add a user account for testing.
+   */
+  @BeforeClass(alwaysRun = true)
+  public void setUp() throws Exception {
+
+    loader = new FtpLoader();
+
+    fakeFtpServer = new FakeFtpServer();
+    fakeFtpServer.setServerControlPort(0);
+
+    FileSystem fileSystem = new UnixFakeFileSystem();
+    fileSystem.add(new DirectoryEntry("/uploads"));
+    fakeFtpServer.setFileSystem(fileSystem);
+
+    UserAccount userAccount = new UserAccount(username, password, "/");
+    fakeFtpServer.addUserAccount(userAccount);
+
+    fakeFtpServer.start();
+    port = fakeFtpServer.getServerControlPort();
+  }
+
+  /**
+   * Stop mock FTP server.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    fakeFtpServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java
new file mode 100644
index 0000000..b926a1d
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestLinkConfiguration.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.validation.ConfigValidationResult;
+import org.apache.sqoop.validation.ConfigValidationRunner;
+import org.apache.sqoop.validation.Status;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.mockftpserver.fake.filesystem.FileEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.configuration.LinkConfiguration}.
+ *
+ * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide
+ * a mock FTP server implementation.
+ */
+public class TestLinkConfiguration {
+
+  private FakeFtpServer fakeFtpServer;
+  private int port;
+  private String username = "user";
+  private String password = "pass";
+
+  /**
+   * Test valid configuration.
+   */
+  @Test
+  public void testValidConfig() {
+    ConfigValidationRunner runner = new ConfigValidationRunner();
+    ConfigValidationResult result;
+    LinkConfiguration config = new LinkConfiguration();
+    config.linkConfig.username = username;
+    config.linkConfig.password = password;
+    config.linkConfig.server = "localhost";
+    config.linkConfig.port = port;
+    result = runner.validate(config);
+    Assert.assertTrue(result.getStatus() == Status.OK,
+                      "Test of valid configuration failed");
+  }
+
+  /**
+   * Test empty username.
+   */
+  @Test
+  public void testEmptyUsername() {
+    ConfigValidationRunner runner = new ConfigValidationRunner();
+    ConfigValidationResult result;
+    LinkConfiguration config = new LinkConfiguration();
+    config.linkConfig.username = "";
+    config.linkConfig.password = password;
+    config.linkConfig.server = "localhost";
+    config.linkConfig.port = port;
+    result = runner.validate(config);
+    Assert.assertFalse(result.getStatus() == Status.OK,
+                       "Test of empty username failed");
+  }
+
+  /**
+   * Test invalid username.
+   */
+  @Test
+  public void testInvalidUsername() {
+    ConfigValidationRunner runner = new ConfigValidationRunner();
+    ConfigValidationResult result;
+    LinkConfiguration config = new LinkConfiguration();
+    config.linkConfig.username = "baduser";
+    config.linkConfig.password = password;
+    config.linkConfig.server = "localhost";
+    config.linkConfig.port = port;
+    result = runner.validate(config);
+    Assert.assertFalse(result.getStatus() == Status.OK,
+                       "Test of invalid username failed");
+  }
+
+  /**
+   * Test empty server.
+   */
+  @Test
+  public void TestEmptyServer() {
+    ConfigValidationRunner runner = new ConfigValidationRunner();
+    ConfigValidationResult result;
+    LinkConfiguration config = new LinkConfiguration();
+    config.linkConfig.username = username;
+    config.linkConfig.password = password;
+    config.linkConfig.server = "";
+    config.linkConfig.port = port;
+    result = runner.validate(config);
+    Assert.assertFalse(result.getStatus() == Status.OK,
+                       "Test of empty server name failed");
+  }
+
+  /**
+   * Create mock FTP server for testing, and add a user account for testing.
+   */
+  @BeforeClass(alwaysRun = true)
+  public void setUp() throws Exception {
+    fakeFtpServer = new FakeFtpServer();
+    fakeFtpServer.setServerControlPort(0);
+
+    FileSystem fileSystem = new UnixFakeFileSystem();
+    fileSystem.add(new FileEntry("/home/user/file.txt", "abcdef"));
+    fakeFtpServer.setFileSystem(fileSystem);
+
+    UserAccount userAccount = new UserAccount(username, password, "/");
+    fakeFtpServer.addUserAccount(userAccount);
+
+    fakeFtpServer.start();
+    port = fakeFtpServer.getServerControlPort();
+  }
+
+  /**
+   * Stop mock FTP server.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    fakeFtpServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java
new file mode 100644
index 0000000..2870a16
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/configuration/TestToJobConfiguration.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.configuration;
+
+import org.apache.sqoop.validation.ConfigValidationResult;
+import org.apache.sqoop.validation.ConfigValidationRunner;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.configuration.ToJobConfiguration} class.
+ */
+public class TestToJobConfiguration {
+
+  /**
+   * Test a non-empty directory name.
+   */
+  @Test
+  public void testValidDirectory() {
+    ConfigValidationRunner runner = new ConfigValidationRunner();
+    ConfigValidationResult result;
+    ToJobConfiguration config = new ToJobConfiguration();
+    config.toJobConfig.outputDirectory = "testdir";
+    result = runner.validate(config);
+    Assert.assertTrue(result.getStatus().canProceed(),
+                      "Test of valid directory failed");
+  }
+
+  /**
+   * Test an invalid, empty directory name.
+   */
+  @Test
+  public void testEmptyDirectory() {
+    ConfigValidationRunner runner = new ConfigValidationRunner();
+    ConfigValidationResult result;
+    ToJobConfiguration config = new ToJobConfiguration();
+    config.toJobConfig.outputDirectory = "";
+    result = runner.validate(config);
+    Assert.assertFalse(result.getStatus().canProceed(),
+                       "Test of empty directory failed");
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java
new file mode 100644
index 0000000..b9430b3
--- /dev/null
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/ftpclient/TestFtpConnectorClient.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.ftp.ftpclient;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.etl.io.DataReader;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.mockftpserver.fake.filesystem.DirectoryEntry;
+import org.mockftpserver.fake.filesystem.FileSystem;
+import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;
+import org.mockftpserver.fake.FakeFtpServer;
+import org.mockftpserver.fake.UserAccount;
+
+/**
+ * Unit tests for {@link org.apache.sqoop.connector.ftp.ftpclient.FtpConnectorClient} class.
+ *
+ * This uses MockFtpServer (http://mockftpserver.sourceforge.net/) to provide
+ * a mock FTP server implementation.
+ *
+ * Note that this duplicates other tests currently, but leaving for now in case
+ * additional functionality is added to FtpConnectorClient.
+ */
+public class TestFtpConnectorClient {
+
+  private FakeFtpServer fakeFtpServer;
+  private int port;
+  private String username = "user";
+  private String password = "pass";
+
+  /**
+   * Test connect and login to FTP server.
+   */
+  @Test
+  public void testValidLogin() {
+    try {
+      FtpConnectorClient client = new FtpConnectorClient("localhost", port);
+      client.connect(username, password);
+      client.disconnect();
+    } catch (SqoopException e) {
+      Assert.fail("login failed " + e.getMessage());
+    }
+  }
+
+  /**
+   * Test invalid login to FTP server.
+   */
+  @Test
+  public void testInvalidLogin() {
+    try {
+      FtpConnectorClient client = new FtpConnectorClient("localhost", port);
+      client.connect("baduser", "badpass");
+      client.disconnect();
+      Assert.fail("expected exception for invalid login");
+    } catch (SqoopException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Test streaming upload.
+   */
+  @Test
+  public void testUploadStream() {
+
+    final int NUMBER_OF_ROWS = 1000;
+
+    DataReader reader = new DataReader() {
+      private long index = 0L;
+      @Override
+      public Object[] readArrayRecord() {
+        return null;
+     }
+
+      @Override
+      public String readTextRecord() {
+        if (index++ < NUMBER_OF_ROWS) {
+          return index + "," + (double)index + ",'" + index + "'";
+        } else {
+          return null;
+        }
+      }
+
+      @Override
+      public Object readContent() {
+        return null;
+      }
+    };
+
+    try {
+      FtpConnectorClient client = new FtpConnectorClient("localhost", port);
+      client.connect(username, password);
+      long rowsWritten = client.uploadStream(reader, "/uploads/test.txt");
+      client.disconnect();
+      Assert.assertTrue(rowsWritten == NUMBER_OF_ROWS,
+                        ("actual rows written=" + rowsWritten + " instead of " +
+                         NUMBER_OF_ROWS));
+    } catch(Exception e) {
+      Assert.fail("caught exception: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Create mock FTP server for testing, and add a user account for testing.
+   */
+  @BeforeClass(alwaysRun = true)
+  public void setUp() throws Exception {
+    fakeFtpServer = new FakeFtpServer();
+    fakeFtpServer.setServerControlPort(0);
+
+    FileSystem fileSystem = new UnixFakeFileSystem();
+    fileSystem.add(new DirectoryEntry("/uploads"));
+    fakeFtpServer.setFileSystem(fileSystem);
+
+    UserAccount userAccount = new UserAccount(username, password, "/");
+    fakeFtpServer.addUserAccount(userAccount);
+
+    fakeFtpServer.start();
+    port = fakeFtpServer.getServerControlPort();
+  }
+
+  /**
+   * Stop mock FTP server.
+   */
+  @AfterClass(alwaysRun = true)
+  public void tearDown() throws Exception {
+    fakeFtpServer.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/connector-ftp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/resources/log4j.properties b/connector/connector-ftp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..44ffced
--- /dev/null
+++ b/connector/connector-ftp/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index c999061..1b69180 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -38,6 +38,7 @@ limitations under the License.
     <module>connector-hdfs</module>
     <module>connector-kite</module>
     <module>connector-kafka</module>
+    <module>connector-ftp</module>
     <module>connector-sftp</module>
     <!-- Uncomment and finish connectors after sqoop framework will become stable
     <module>connector-mysql-jdbc</module>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/docs/src/site/sphinx/Connectors.rst
----------------------------------------------------------------------
diff --git a/docs/src/site/sphinx/Connectors.rst b/docs/src/site/sphinx/Connectors.rst
index 4e24793..721e92a 100644
--- a/docs/src/site/sphinx/Connectors.rst
+++ b/docs/src/site/sphinx/Connectors.rst
@@ -459,3 +459,66 @@ Loader
 ------
 
 During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector.
+
+++++++++++++++
+FTP Connector
+++++++++++++++
+
+The FTP connector supports moving data between an FTP server and other supported Sqoop2 connectors.
+
+Currently only the TO direction is supported to write records to an FTP server. A FROM connector is pending (SQOOP-2127).
+
+-----
+Usage
+-----
+
+To use the FTP Connector, create a link for the connector and a job that uses the link.
+
+**Link Configuration**
+++++++++++++++++++++++
+
+Inputs associated with the link configuration include:
+
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| Input                       | Type    | Description                                                           | Example                    |
++=============================+=========+=======================================================================+============================+
+| FTP server hostname         | String  | Hostname for the FTP server.                                          | ftp.example.com            |
+|                             |         | *Required*.                                                           |                            |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| FTP server port             | Integer | Port number for the FTP server. Defaults to 21.                       | 2100                       |
+|                             |         | *Optional*.                                                           |                            |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| Username                    | String  | The username to provide when connecting to the FTP server.            | sqoop                      |
+|                             |         | *Required*.                                                           |                            |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+| Password                    | String  | The password to provide when connecting to the FTP server.            | sqoop                      |
+|                             |         | *Required*                                                            |                            |
++-----------------------------+---------+-----------------------------------------------------------------------+----------------------------+
+
+**Notes**
+=========
+
+1. The FTP connector will attempt to connect to the FTP server as part of the link validation process. If for some reason a connection can not be established, you'll see a corresponding warning message.
+
+**TO Job Configuration**
+++++++++++++++++++++++++
+
+Inputs associated with the Job configuration for the TO direction include:
+
++-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+
+| Input                       | Type    | Description                                                             | Example                           |
++=============================+=========+=========================================================================+===================================+
+| Output directory            | String  | The location on the FTP server that the connector will write files to.  | uploads                           |
+|                             |         | *Required*                                                              |                                   |
++-----------------------------+---------+-------------------------------------------------------------------------+-----------------------------------+
+
+**Notes**
+=========
+
+1. The *output directory* value needs to be an existing directory on the FTP server.
+
+------
+Loader
+------
+
+During the *loading* phase, the connector will create uniquely named files in the *output directory* for each partition of data received from the **FROM** connector.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 955276a..531009a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -319,6 +319,17 @@ limitations under the License.
       </dependency>
       <dependency>
         <groupId>org.apache.sqoop.connector</groupId>
+        <artifactId>sqoop-connector-ftp</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.sqoop.connector</groupId>
+        <artifactId>sqoop-connector-ftp</artifactId>
+        <type>test-jar</type>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.sqoop.connector</groupId>
         <artifactId>sqoop-connector-kite</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bf4ae0b3/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 416b5c0..aabefc0 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -99,6 +99,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop.connector</groupId>
+      <artifactId>sqoop-connector-ftp</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>