Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/20 22:28:27 UTC

[3/3] git commit: SQOOP-1375: Sqoop2: From/To: Create HDFS connector

SQOOP-1375: Sqoop2: From/To: Create HDFS connector


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/5c29a2a2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/5c29a2a2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/5c29a2a2

Branch: refs/heads/SQOOP-1367
Commit: 5c29a2a291b54d1c15f7e2482893bc074a011a76
Parents: 7127948
Author: Gwen Shapira <cs...@gmail.com>
Authored: Wed Aug 20 13:23:41 2014 -0700
Committer: Abraham Elmahrek <ab...@elmahrek.com>
Committed: Wed Aug 20 13:23:41 2014 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/client/SqoopClient.java    |   2 +-
 common/pom.xml                                  |  66 +++
 .../org/apache/sqoop/common/PrefixContext.java  |  70 +++
 connector/connector-hdfs/pom.xml                |  83 +++
 .../sqoop/connector/hdfs/HdfsConnector.java     | 132 +++++
 .../connector/hdfs/HdfsConnectorError.java      |  52 ++
 .../sqoop/connector/hdfs/HdfsConstants.java     |  31 ++
 .../sqoop/connector/hdfs/HdfsDestroyer.java     |  36 ++
 .../sqoop/connector/hdfs/HdfsExtractor.java     | 199 +++++++
 .../sqoop/connector/hdfs/HdfsInitializer.java   |  45 ++
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 140 +++++
 .../sqoop/connector/hdfs/HdfsPartition.java     | 161 ++++++
 .../sqoop/connector/hdfs/HdfsPartitioner.java   | 555 +++++++++++++++++++
 .../sqoop/connector/hdfs/HdfsValidator.java     |  89 +++
 .../configuration/ConnectionConfiguration.java  |  31 ++
 .../hdfs/configuration/ConnectionForm.java      |  29 +
 .../configuration/FromJobConfiguration.java     |  32 ++
 .../connector/hdfs/configuration/InputForm.java |  30 +
 .../hdfs/configuration/OutputCompression.java   |  33 ++
 .../hdfs/configuration/OutputForm.java          |  36 ++
 .../hdfs/configuration/OutputFormat.java        |  33 ++
 .../hdfs/configuration/StorageType.java         |  28 +
 .../hdfs/configuration/ToJobConfiguration.java  |  31 ++
 .../hdfs/hdfsWriter/GenericHdfsWriter.java      |  34 ++
 .../hdfs/hdfsWriter/HdfsSequenceWriter.java     |  57 ++
 .../hdfs/hdfsWriter/HdfsTextWriter.java         |  61 ++
 .../hdfs-connector-resources.properties         |  58 ++
 .../main/resources/sqoopconnector.properties    |  18 +
 .../idf/CSVIntermediateDataFormat.java          |   7 +-
 .../idf/IntermediateDataFormatError.java        |   4 +-
 connector/pom.xml                               |   9 +-
 .../sqoop/framework/FrameworkValidator.java     |  63 +--
 .../org/apache/sqoop/framework/JobManager.java  |  29 +-
 .../sqoop/framework/SubmissionRequest.java      |  14 +-
 .../configuration/ExportJobConfiguration.java   |  37 --
 .../configuration/ImportJobConfiguration.java   |  37 --
 .../framework/configuration/InputForm.java      |  30 -
 .../configuration/JobConfiguration.java         |   4 +-
 .../configuration/OutputCompression.java        |  33 --
 .../framework/configuration/OutputForm.java     |  38 --
 .../framework/configuration/OutputFormat.java   |  33 --
 .../framework/configuration/StorageType.java    |  28 -
 .../sqoop/framework/TestFrameworkValidator.java |  10 +-
 .../sqoop/repository/TestJdbcRepository.java    |   2 +-
 .../mapreduce/MapreduceExecutionEngine.java     |  42 +-
 .../org/apache/sqoop/job/PrefixContext.java     |  70 ---
 .../sqoop/job/etl/HdfsExportExtractor.java      | 194 -------
 .../sqoop/job/etl/HdfsExportPartition.java      | 160 ------
 .../sqoop/job/etl/HdfsExportPartitioner.java    | 552 ------------------
 .../sqoop/job/etl/HdfsSequenceImportLoader.java |  94 ----
 .../sqoop/job/etl/HdfsTextImportLoader.java     | 101 ----
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |   2 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |   2 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  11 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  10 +-
 .../mapreduce/MapreduceExecutionEngineTest.java |  13 +-
 .../org/apache/sqoop/job/TestHdfsExtract.java   |  31 +-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java |  33 +-
 pom.xml                                         |  11 +
 .../derby/DerbyRepositoryHandler.java           |  50 +-
 server/pom.xml                                  |   5 +
 .../apache/sqoop/handler/JobRequestHandler.java |   1 +
 test/pom.xml                                    |   5 +
 .../sqoop/test/testcases/ConnectorTestCase.java |   4 +-
 .../connector/jdbc/generic/TableImportTest.java |   8 -
 .../jdbc/generic/imports/PartitionerTest.java   |   8 -
 .../SubmissionWithDisabledModelObjectsTest.java |  11 -
 67 files changed, 2309 insertions(+), 1659 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
index 1d93ae3..2b3171c 100644
--- a/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
+++ b/client/src/main/java/org/apache/sqoop/client/SqoopClient.java
@@ -369,7 +369,7 @@ public class SqoopClient {
       fromConnection.getPersistenceId(),
       toConnection.getPersistenceId(),
       getConnector(fromConnection.getConnectorId()).getJobForms(Direction.FROM),
-      getConnector(fromConnection.getConnectorId()).getJobForms(Direction.TO),
+      getConnector(toConnection.getConnectorId()).getJobForms(Direction.TO),
       getFramework().getJobForms()
     );
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 9bfa07d..151a649 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -106,5 +106,71 @@ limitations under the License.
       </plugin>
     </plugins>
   </build>
+  <!-- Profiles for various supported Hadoop distributions -->
+  <profiles>
+
+    <!-- Hadoop 1.x -->
+    <profile>
+      <id>hadoop100</id>
+
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>100</value>
+        </property>
+      </activation>
+
+      <dependencies>
+        <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
+    <!-- Hadoop 2.x (active by default) -->
+    <profile>
+      <id>hadoop200</id>
+
+      <activation>
+        <activeByDefault>true</activeByDefault>
+        <property>
+          <name>hadoop.profile</name>
+          <value>200</value>
+        </property>
+      </activation>
+
+      <properties>
+        <hadoop.profile>200</hadoop.profile>
+      </properties>
+
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <scope>provided</scope>
+        </dependency>
+
+      </dependencies>
+    </profile>
+  </profiles>
 
 </project>
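
With the activation blocks above, the hadoop200 profile is picked up by default, while a Hadoop 1.x build would presumably be selected by passing -Dhadoop.profile=100 to Maven; the hadoop100 profile then swaps hadoop-common and the mapreduce client artifacts for the monolithic hadoop-core dependency.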

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/common/src/main/java/org/apache/sqoop/common/PrefixContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/PrefixContext.java b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java
new file mode 100644
index 0000000..6434e6d
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/common/PrefixContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Implementation of an immutable context backed by a Hadoop configuration
+ * object. Each context property is stored under a common prefix and loaded
+ * directly from the configuration.
+ */
+public class PrefixContext implements ImmutableContext {
+
+  Configuration configuration;
+  String prefix;
+
+  public PrefixContext(Configuration configuration, String prefix) {
+    this.configuration = configuration;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public String getString(String key) {
+    return configuration.get(prefix + key);
+  }
+
+  @Override
+  public String getString(String key, String defaultValue) {
+    return configuration.get(prefix + key, defaultValue);
+  }
+
+  @Override
+  public long getLong(String key, long defaultValue) {
+    return configuration.getLong(prefix + key, defaultValue);
+  }
+
+  @Override
+  public int getInt(String key, int defaultValue) {
+    return  configuration.getInt(prefix + key, defaultValue);
+  }
+
+  @Override
+  public boolean getBoolean(String key, boolean defaultValue) {
+    return configuration.getBoolean(prefix + key, defaultValue);
+  }
+
+  /*
+   * TODO: Use getter methods for retrieval instead of
+   * exposing configuration directly.
+   */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+}
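
For orientation, here is a minimal sketch of how the prefixed lookup behaves; the property name and prefix below are hypothetical, the real prefix is supplied by the framework when it builds the context:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.common.PrefixContext;

public class PrefixContextSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical prefixed property name; the framework decides the real prefix.
    conf.set("org.apache.sqoop.connector.from.inputDirectory", "/data/in");

    PrefixContext context =
        new PrefixContext(conf, "org.apache.sqoop.connector.from.");
    // Callers ask for the short key; the prefix is prepended internally.
    String inputDir = context.getString("inputDirectory");   // "/data/in"
    System.out.println(inputDir);
  }
}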

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
new file mode 100644
index 0000000..8df9f11
--- /dev/null
+++ b/connector/connector-hdfs/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>connector</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.connector</groupId>
+  <artifactId>sqoop-connector-hdfs</artifactId>
+  <name>Sqoop HDFS Connector</name>
+
+  <!-- TODO: Hardcoding Hadoop200 for now -->
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-spi</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+  <build>
+    <finalName>sqoop</finalName>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
new file mode 100644
index 0000000..557091e
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.job.etl.From;
+import org.apache.sqoop.job.etl.To;
+import org.apache.sqoop.validation.Validator;
+
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+public class HdfsConnector extends SqoopConnector {
+
+  private static final From FROM = new From(
+          HdfsInitializer.class,
+          HdfsPartitioner.class,
+          HdfsExtractor.class,
+          HdfsDestroyer.class);
+
+  private static final To TO = new To(
+          HdfsInitializer.class,
+          HdfsLoader.class,
+          HdfsDestroyer.class);
+
+  private static final HdfsValidator hdfsValidator = new HdfsValidator();
+
+  /**
+   * Retrieve connector version.
+   *
+   * @return Version encoded as a string
+   */
+  @Override
+  public String getVersion() {
+    return VersionInfo.getVersion();
+  }
+
+  /**
+   * @param locale
+   * @return the resource bundle associated with the given locale.
+   */
+  @Override
+  public ResourceBundle getBundle(Locale locale) {
+    return ResourceBundle.getBundle(
+            HdfsConstants.RESOURCE_BUNDLE_NAME, locale);
+  }
+
+  /**
+   * @return Connection configuration class
+   */
+  @Override
+  public Class getConnectionConfigurationClass() {
+    return ConnectionConfiguration.class;
+  }
+
+  /**
+   * @param jobType
+   * @return Job configuration class for the given type, or null if not supported
+   */
+  @Override
+  public Class getJobConfigurationClass(Direction jobType) {
+    switch (jobType) {
+      case FROM:
+        return FromJobConfiguration.class;
+      case TO:
+        return ToJobConfiguration.class;
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * @return a <tt>From</tt> that provides classes for performing import.
+   */
+  @Override
+  public From getFrom() {
+    return FROM;
+  }
+
+  /**
+   * @return a <tt>To</tt> that provides classes for performing export.
+   */
+  @Override
+  public To getTo() {
+    return TO;
+  }
+
+  /**
+   * Returns a validation object that the Sqoop framework can use to validate
+   * user-supplied forms before accepting them. This object will be used for
+   * both connection and job forms.
+   *
+   * @return Validator object
+   */
+  @Override
+  public Validator getValidator() {
+    return hdfsValidator;
+  }
+
+  /**
+   * Returns a {@linkplain org.apache.sqoop.connector.spi.MetadataUpgrader} object that can upgrade the
+   * connection and job metadata.
+   *
+   * @return MetadataUpgrader object
+   */
+  @Override
+  public MetadataUpgrader getMetadataUpgrader() {
+    return null;
+  }
+}
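
As a quick illustration of the Direction-based API above, a hedged sketch of how a caller could resolve the per-direction configuration classes (nothing here is part of the patch itself):

import org.apache.sqoop.common.Direction;
import org.apache.sqoop.connector.hdfs.HdfsConnector;

public class HdfsConnectorSketch {
  public static void main(String[] args) {
    HdfsConnector connector = new HdfsConnector();
    // FROM and TO resolve to different configuration classes.
    Class fromConfig = connector.getJobConfigurationClass(Direction.FROM); // FromJobConfiguration
    Class toConfig = connector.getJobConfigurationClass(Direction.TO);     // ToJobConfiguration
    System.out.println(fromConfig.getName() + " / " + toConfig.getName());
  }
}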

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
new file mode 100644
index 0000000..8a095d2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnectorError.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum HdfsConnectorError implements ErrorCode{
+  /** Error occurs during partitioner run */
+  GENERIC_HDFS_CONNECTOR_0000("Error occurs during partitioner run"),
+  /** Error occurs during extractor run */
+  GENERIC_HDFS_CONNECTOR_0001("Error occurs during extractor run"),
+  /** Unsupported output format type found **/
+  GENERIC_HDFS_CONNECTOR_0002("Unknown output format type"),
+  /** The system was unable to load the specified class. */
+  GENERIC_HDFS_CONNECTOR_0003("Unable to load the specified class"),
+  /** The system was unable to instantiate the specified class. */
+  GENERIC_HDFS_CONNECTOR_0004("Unable to instantiate the specified class"),
+  /** Error occurs during loader run */
+  GENERIC_HDFS_CONNECTOR_0005("Error occurs during loader run")
+
+  ;
+
+  private final String message;
+
+  private HdfsConnectorError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
new file mode 100644
index 0000000..a27aff1
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.job.Constants;
+
+public final class HdfsConstants extends Constants {
+
+  // Resource bundle name
+  public static final String RESOURCE_BUNDLE_NAME =
+          "hdfs-connector-resources";
+
+  public static final char DEFAULT_RECORD_DELIMITER = '\n';
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
new file mode 100644
index 0000000..74b1cb8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+public class HdfsDestroyer extends Destroyer {
+  /**
+   * Callback to clean up after job execution.
+   *
+   * @param context Destroyer context
+   * @param o       Connection configuration object
+   * @param o2      Job configuration object
+   */
+  @Override
+  public void destroy(DestroyerContext context, Object o, Object o2) {
+    //TODO: Add a "success" flag?
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
new file mode 100644
index 0000000..fc12381
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.PrefixContext;
+
+import java.io.IOException;
+
+/**
+ * Extract from HDFS.
+ * The default field delimiter of a record is a comma.
+ */
+
+
+public class HdfsExtractor extends Extractor<ConnectionConfiguration, FromJobConfiguration, HdfsPartition> {
+
+  public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
+
+  private Configuration conf;
+  private DataWriter dataWriter;
+  private long rowRead = 0;
+
+  @Override
+  public void extract(ExtractorContext context,
+      ConnectionConfiguration connectionConfiguration,
+      FromJobConfiguration jobConfiguration, HdfsPartition partition) {
+
+    conf = ((PrefixContext) context.getContext()).getConfiguration();
+    dataWriter = context.getDataWriter();
+
+    try {
+      HdfsPartition p = partition;
+      LOG.info("Working on partition: " + p);
+      int numFiles = p.getNumberOfFiles();
+      for (int i = 0; i < numFiles; i++) {
+        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+      }
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
+    }
+  }
+
+  private void extractFile(Path file, long start, long length)
+      throws IOException {
+    long end = start + length;
+    LOG.info("Extracting file " + file);
+    LOG.info("\t from offset " + start);
+    LOG.info("\t to offset " + end);
+    LOG.info("\t of length " + length);
+    if(isSequenceFile(file)) {
+      extractSequenceFile(file, start, length);
+    } else {
+      extractTextFile(file, start, length);
+    }
+  }
+
+  /**
+   * Extracts Sequence file
+   * @param file
+   * @param start
+   * @param length
+   * @throws IOException
+   */
+  private void extractSequenceFile(Path file, long start, long length)
+      throws IOException {
+    LOG.info("Extracting sequence file");
+    long end = start + length;
+    SequenceFile.Reader filereader = new SequenceFile.Reader(
+        file.getFileSystem(conf), file, conf);
+
+    if (start > filereader.getPosition()) {
+      filereader.sync(start); // sync to start
+    }
+
+    Text line = new Text();
+    boolean hasNext = filereader.next(line);
+    while (hasNext) {
+      rowRead++;
+      dataWriter.writeStringRecord(line.toString());
+      line = new Text();
+      hasNext = filereader.next(line);
+      if (filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
+    }
+    filereader.close();
+  }
+
+  /**
+   * Extracts Text file
+   * @param file
+   * @param start
+   * @param length
+   * @throws IOException
+   */
+  private void extractTextFile(Path file, long start, long length)
+      throws IOException {
+    LOG.info("Extracting text file");
+    long end = start + length;
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataInputStream filestream = fs.open(file);
+    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+    LineReader filereader;
+    Seekable fileseeker = filestream;
+
+    // Hadoop 1.0 does not support a custom record delimiter, so only the
+    // default delimiter is supported here.
+    // We might add another "else if" case for SplittableCompressionCodec
+    // once we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
+    } else {
+      filereader = new LineReader(codec.createInputStream(filestream,
+          codec.createDecompressor()), conf);
+      fileseeker = filestream;
+    }
+    if (start != 0) {
+      // always throw away first record because
+      // one extra line is read in previous split
+      start += filereader.readLine(new Text(), 0);
+    }
+    int size;
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
+      Text line = new Text();
+      size = filereader.readLine(line, Integer.MAX_VALUE);
+      if (size == 0) {
+        break;
+      }
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
+      rowRead++;
+      dataWriter.writeStringRecord(line.toString());
+    }
+    LOG.info("Extracting ended on position: " + fileseeker.getPos());
+    filestream.close();
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowRead;
+  }
+
+  /**
+   * Returns true if the given file is a sequence file
+   * @param file
+   * @return boolean
+   */
+  private boolean isSequenceFile(Path file) {
+    SequenceFile.Reader filereader = null;
+    try {
+      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+      filereader.close();
+    } catch (IOException e) {
+      return false;
+    }
+    return true;
+  }
+
+
+}
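
The "throw away the first record" handling in extractTextFile() mirrors the usual treatment of split boundaries for uncompressed text files. A standalone sketch of the same idea, with hypothetical offsets and no compression handling:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class SplitBoundarySketch {
  // Emit only the lines that belong to the byte range [start, start + length)
  // of an uncompressed text file; the line crossing the lower boundary is
  // skipped because the previous split already read it.
  static void readRange(Configuration conf, Path file, long start, long length) throws Exception {
    long end = start + length;
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream in = fs.open(file);
    in.seek(start);
    LineReader reader = new LineReader(in);
    if (start != 0) {
      start += reader.readLine(new Text(), 0);   // discard the partial first line
    }
    long pos = start;
    Text line = new Text();
    int size;
    while (pos <= end && (size = reader.readLine(line, Integer.MAX_VALUE)) > 0) {
      pos += size;
      System.out.println(line);
    }
    in.close();
  }
}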

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
new file mode 100644
index 0000000..d2d12a8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsInitializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+
+
+public class HdfsInitializer extends Initializer {
+  /**
+   * Initialize a new submission based on the given configuration properties. Any
+   * temporary values that are needed may be saved to the context object; they
+   * will be promoted to all other parts of the workflow automatically.
+   *
+   * @param context Initializer context object
+   * @param connection       Connector's connection configuration object
+   * @param job      Connector's job configuration object
+   */
+  @Override
+  public void initialize(InitializerContext context, Object connection, Object job) {
+
+  }
+
+
+  @Override
+  public Schema getSchema(InitializerContext context, Object connection, Object job) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
new file mode 100644
index 0000000..5a924f9
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.OutputFormat;
+import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
+import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
+import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.utils.ClassUtils;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class HdfsLoader extends Loader<ConnectionConfiguration, ToJobConfiguration> {
+  /**
+   * Load data to target.
+   *
+   * @param context Loader context object
+   * @param connection       Connection configuration
+   * @param job      Job configuration
+   * @throws Exception
+   */
+  @Override
+  public void load(LoaderContext context, ConnectionConfiguration connection, ToJobConfiguration job) throws Exception {
+
+    DataReader reader = context.getDataReader();
+
+    Configuration conf = new Configuration();
+
+    String directoryName = job.output.outputDirectory;
+    String codecname = getCompressionCodecName(job);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e);
+      }
+    }
+
+    String filename = directoryName + "/" + UUID.randomUUID() + getExtension(job,codec);
+
+    try {
+      Path filepath = new Path(filename);
+
+      GenericHdfsWriter filewriter = getWriter(job);
+
+      filewriter.initialize(filepath,conf,codec);
+
+      String csv;
+
+      while ((csv = reader.readTextRecord()) != null) {
+        filewriter.write(csv);
+      }
+      filewriter.destroy();
+
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
+    }
+
+  }
+
+  private GenericHdfsWriter getWriter(ToJobConfiguration job) {
+    if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE)
+      return new HdfsSequenceWriter();
+    else
+      return new HdfsTextWriter();
+  }
+
+
+  private String getCompressionCodecName(ToJobConfiguration jobConf) {
+    if(jobConf.output.compression == null)
+      return null;
+    switch(jobConf.output.compression) {
+      case NONE:
+        return null;
+      case DEFAULT:
+        return "org.apache.hadoop.io.compress.DefaultCodec";
+      case DEFLATE:
+        return "org.apache.hadoop.io.compress.DeflateCodec";
+      case GZIP:
+        return "org.apache.hadoop.io.compress.GzipCodec";
+      case BZIP2:
+        return "org.apache.hadoop.io.compress.BZip2Codec";
+      case LZO:
+        return "com.hadoop.compression.lzo.LzoCodec";
+      case LZ4:
+        return "org.apache.hadoop.io.compress.Lz4Codec";
+      case SNAPPY:
+        return "org.apache.hadoop.io.compress.SnappyCodec";
+      case CUSTOM:
+        return jobConf.output.customCompression.trim();
+    }
+    return null;
+  }
+
+  //TODO: We should probably support configurable extensions at some point
+  private static String getExtension(ToJobConfiguration job, CompressionCodec codec) {
+    if (job.output.outputFormat == OutputFormat.SEQUENCE_FILE)
+      return ".seq";
+    if (codec == null)
+      return ".txt";
+    return codec.getDefaultExtension();
+  }
+
+}
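
The writer classes used above (GenericHdfsWriter, HdfsSequenceWriter, HdfsTextWriter) are listed in the diffstat but added elsewhere in this patch. Judging only from how HdfsLoader drives them, a hedged usage sketch of the text writer would look roughly like this; the output path and record are made up:

import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;

public class HdfsTextWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical output location; HdfsLoader builds a similar random file name.
    Path out = new Path("/tmp/sqoop-example/" + UUID.randomUUID() + ".txt");

    GenericHdfsWriter writer = new HdfsTextWriter();
    writer.initialize(out, conf, null);   // null codec == no compression
    writer.write("1,'example row'");
    writer.destroy();
  }
}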

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
new file mode 100644
index 0000000..b801356
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartition.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.job.etl.Partition;
+
+/**
+ * This class derives mostly from CombineFileSplit of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
+ */
+public class HdfsPartition extends Partition {
+
+  private long lenFiles;
+  private int numFiles;
+  private Path[] files;
+  private long[] offsets;
+  private long[] lengths;
+  private String[] locations;
+
+  public HdfsPartition() {}
+
+  public HdfsPartition(Path[] files, long[] offsets,
+                       long[] lengths, String[] locations) {
+    for(long length : lengths) {
+      this.lenFiles += length;
+    }
+    this.numFiles = files.length;
+    this.files = files;
+    this.offsets = offsets;
+    this.lengths = lengths;
+    this.locations = locations;
+  }
+
+  public long getLengthOfFiles() {
+    return lenFiles;
+  }
+
+  public int getNumberOfFiles() {
+    return numFiles;
+  }
+
+  public Path getFile(int i) {
+    return files[i];
+  }
+
+  public long getOffset(int i) {
+    return offsets[i];
+  }
+
+  public long getLength(int i) {
+    return lengths[i];
+  }
+
+  public String[] getLocations() {
+    return locations;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    numFiles = in.readInt();
+
+    files = new Path[numFiles];
+    for(int i=0; i<numFiles; i++) {
+      files[i] = new Path(in.readUTF());
+    }
+
+    offsets = new long[numFiles];
+    for(int i=0; i<numFiles; i++) {
+      offsets[i] = in.readLong();
+    }
+
+    lengths = new long[numFiles];
+    for(int i=0; i<numFiles; i++) {
+      lengths[i] = in.readLong();
+    }
+
+    for(long length : lengths) {
+      lenFiles += length;
+    }
+
+    int numLocations = in.readInt();
+    if (numLocations == 0) {
+      locations = null;
+    } else {
+      locations = new String[numLocations];
+      for(int i=0; i<numLocations; i++) {
+        locations[i] = in.readUTF();
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(numFiles);
+
+    for(Path file : files) {
+      out.writeUTF(file.toString());
+    }
+
+    for(long offset : offsets) {
+      out.writeLong(offset);
+    }
+
+    for(long length : lengths) {
+      out.writeLong(length);
+    }
+
+    if (locations == null || locations.length == 0) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(locations.length);
+      for(String location : locations) {
+        out.writeUTF(location);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{");
+    boolean first = true;
+    for(int i = 0; i < files.length; i++) {
+      if(first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+
+      sb.append(files[i]);
+      sb.append(" (offset=").append(offsets[i]);
+      sb.append(", end=").append(offsets[i] + lengths[i]);
+      sb.append(", length=").append(lengths[i]);
+      sb.append(")");
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+}
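
Because HdfsPartition hand-rolls its serialization in write()/readFields(), a small round-trip sketch (with made-up paths, offsets and hosts) shows how a partition can be shipped to a task and reconstructed:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.sqoop.connector.hdfs.HdfsPartition;

public class HdfsPartitionSketch {
  public static void main(String[] args) throws Exception {
    HdfsPartition original = new HdfsPartition(
        new Path[] { new Path("hdfs:///data/part-00000") },  // hypothetical file
        new long[] { 0L },       // offsets
        new long[] { 1024L },    // lengths
        new String[] { "node1.example.com" });

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    HdfsPartition copy = new HdfsPartition();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);   // same files, offsets and lengths as 'original'
  }
}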

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
new file mode 100644
index 0000000..df764d2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hdfs.configuration.ConnectionConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.common.PrefixContext;
+
+/**
+ * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
+ * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
+ */
+public class HdfsPartitioner extends Partitioner<ConnectionConfiguration, FromJobConfiguration> {
+
+  public static final String SPLIT_MINSIZE_PERNODE =
+      "mapreduce.input.fileinputformat.split.minsize.per.node";
+  public static final String SPLIT_MINSIZE_PERRACK =
+      "mapreduce.input.fileinputformat.split.minsize.per.rack";
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // mapping from a rack name to the set of Nodes in the rack
+  private HashMap<String, Set<String>> rackToNodes =
+      new HashMap<String, Set<String>>();
+
+  @Override
+  public List<Partition> getPartitions(PartitionerContext context,
+      ConnectionConfiguration connectionConfiguration, FromJobConfiguration jobConfiguration) {
+
+    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
+
+    try {
+      long numInputBytes = getInputSize(conf, jobConfiguration.input.inputDirectory);
+      maxSplitSize = numInputBytes / context.getMaxPartitions();
+
+      if(numInputBytes % context.getMaxPartitions() != 0 ) {
+        maxSplitSize += 1;
+       }
+
+      long minSizeNode = 0;
+      long minSizeRack = 0;
+      long maxSize = 0;
+
+      // the values specified by setxxxSplitSize() take precedence over the
+      // values that might have been specified in the config
+      if (minSplitSizeNode != 0) {
+        minSizeNode = minSplitSizeNode;
+      } else {
+        minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
+      }
+      if (minSplitSizeRack != 0) {
+        minSizeRack = minSplitSizeRack;
+      } else {
+        minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
+      }
+      if (maxSplitSize != 0) {
+        maxSize = maxSplitSize;
+      } else {
+        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+      }
+      if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+        throw new IOException("Minimum split size pernode " + minSizeNode +
+                              " cannot be larger than maximum split size " +
+                              maxSize);
+      }
+      if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+        throw new IOException("Minimum split size per rack" + minSizeRack +
+                              " cannot be larger than maximum split size " +
+                              maxSize);
+      }
+      if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+        throw new IOException("Minimum split size per node" + minSizeNode +
+                              " cannot be smaller than minimum split " +
+                              "size per rack " + minSizeRack);
+      }
+
+      // all the files in input set
+      String indir = jobConfiguration.input.inputDirectory;
+      FileSystem fs = FileSystem.get(conf);
+
+      List<Path> paths = new LinkedList<Path>();
+      for(FileStatus status : fs.listStatus(new Path(indir))) {
+        if(!status.isDir()) {
+          paths.add(status.getPath());
+        }
+      }
+
+      List<Partition> partitions = new ArrayList<Partition>();
+      if (paths.size() == 0) {
+        return partitions;
+      }
+
+      // create splits for all files that are not in any pool.
+      getMoreSplits(conf, paths,
+                    maxSize, minSizeNode, minSizeRack, partitions);
+
+      // free up rackToNodes map
+      rackToNodes.clear();
+
+      return partitions;
+
+    } catch (IOException e) {
+      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e);
+    }
+  }
+
+  //TODO: Perhaps get the FS from connection configuration so we can support remote HDFS
+  private long getInputSize(Configuration conf, String indir) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files = fs.listStatus(new Path(indir));
+    long count = 0;
+    for (FileStatus file : files) {
+      count += file.getLen();
+    }
+    return count;
+  }
+
+  /**
+   * Return all the splits in the specified set of paths
+   */
+  private void getMoreSplits(Configuration conf, List<Path> paths,
+      long maxSize, long minSizeNode, long minSizeRack,
+      List<Partition> partitions) throws IOException {
+
+    // all blocks for all the files in input set
+    OneFileInfo[] files;
+
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks =
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes =
+                              new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+                              new HashMap<String, List<OneBlockInfo>>();
+
+    files = new OneFileInfo[paths.size()];
+    if (paths.size() == 0) {
+      return;
+    }
+
+    // populate all the blocks for all files
+    for (int i = 0; i < paths.size(); i++) {
+      files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)),
+                                 rackToBlocks, blockToNodes, nodeToBlocks,
+                                 rackToNodes, maxSize);
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    Set<String> nodes = new HashSet<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node.
+    for (Iterator<Map.Entry<String,
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(partitions, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSplitNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the
+      // same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(partitions, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these
+    // overflow blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    Set<String> racks = new HashSet<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack.
+      // Come back to this rack after creating a single split for each of the
+      // remaining racks.
+      // Process one rack location at a time, Combine all possible blocks that
+      // reside on this rack as one split. (constrained by minimum and maximum
+      // split size).
+
+      // iterate over all racks
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(partitions, getHosts(racks), validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(partitions, getHosts(racks), validBlocks);
+          } else {
+            // There were a few blocks in this rack that
+            // remained to be processed. Keep them in 'overflow' block list.
+            // These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the partitions list
+        addCreatedSplit(partitions, getHosts(racks), validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any remaining blocks.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(partitions, getHosts(racks), validBlocks);
+    }
+  }
+
+  private boolean isSplitable(Configuration conf, Path file) {
+    final CompressionCodec codec =
+        new CompressionCodecFactory(conf).getCodec(file);
+
+    // This method could be improved to handle SplittableCompressionCodec
+    // once support for Hadoop 1.0 is dropped.
+    return null == codec;
+
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks.
+   * Add the new split to the list of partitions.
+   */
+  private void addCreatedSplit(List<Partition> partitions,
+                               Collection<String> locations,
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] files = new Path[validBlocks.size()];
+    long[] offsets = new long[validBlocks.size()];
+    long[] lengths = new long[validBlocks.size()];
+    for (int i = 0; i < validBlocks.size(); i++) {
+      files[i] = validBlocks.get(i).onepath;
+      offsets[i] = validBlocks.get(i).offset;
+      lengths[i] = validBlocks.get(i).length;
+    }
+
+    // add this split to the list that is returned
+    HdfsPartition partition = new HdfsPartition(
+        files, offsets, lengths, locations.toArray(new String[0]));
+    partitions.add(partition);
+  }
+
+  private Set<String> getHosts(Set<String> racks) {
+    Set<String> hosts = new HashSet<String>();
+    for (String rack : racks) {
+      if (rackToNodes.containsKey(rack)) {
+        hosts.addAll(rackToNodes.get(rack));
+      }
+    }
+    return hosts;
+  }
+
+  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+      String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  /**
+   * Information about one file from the file system.
+   */
+  private static class OneFileInfo {
+    private long fileSize;               // size of the file
+    private OneBlockInfo[] blocks;       // all blocks in this file
+
+    OneFileInfo(Path path, Configuration conf,
+                boolean isSplitable,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                HashMap<String, Set<String>> rackToNodes,
+                long maxSize)
+                throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(conf);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+                                                           stat.getLen());
+      // create a list of all blocks and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        if (!isSplitable) {
+          // if the file is not splitable, just create the one block with
+          // full file length
+          blocks = new OneBlockInfo[1];
+          fileSize = stat.getLen();
+          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+              .getHosts(), locations[0].getTopologyPaths());
+        } else {
+          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+              locations.length);
+          for (int i = 0; i < locations.length; i++) {
+            fileSize += locations[i].getLength();
+
+            // each split can be a maximum of maxSize
+            long left = locations[i].getLength();
+            long myOffset = locations[i].getOffset();
+            long myLength = 0;
+            do {
+              if (maxSize == 0) {
+                myLength = left;
+              } else {
+                if (left > maxSize && left < 2 * maxSize) {
+                  // if the remainder is between max and 2*max, then
+                  // instead of creating splits of size max and (left-max),
+                  // create two splits of size left/2 each. This is
+                  // a heuristic to avoid creating unreasonably small
+                  // splits.
+                  myLength = left / 2;
+                } else {
+                  myLength = Math.min(maxSize, left);
+                }
+              }
+              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+                  myLength, locations[i].getHosts(), locations[i]
+                      .getTopologyPaths());
+              left -= myLength;
+              myOffset += myLength;
+
+              blocksList.add(oneblock);
+            } while (left > 0);
+          }
+          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+        }
+
+        for (OneBlockInfo oneblock : blocks) {
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // For blocks that do not have host/rack information,
+          // assign them to the default rack.
+          String[] racks = null;
+          if (oneblock.hosts.length == 0) {
+            racks = new String[]{NetworkTopology.DEFAULT_RACK};
+          } else {
+            racks = oneblock.racks;
+          }
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < racks.length; j++) {
+            String rack = racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+              // Add this host to rackToNodes map
+              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+            }
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Information about one block from the file system.
+   */
+  private static class OneBlockInfo {
+    Path onepath;                // path of the file containing this block
+    long offset;                 // offset in file
+    long length;                 // length of this block
+    String[] hosts;              // nodes on which this block resides
+    String[] racks;              // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len,
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use a dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i],
+                              NetworkTopology.DEFAULT_RACK)).toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last
+      // component. Strip it.
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+}
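
A note on the block-slicing heuristic in OneFileInfo above: with maxSize set to 128 MB, a 350 MB block location is cut into pieces of 128 MB, 111 MB and 111 MB, because once the remainder (222 MB) falls between maxSize and 2*maxSize it is halved rather than split into a full 128 MB piece plus a small 94 MB tail. The standalone sketch below reproduces only that loop so the arithmetic can be checked in isolation; the class and method names are illustrative and not part of this commit.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not part of the commit) of the slice-length heuristic
// used when breaking one block location into pieces of at most maxSize bytes.
public final class SliceSketch {

  static List<Long> slice(long blockLength, long maxSize) {
    List<Long> lengths = new ArrayList<Long>();
    long left = blockLength;
    do {
      long myLength;
      if (maxSize == 0) {
        // no limit configured: a single slice covers the whole block
        myLength = left;
      } else if (left > maxSize && left < 2 * maxSize) {
        // remainder between max and 2*max: halve it instead of producing
        // one full-size slice followed by a very small one
        myLength = left / 2;
      } else {
        myLength = Math.min(maxSize, left);
      }
      lengths.add(myLength);
      left -= myLength;
    } while (left > 0);
    return lengths;
  }

  public static void main(String[] args) {
    // 350 MB block with a 128 MB maximum: prints 128 MB, 111 MB, 111 MB (in bytes)
    System.out.println(slice(350L * 1024 * 1024, 128L * 1024 * 1024));
  }
}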

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
new file mode 100644
index 0000000..4efbd33
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsValidator.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.*;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.Validation;
+import org.apache.sqoop.validation.Validator;
+
+/**
+ * Validate HDFS connector configuration objects.
+ */
+public class HdfsValidator extends Validator {
+
+  @Override
+  public Validation validateConnection(Object connectionConfiguration) {
+    Validation validation = new Validation(ConnectionConfiguration.class);
+    // No validation on connection object
+    return validation;
+  }
+
+
+  @Override
+  public Validation validateJob(Object jobConfiguration) {
+    // TODO: This likely needs to call either validateExportJob or validateImportJob, depending on the job direction.
+    return super.validateJob(jobConfiguration);
+  }
+
+  private Validation validateExportJob(Object jobConfiguration) {
+    Validation validation = new Validation(FromJobConfiguration.class);
+    FromJobConfiguration configuration = (FromJobConfiguration)jobConfiguration;
+
+    validateInputForm(validation, configuration.input);
+
+
+    return validation;
+  }
+
+  private Validation validateImportJob(Object jobConfiguration) {
+    Validation validation = new Validation(ToJobConfiguration.class);
+    ToJobConfiguration configuration = (ToJobConfiguration)jobConfiguration;
+
+    validateOutputForm(validation, configuration.output);
+
+    return validation;
+  }
+
+  private void validateInputForm(Validation validation, InputForm input) {
+    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
+      validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
+    }
+  }
+
+  private void validateOutputForm(Validation validation, OutputForm output) {
+    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
+    }
+    if(output.customCompression != null &&
+      output.customCompression.trim().length() > 0  &&
+      output.compression != OutputCompression.CUSTOM) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+        "custom compression should be blank as " + output.compression + " is being used.");
+    }
+    if(output.compression == OutputCompression.CUSTOM &&
+      (output.customCompression == null ||
+        output.customCompression.trim().length() == 0)
+      ) {
+      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
+        "custom compression is blank.");
+    }
+  }
+
+
+}
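
The two compression checks in validateOutputForm above express a single invariant: a custom codec must be supplied exactly when CUSTOM compression is selected, and must be left blank otherwise. Below is a minimal sketch of that rule in isolation, assuming only the OutputForm and OutputCompression types from this commit; the helper class and method names are illustrative.

import org.apache.sqoop.connector.hdfs.configuration.OutputCompression;
import org.apache.sqoop.connector.hdfs.configuration.OutputForm;

// Illustrative helper (not part of the commit): returns true when the
// compression-related fields of an OutputForm are mutually consistent.
public final class CompressionRuleSketch {

  public static boolean isCompressionConsistent(OutputForm output) {
    boolean customSelected = output.compression == OutputCompression.CUSTOM;
    boolean customCodecGiven = output.customCompression != null
        && output.customCompression.trim().length() > 0;
    // a custom codec class name is required iff CUSTOM is selected
    return customSelected == customCodecGiven;
  }
}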

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
new file mode 100644
index 0000000..6dd79d5
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class ConnectionConfiguration {
+  @Form
+  public ConnectionForm connection;
+
+  public ConnectionConfiguration() {
+    connection = new ConnectionForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
new file mode 100644
index 0000000..7dad2a2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ConnectionForm.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+@FormClass
+public class ConnectionForm {
+  // TODO: No connection-level inputs have been identified yet.
+  // Empty forms are rejected (DERBYREPO_0008: The form contains no input metadata), so a dummy input is used as a placeholder.
+
+  @Input(size = 255) public String dummy;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..bccb99d
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/FromJobConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class FromJobConfiguration {
+  @Form public InputForm input;
+
+
+  public FromJobConfiguration() {
+    input = new InputForm();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
new file mode 100644
index 0000000..413f04c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/InputForm.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Input form holding the HDFS directory to read from.
+ */
+@FormClass
+public class InputForm {
+
+  @Input(size = 255) public String inputDirectory;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
new file mode 100644
index 0000000..55db1bc
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputCompression.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Supported compressions
+ */
+public enum OutputCompression {
+  NONE,
+  DEFAULT,
+  DEFLATE,
+  GZIP,
+  BZIP2,
+  LZO,
+  LZ4,
+  SNAPPY,
+  CUSTOM,
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
new file mode 100644
index 0000000..d57b4c2
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputForm.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Output form describing where and how data is written to HDFS.
+ */
+@FormClass
+public class OutputForm {
+
+  @Input public OutputFormat outputFormat;
+
+  @Input public OutputCompression compression;
+
+  @Input(size = 255) public String customCompression;
+
+  @Input(size = 255) public String outputDirectory;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
new file mode 100644
index 0000000..676c33c
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/OutputFormat.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Various supported formats on disk
+ */
+public enum OutputFormat {
+  /**
+   * Comma-separated text file
+   */
+  TEXT_FILE,
+
+  /**
+   * Sequence file
+   */
+  SEQUENCE_FILE,
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
new file mode 100644
index 0000000..d4aaa0a
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/StorageType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+/**
+ * Various storage types that Sqoop supports
+ */
+public enum StorageType {
+  /**
+   * Direct HDFS import
+   */
+  HDFS,
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
new file mode 100644
index 0000000..65ee8a7
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/ToJobConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+
+@ConfigurationClass
+public class ToJobConfiguration {
+  @Form
+  public OutputForm output;
+
+  public ToJobConfiguration() {
+    output = new OutputForm();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
new file mode 100644
index 0000000..2ccccc4
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/GenericHdfsWriter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+import java.io.IOException;
+
+public abstract class GenericHdfsWriter {
+
+  public abstract void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException;
+
+  public abstract void write(String csv) throws IOException;
+
+  public abstract void destroy() throws IOException;
+
+}
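
To show how GenericHdfsWriter is meant to be subclassed, here is a hedged sketch of a plain-text implementation that optionally wraps the output stream in the supplied codec. The class name, the UTF-8 encoding and the per-record newline are assumptions made for illustration; this is not the connector's own writer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

// Illustrative subclass (not part of the commit): writes one CSV record per line.
public class ExampleTextWriter extends GenericHdfsWriter {

  private BufferedWriter writer;

  @Override
  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
    FileSystem fs = filepath.getFileSystem(conf);
    FSDataOutputStream out = fs.create(filepath, false);
    // wrap the raw stream with the codec only when compression is requested
    writer = new BufferedWriter(new OutputStreamWriter(
        codec == null ? out : codec.createOutputStream(out), "UTF-8"));
  }

  @Override
  public void write(String csv) throws IOException {
    writer.write(csv);
    writer.newLine();
  }

  @Override
  public void destroy() throws IOException {
    writer.close();
  }
}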