You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/20 22:28:26 UTC

[2/3] SQOOP-1375: Sqoop2: From/To: Create HDFS connector

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
new file mode 100644
index 0000000..eb80121
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsSequenceWriter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+
+import java.io.IOException;
+
+/**
+ * Writes CSV records to an HDFS sequence file, storing each record as a
+ * {@link Text} key paired with a {@link NullWritable} value.
+ */
+public class HdfsSequenceWriter extends GenericHdfsWriter {
+
+  private SequenceFile.Writer filewriter;
+  private Text text;
+
+  /**
+   * Opens the sequence file for writing.
+   *
+   * @param filepath file to create
+   * @param conf Hadoop configuration used to resolve the file system
+   * @param codec compression codec; when {@code null} the file is written
+   *              uncompressed, otherwise block compression is used
+   * @throws IOException if the writer cannot be created
+   */
+  @Override
+  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+    if (codec != null) {
+      filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+              conf, filepath, Text.class, NullWritable.class,
+              SequenceFile.CompressionType.BLOCK, codec);
+    } else {
+      filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+              conf, filepath, Text.class, NullWritable.class, SequenceFile.CompressionType.NONE);
+    }
+
+    text = new Text();
+  }
+
+  /**
+   * Appends one record to the sequence file.
+   *
+   * @param csv record text, stored as the key
+   * @throws IOException if the append fails
+   */
+  @Override
+  public void write(String csv) throws IOException {
+    text.set(csv);
+    filewriter.append(text, NullWritable.get());
+  }
+
+  /**
+   * Closes the underlying writer. Does nothing when {@link #initialize}
+   * never produced a writer, so cleanup after a failed start cannot throw
+   * a {@link NullPointerException}.
+   *
+   * @throws IOException if closing the file fails
+   */
+  @Override
+  public void destroy() throws IOException {
+    if (filewriter != null) {
+      filewriter.close();
+      filewriter = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
new file mode 100644
index 0000000..78cf973
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/hdfsWriter/HdfsTextWriter.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.hdfsWriter;
+
+import com.google.common.base.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.connector.hdfs.HdfsConstants;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Writes CSV records to a UTF-8 text file on HDFS,
+ * appending {@link HdfsConstants#DEFAULT_RECORD_DELIMITER} after each record.
+ */
+public class HdfsTextWriter extends GenericHdfsWriter {
+
+  private BufferedWriter filewriter;
+
+  /**
+   * Creates the target file and wraps it in a buffered UTF-8 writer.
+   *
+   * @param filepath file to create; it is opened with overwrite disabled
+   * @param conf Hadoop configuration used to resolve the file system
+   * @param codec compression codec applied to the stream, or {@code null}
+   *              to write uncompressed text
+   * @throws IOException if the file or the compression stream cannot be created
+   */
+  @Override
+  public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
+    FileSystem fs = filepath.getFileSystem(conf);
+
+    DataOutputStream filestream = fs.create(filepath, false);
+    boolean initialized = false;
+    try {
+      if (codec != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+                codec.createOutputStream(filestream, codec.createCompressor()),
+                Charsets.UTF_8));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+                filestream, Charsets.UTF_8));
+      }
+      initialized = true;
+    } finally {
+      if (!initialized) {
+        // Wrapping failed: release the already-open HDFS stream.
+        try {
+          filestream.close();
+        } catch (IOException ignored) {
+          // Let the original failure propagate instead of the close error.
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes one record followed by the record delimiter.
+   *
+   * @param csv record text
+   * @throws IOException if the write fails
+   */
+  @Override
+  public void write(String csv) throws IOException {
+    filewriter.write(csv + HdfsConstants.DEFAULT_RECORD_DELIMITER);
+  }
+
+  /**
+   * Closes the writer, flushing buffered output. Does nothing when
+   * {@link #initialize} never produced a writer.
+   *
+   * @throws IOException if flushing or closing the file fails
+   */
+  @Override
+  public void destroy() throws IOException {
+    if (filewriter != null) {
+      filewriter.close();
+      filewriter = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
new file mode 100644
index 0000000..3125911
--- /dev/null
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-resources.properties
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic HDFS Connector Resources
+
+############################
+# Connection Form
+#
+connection.label = Connection configuration
+connection.help = You must supply the information requested in order to \
+                   create a connection object.
+
+connection.dummy.label = Dummy parameter needed to get HDFS connector to register
+connection.dummy.help = You can write anything here. Doesn't matter.
+
+# Output Form
+#
+output.label = Output configuration
+output.help = You must supply the information requested in order to \
+                   get information where you want to store your data.
+
+output.storageType.label = Storage type
+output.storageType.help = Target on Hadoop ecosystem where to store data
+
+output.outputFormat.label = Output format
+output.outputFormat.help = Format in which data should be serialized
+
+output.compression.label = Compression format
+output.compression.help = Compression that should be used for the data
+
+output.customCompression.label = Custom compression format
+output.customCompression.help = Full class name of the custom compression
+
+output.outputDirectory.label = Output directory
+output.outputDirectory.help = Output directory for final data
+
+output.ignored.label = Ignored
+output.ignored.help = This value is ignored
+
+# Input Form
+#
+input.label = Input configuration
+input.help = Specifies information required to get data from Hadoop ecosystem
+
+input.inputDirectory.label = Input directory
+input.inputDirectory.help = Directory that should be exported

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/sqoopconnector.properties b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
new file mode 100644
index 0000000..fa4e5e1
--- /dev/null
+++ b/connector/connector-hdfs/src/main/resources/sqoopconnector.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generic HDFS Connector Properties
+org.apache.sqoop.connector.class = org.apache.sqoop.connector.hdfs.HdfsConnector
+org.apache.sqoop.connector.name = hdfs-connector
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 39d48c7..1e8ab52 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -180,6 +180,10 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       return null;
     }
 
+    if (schema == null) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+    }
+
     if (fields.length != schema.getColumns().size()) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
         "The data " + getTextData() + " has the wrong number of fields.");
@@ -189,7 +193,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
     for (int i = 0; i < fields.length; i++) {
       Type colType = cols[i].getType();
-      if (fields[i].equals("NULL")) {
+      //TODO: Replace with proper isNull method. Actually the entire content of the loop should be a parse method
+      if (fields[i].equals("NULL") || fields[i].equals("null") || fields[i].equals("'null'") || fields[i].isEmpty()) {
         out[i] = null;
         continue;
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index 9219074..4d41679 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -37,7 +37,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
   INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
 
   /** Number of fields. */
-  INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
+  INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields."),
+
+  INTERMEDIATE_DATA_FORMAT_0006("Schema missing.")
 
   ;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index d642c3e..e98a0fc 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -35,10 +35,11 @@ limitations under the License.
   <modules>
     <module>connector-sdk</module>
     <module>connector-generic-jdbc</module>
-    <!-- Uncomment and finish connectors after sqoop framework will become stable
-    <module>connector-mysql-jdbc</module>
-    <module>connector-mysql-fastpath</module>
-    -->
+    <module>connector-hdfs</module>
+      <!-- Uncomment and finish connectors after sqoop framework will become stable
+      <module>connector-mysql-jdbc</module>
+      <module>connector-mysql-fastpath</module>
+      -->
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
index f19a23e..46257f2 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkValidator.java
@@ -18,20 +18,13 @@
 package org.apache.sqoop.framework;
 
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.InputForm;
 import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputCompression;
-import org.apache.sqoop.framework.configuration.OutputForm;
 import org.apache.sqoop.framework.configuration.ThrottlingForm;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
 
-/**
- * Validate framework configuration objects
- */
 public class FrameworkValidator extends Validator {
-
   @Override
   public Validation validateConnection(Object connectionConfiguration) {
     Validation validation = new Validation(ConnectionConfiguration.class);
@@ -39,61 +32,16 @@ public class FrameworkValidator extends Validator {
     return validation;
   }
 
-
   @Override
   public Validation validateJob(Object jobConfiguration) {
-    JobConfiguration configuration = (JobConfiguration)jobConfiguration;
     Validation validation = new Validation(JobConfiguration.class);
-    validateThrottingForm(validation, configuration.throttling);
-    return super.validateJob(jobConfiguration);
-  }
-
-//  private Validation validateExportJob(Object jobConfiguration) {
-//    Validation validation = new Validation(ExportJobConfiguration.class);
-//    ExportJobConfiguration configuration = (ExportJobConfiguration)jobConfiguration;
-//
-//    validateInputForm(validation, configuration.input);
-//    validateThrottingForm(validation, configuration.throttling);
-//
-//    return validation;
-//  }
-//
-//  private Validation validateImportJob(Object jobConfiguration) {
-//    Validation validation = new Validation(ImportJobConfiguration.class);
-//    ImportJobConfiguration configuration = (ImportJobConfiguration)jobConfiguration;
-//
-//    validateOutputForm(validation, configuration.output);
-//    validateThrottingForm(validation, configuration.throttling);
-//
-//    return validation;
-//  }
+    JobConfiguration conf = (JobConfiguration)jobConfiguration;
+    validateThrottlingForm(validation,conf.throttling);
 
-//  private void validateInputForm(Validation validation, InputForm input) {
-//    if(input.inputDirectory == null || input.inputDirectory.isEmpty()) {
-//      validation.addMessage(Status.UNACCEPTABLE, "input", "inputDirectory", "Input directory is empty");
-//    }
-//  }
-//
-//  private void validateOutputForm(Validation validation, OutputForm output) {
-//    if(output.outputDirectory == null || output.outputDirectory.isEmpty()) {
-//      validation.addMessage(Status.UNACCEPTABLE, "output", "outputDirectory", "Output directory is empty");
-//    }
-//    if(output.customCompression != null &&
-//      output.customCompression.trim().length() > 0  &&
-//      output.compression != OutputCompression.CUSTOM) {
-//      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-//        "custom compression should be blank as " + output.compression + " is being used.");
-//    }
-//    if(output.compression == OutputCompression.CUSTOM &&
-//      (output.customCompression == null ||
-//        output.customCompression.trim().length() == 0)
-//      ) {
-//      validation.addMessage(Status.UNACCEPTABLE, "output", "compression",
-//        "custom compression is blank.");
-//    }
-//  }
+    return validation;
+  };
 
-  private void validateThrottingForm(Validation validation, ThrottlingForm throttling) {
+  private void validateThrottlingForm(Validation validation, ThrottlingForm throttling) {
     if(throttling.extractors != null && throttling.extractors < 1) {
       validation.addMessage(Status.UNACCEPTABLE, "throttling", "extractors", "You need to specify more than one extractor");
     }
@@ -102,4 +50,5 @@ public class FrameworkValidator extends Validator {
       validation.addMessage(Status.UNACCEPTABLE, "throttling", "loaders", "You need to specify more than one loader");
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 5571928..b1b37f6 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.repository.Repository;
 import org.apache.sqoop.repository.RepositoryManager;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.utils.ClassUtils;
@@ -434,12 +435,17 @@ public class JobManager implements Reconfigurable {
         request.getConnectorJobConfig(Direction.FROM)));
 
     // @TODO(Abe): Alter behavior of Schema here. Need from Schema.
-    // Retrieve and persist the schema
-    request.getSummary().setConnectorSchema(initializer.getSchema(
-        initializerContext,
-        request.getConnectorConnectionConfig(Direction.FROM),
-        request.getConnectorJobConfig(Direction.FROM)
-    ));
+
+
+    Schema fromSchema = initializer.getSchema(initializerContext,
+            request.getConnectorConnectionConfig(Direction.FROM),
+            request.getConnectorJobConfig(Direction.FROM));
+
+    // request.getSummary().setConnectorSchema(initializer.getSchema(
+    //    initializerContext,
+    //    request.getConnectorConnectionConfig(ConnectorType.FROM),
+    //    request.getConnectorJobConfig(ConnectorType.FROM)
+    // ));
 
     // Initialize To Connector callback.
     baseCallback = request.getToCallback();
@@ -468,6 +474,11 @@ public class JobManager implements Reconfigurable {
         request.getConnectorJobConfig(Direction.TO)));
 
     // @TODO(Abe): Alter behavior of Schema here. Need To Schema.
+
+    Schema toSchema = initializer.getSchema(initializerContext,
+            request.getConnectorConnectionConfig(Direction.TO),
+            request.getConnectorJobConfig(Direction.TO));
+
     // Retrieve and persist the schema
 //    request.getSummary().setConnectorSchema(initializer.getSchema(
 //        initializerContext,
@@ -475,6 +486,12 @@ public class JobManager implements Reconfigurable {
 //        request.getConnectorJobConfig(ConnectorType.TO)
 //    ));
 
+    //TODO: Need better logic here
+    if (fromSchema != null)
+      request.getSummary().setConnectorSchema(fromSchema);
+    else
+      request.getSummary().setConnectorSchema(toSchema);
+
     // Bootstrap job from framework perspective
     prepareSubmission(request);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index be6099e..bf3f785 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -176,9 +176,11 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromConnector = connector;
+        break;
 
       case TO:
         toConnector = connector;
+        break;
 
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
@@ -238,10 +240,10 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromConnectorConnectionConfig = config;
-
+        break;
       case TO:
         toConnectorConnectionConfig = config;
-
+        break;
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
     }
@@ -264,10 +266,10 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromConnectorJobConfig = config;
-
+        break;
       case TO:
         toConnectorJobConfig = config;
-
+        break;
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
     }
@@ -290,10 +292,10 @@ public class SubmissionRequest {
     switch(type) {
       case FROM:
         fromFrameworkConnectionConfig = config;
-
+        break;
       case TO:
         toFrameworkConnectionConfig = config;
-
+        break;
       default:
         throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
deleted file mode 100644
index 6665429..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ExportJobConfiguration {
-
-  @Form public InputForm input;
-
-  @Form public ThrottlingForm throttling;
-
-  public ExportJobConfiguration() {
-    input = new InputForm();
-    throttling = new ThrottlingForm();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
deleted file mode 100644
index 2a35eb9..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- *
- */
-@ConfigurationClass
-public class ImportJobConfiguration {
-
-  @Form public OutputForm output;
-
-  @Form public ThrottlingForm throttling;
-
-  public ImportJobConfiguration() {
-    output = new OutputForm();
-    throttling = new ThrottlingForm();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
deleted file mode 100644
index d5cbeec..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class InputForm {
-
-  @Input(size = 255) public String inputDirectory;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
index 7c653bf..0abc611 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
@@ -22,8 +22,8 @@ import org.apache.sqoop.model.Form;
 
 @ConfigurationClass
 public class JobConfiguration {
-
-  @Form public ThrottlingForm throttling;
+  @Form
+  public ThrottlingForm throttling;
 
   public JobConfiguration() {
     throttling = new ThrottlingForm();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
deleted file mode 100644
index 6cac46d..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputCompression.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Supported compressions
- */
-public enum OutputCompression {
-  NONE,
-  DEFAULT,
-  DEFLATE,
-  GZIP,
-  BZIP2,
-  LZO,
-  LZ4,
-  SNAPPY,
-  CUSTOM,
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
deleted file mode 100644
index b2cdb44..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputForm.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- *
- */
-@FormClass
-public class OutputForm {
-
-  @Input public StorageType storageType;
-
-  @Input public OutputFormat outputFormat;
-
-  @Input public OutputCompression compression;
-
-  @Input(size = 255) public String customCompression;
-
-  @Input(size = 255) public String outputDirectory;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java b/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
deleted file mode 100644
index 4cd3589..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/OutputFormat.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Various supported formats on disk
- */
-public enum OutputFormat {
-  /**
-   * Comma separated text file
-   */
-  TEXT_FILE,
-
-  /**
-   * Sequence file
-   */
-  SEQUENCE_FILE,
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java b/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
deleted file mode 100644
index dbe9f95..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/StorageType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-/**
- * Various storage types that Sqoop is supporting
- */
-public enum StorageType {
-  /**
-   * Direct HDFS import
-   */
-  HDFS,
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
index f875ceb..90395ac 100644
--- a/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
+++ b/core/src/test/java/org/apache/sqoop/framework/TestFrameworkValidator.java
@@ -17,15 +17,7 @@
  */
 package org.apache.sqoop.framework;
 
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.framework.configuration.OutputCompression;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.Validation;
-import org.junit.Before;
-import org.junit.Test;
+//import org.apache.sqoop.framework.configuration.OutputCompression;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
index 31df04c..f19e01c 100644
--- a/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
+++ b/core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
@@ -28,7 +28,7 @@ import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.MetadataUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.framework.FrameworkManager;
-import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
+//import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.model.ConfigurationClass;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index ff328cb..b05954b 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -80,49 +80,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
-
-    // @TODO(Abe): Move to HDFS connector.
-//    if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
-//      context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-//    } else if(jobConf.output.outputFormat == OutputFormat.SEQUENCE_FILE) {
-//      context.setString(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-//    } else {
-//      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0024,
-//        "Format: " + jobConf.output.outputFormat);
-//    }
-//    if(getCompressionCodecName(jobConf) != null) {
-//      context.setString(JobConstants.HADOOP_COMPRESS_CODEC,
-//        getCompressionCodecName(jobConf));
-//      context.setBoolean(JobConstants.HADOOP_COMPRESS, true);
-//    }
   }
 
-  // @TODO(Abe): Move to HDFS connector.
-//  private String getCompressionCodecName(ImportJobConfiguration jobConf) {
-//    if(jobConf.output.compression == null)
-//      return null;
-//    switch(jobConf.output.compression) {
-//      case NONE:
-//        return null;
-//      case DEFAULT:
-//        return "org.apache.hadoop.io.compress.DefaultCodec";
-//      case DEFLATE:
-//        return "org.apache.hadoop.io.compress.DeflateCodec";
-//      case GZIP:
-//        return "org.apache.hadoop.io.compress.GzipCodec";
-//      case BZIP2:
-//        return "org.apache.hadoop.io.compress.BZip2Codec";
-//      case LZO:
-//        return "com.hadoop.compression.lzo.LzoCodec";
-//      case LZ4:
-//        return "org.apache.hadoop.io.compress.Lz4Codec";
-//      case SNAPPY:
-//        return "org.apache.hadoop.io.compress.SnappyCodec";
-//      case CUSTOM:
-//        return jobConf.output.customCompression.trim();
-//    }
-//    return null;
-//  }
+
 
   /**
    * Our execution engine have additional dependencies that needs to be available

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
deleted file mode 100644
index c3beed7..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Implementation of immutable context that is based on Hadoop configuration
- * object. Each context property is prefixed with special prefix and loaded
- * directly.
- */
-public class PrefixContext implements ImmutableContext {
-
-  Configuration configuration;
-  String prefix;
-
-  public PrefixContext(Configuration configuration, String prefix) {
-    this.configuration = configuration;
-    this.prefix = prefix;
-  }
-
-  @Override
-  public String getString(String key) {
-    return configuration.get(prefix + key);
-  }
-
-  @Override
-  public String getString(String key, String defaultValue) {
-    return configuration.get(prefix + key, defaultValue);
-  }
-
-  @Override
-  public long getLong(String key, long defaultValue) {
-    return configuration.getLong(prefix + key, defaultValue);
-  }
-
-  @Override
-  public int getInt(String key, int defaultValue) {
-    return  configuration.getInt(prefix + key, defaultValue);
-  }
-
-  @Override
-  public boolean getBoolean(String key, boolean defaultValue) {
-    return configuration.getBoolean(prefix + key, defaultValue);
-  }
-
-  /*
-   * TODO: Use getter methods for retrieval instead of
-   * exposing configuration directly.
-   */
-  public Configuration getConfiguration() {
-    return configuration;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
deleted file mode 100644
index 27afd8c..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-
-/**
- * Extract from HDFS.
- * Default field delimiter of a record is comma.
- */
-//public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-//
-//  public static final Logger LOG = Logger.getLogger(HdfsExportExtractor.class);
-//
-//  private Configuration conf;
-//  private DataWriter dataWriter;
-//  private long rowRead = 0;
-//
-//  @Override
-//  public void extract(ExtractorContext context,
-//      ConnectionConfiguration connectionConfiguration,
-//      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-//
-//    conf = ((PrefixContext) context.getContext()).getConfiguration();
-//    dataWriter = context.getDataWriter();
-//
-//    try {
-//      HdfsExportPartition p = partition;
-//      LOG.info("Working on partition: " + p);
-//      int numFiles = p.getNumberOfFiles();
-//      for (int i = 0; i < numFiles; i++) {
-//        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
-//      }
-//    } catch (IOException e) {
-//      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-//    }
-//  }
-//
-//  private void extractFile(Path file, long start, long length)
-//      throws IOException {
-//    long end = start + length;
-//    LOG.info("Extracting file " + file);
-//    LOG.info("\t from offset " + start);
-//    LOG.info("\t to offset " + end);
-//    LOG.info("\t of length " + length);
-//    if(isSequenceFile(file)) {
-//      extractSequenceFile(file, start, length);
-//    } else {
-//      extractTextFile(file, start, length);
-//    }
-//  }
-//
-//  /**
-//   * Extracts Sequence file
-//   * @param file
-//   * @param start
-//   * @param length
-//   * @throws IOException
-//   */
-//  private void extractSequenceFile(Path file, long start, long length)
-//      throws IOException {
-//    LOG.info("Extracting sequence file");
-//    long end = start + length;
-//    SequenceFile.Reader filereader = new SequenceFile.Reader(
-//        file.getFileSystem(conf), file, conf);
-//
-//    if (start > filereader.getPosition()) {
-//      filereader.sync(start); // sync to start
-//    }
-//
-//    Text line = new Text();
-//    boolean hasNext = filereader.next(line);
-//    while (hasNext) {
-//      rowRead++;
-//      dataWriter.writeStringRecord(line.toString());
-//      line = new Text();
-//      hasNext = filereader.next(line);
-//      if (filereader.getPosition() >= end && filereader.syncSeen()) {
-//        break;
-//      }
-//    }
-//    filereader.close();
-//  }
-//
-//  /**
-//   * Extracts Text file
-//   * @param file
-//   * @param start
-//   * @param length
-//   * @throws IOException
-//   */
-//  private void extractTextFile(Path file, long start, long length)
-//      throws IOException {
-//    LOG.info("Extracting text file");
-//    long end = start + length;
-//    FileSystem fs = file.getFileSystem(conf);
-//    FSDataInputStream filestream = fs.open(file);
-//    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
-//    LineReader filereader;
-//    Seekable fileseeker = filestream;
-//
-//    // Hadoop 1.0 does not have support for custom record delimiter and thus
-//    // we
-//    // are supporting only default one.
-//    // We might add another "else if" case for SplittableCompressionCodec once
-//    // we drop support for Hadoop 1.0.
-//    if (codec == null) {
-//      filestream.seek(start);
-//      filereader = new LineReader(filestream);
-//    } else {
-//      filereader = new LineReader(codec.createInputStream(filestream,
-//          codec.createDecompressor()), conf);
-//      fileseeker = filestream;
-//    }
-//    if (start != 0) {
-//      // always throw away first record because
-//      // one extra line is read in previous split
-//      start += filereader.readLine(new Text(), 0);
-//    }
-//    int size;
-//    LOG.info("Start position: " + String.valueOf(start));
-//    long next = start;
-//    while (next <= end) {
-//      Text line = new Text();
-//      size = filereader.readLine(line, Integer.MAX_VALUE);
-//      if (size == 0) {
-//        break;
-//      }
-//      if (codec == null) {
-//        next += size;
-//      } else {
-//        next = fileseeker.getPos();
-//      }
-//      rowRead++;
-//      dataWriter.writeStringRecord(line.toString());
-//    }
-//    LOG.info("Extracting ended on position: " + fileseeker.getPos());
-//    filestream.close();
-//  }
-//
-//  @Override
-//  public long getRowsRead() {
-//    return rowRead;
-//  }
-//
-//  /**
-//   * Returns true if given file is sequence
-//   * @param file
-//   * @return boolean
-//   */
-//  private boolean isSequenceFile(Path file) {
-//    SequenceFile.Reader filereader = null;
-//    try {
-//      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-//      filereader.close();
-//    } catch (IOException e) {
-//      return false;
-//    }
-//    return true;
-//  }
-//}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
deleted file mode 100644
index cdbdaa8..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartition.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.etl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * This class derives mostly from CombineFileSplit of Hadoop, i.e.
- * org.apache.hadoop.mapreduce.lib.input.CombineFileSplit.
- */
-public class HdfsExportPartition extends Partition {
-
-  private long lenFiles;
-  private int numFiles;
-  private Path[] files;
-  private long[] offsets;
-  private long[] lengths;
-  private String[] locations;
-
-  public HdfsExportPartition() {}
-
-  public HdfsExportPartition(Path[] files, long[] offsets,
-      long[] lengths, String[] locations) {
-    for(long length : lengths) {
-      this.lenFiles += length;
-    }
-    this.numFiles = files.length;
-    this.files = files;
-    this.offsets = offsets;
-    this.lengths = lengths;
-    this.locations = locations;
-  }
-
-  public long getLengthOfFiles() {
-    return lenFiles;
-  }
-
-  public int getNumberOfFiles() {
-    return numFiles;
-  }
-
-  public Path getFile(int i) {
-    return files[i];
-  }
-
-  public long getOffset(int i) {
-    return offsets[i];
-  }
-
-  public long getLength(int i) {
-    return lengths[i];
-  }
-
-  public String[] getLocations() {
-    return locations;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    numFiles = in.readInt();
-
-    files = new Path[numFiles];
-    for(int i=0; i<numFiles; i++) {
-      files[i] = new Path(in.readUTF());
-    }
-
-    offsets = new long[numFiles];
-    for(int i=0; i<numFiles; i++) {
-      offsets[i] = in.readLong();
-    }
-
-    lengths = new long[numFiles];
-    for(int i=0; i<numFiles; i++) {
-      lengths[i] = in.readLong();
-    }
-
-    for(long length : lengths) {
-      lenFiles += length;
-    }
-
-    int numLocations = in.readInt();
-    if (numLocations == 0) {
-      locations = null;
-    } else {
-      locations = new String[numLocations];
-      for(int i=0; i<numLocations; i++) {
-        locations[i] = in.readUTF();
-      }
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(numFiles);
-
-    for(Path file : files) {
-      out.writeUTF(file.toString());
-    }
-
-    for(long offset : offsets) {
-      out.writeLong(offset);
-    }
-
-    for(long length : lengths) {
-      out.writeLong(length);
-    }
-
-    if (locations == null || locations.length == 0) {
-      out.writeInt(0);
-    } else {
-      out.writeInt(locations.length);
-      for(String location : locations) {
-        out.writeUTF(location);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("{");
-    boolean first = true;
-    for(int i = 0; i < files.length; i++) {
-      if(first) {
-        first = false;
-      } else {
-        sb.append(", ");
-      }
-
-      sb.append(files[i]);
-      sb.append(" (offset=").append(offsets[i]);
-      sb.append(", end=").append(offsets[i] + lengths[i]);
-      sb.append(", length=").append(lengths[i]);
-      sb.append(")");
-    }
-    sb.append("}");
-    return sb.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
deleted file mode 100644
index b3590dc..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-
-/**
- * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
- * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
- */
-public class HdfsExportPartitioner extends Partitioner {
-
-  public static final String SPLIT_MINSIZE_PERNODE =
-      "mapreduce.input.fileinputformat.split.minsize.per.node";
-  public static final String SPLIT_MINSIZE_PERRACK =
-      "mapreduce.input.fileinputformat.split.minsize.per.rack";
-
-  // ability to limit the size of a single split
-  private long maxSplitSize = 0;
-  private long minSplitSizeNode = 0;
-  private long minSplitSizeRack = 0;
-
-  // mapping from a rack name to the set of Nodes in the rack
-  private HashMap<String, Set<String>> rackToNodes =
-      new HashMap<String, Set<String>>();
-
-  @Override
-  public List<Partition> getPartitions(PartitionerContext context,
-      Object connectionConfiguration, Object jobConfiguration) {
-
-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
-
-    try {
-      long numInputBytes = getInputSize(conf);
-      maxSplitSize = numInputBytes / context.getMaxPartitions();
-
-      if(numInputBytes % context.getMaxPartitions() != 0 ) {
-        maxSplitSize += 1;
-       }
-
-      long minSizeNode = 0;
-      long minSizeRack = 0;
-      long maxSize = 0;
-
-      // the values specified by setxxxSplitSize() takes precedence over the
-      // values that might have been specified in the config
-      if (minSplitSizeNode != 0) {
-        minSizeNode = minSplitSizeNode;
-      } else {
-        minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
-      }
-      if (minSplitSizeRack != 0) {
-        minSizeRack = minSplitSizeRack;
-      } else {
-        minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
-      }
-      if (maxSplitSize != 0) {
-        maxSize = maxSplitSize;
-      } else {
-        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
-      }
-      if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-        throw new IOException("Minimum split size pernode " + minSizeNode +
-                              " cannot be larger than maximum split size " +
-                              maxSize);
-      }
-      if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-        throw new IOException("Minimum split size per rack" + minSizeRack +
-                              " cannot be larger than maximum split size " +
-                              maxSize);
-      }
-      if (minSizeRack != 0 && minSizeNode > minSizeRack) {
-        throw new IOException("Minimum split size per node" + minSizeNode +
-                              " cannot be smaller than minimum split " +
-                              "size per rack " + minSizeRack);
-      }
-
-      // all the files in input set
-      String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
-      FileSystem fs = FileSystem.get(conf);
-
-      List<Path> paths = new LinkedList<Path>();
-      for(FileStatus status : fs.listStatus(new Path(indir))) {
-        if(!status.isDir()) {
-          paths.add(status.getPath());
-        }
-      }
-
-      List<Partition> partitions = new ArrayList<Partition>();
-      if (paths.size() == 0) {
-        return partitions;
-      }
-
-      // create splits for all files that are not in any pool.
-      getMoreSplits(conf, paths,
-                    maxSize, minSizeNode, minSizeRack, partitions);
-
-      // free up rackToNodes map
-      rackToNodes.clear();
-
-      return partitions;
-
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0021, e);
-    }
-  }
-
-  private long getInputSize(Configuration conf) throws IOException {
-    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
-    FileSystem fs = FileSystem.get(conf);
-    FileStatus[] files = fs.listStatus(new Path(indir));
-    long count = 0;
-    for (FileStatus file : files) {
-      count += file.getLen();
-    }
-    return count;
-  }
-
-  /**
-   * Build partitions for all blocks of the given paths (node-local first, then per rack, then overflow) and append them to partitions.
-   */
-  private void getMoreSplits(Configuration conf, List<Path> paths,
-      long maxSize, long minSizeNode, long minSizeRack,
-      List<Partition> partitions) throws IOException {
-
-    // all blocks for all the files in input set
-    OneFileInfo[] files;
-
-    // mapping from a rack name to the list of blocks it has
-    HashMap<String, List<OneBlockInfo>> rackToBlocks =
-                              new HashMap<String, List<OneBlockInfo>>();
-
-    // mapping from a block to the nodes on which it has replicas
-    HashMap<OneBlockInfo, String[]> blockToNodes =
-                              new HashMap<OneBlockInfo, String[]>();
-
-    // mapping from a node to the list of blocks that it contains
-    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
-                              new HashMap<String, List<OneBlockInfo>>();
-
-    files = new OneFileInfo[paths.size()];
-    if (paths.size() == 0) {
-      return; // no input paths: partitions is left untouched
-    }
-
-    // populate all the blocks for all files
-    for (int i = 0; i < paths.size(); i++) {
-      files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)),
-                                 rackToBlocks, blockToNodes, nodeToBlocks,
-                                 rackToNodes, maxSize); // rackToNodes is not a local; presumably a field declared outside this view
-    }
-
-    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> nodes = new HashSet<String>();
-    long curSplitSize = 0;
-
-    // process all nodes and create splits that are local
-    // to a node.
-    for (Iterator<Map.Entry<String,
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from
-      // blockToNodes so that the same block does not appear in
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if a maximum is set and the accumulated split size reaches
-          // or exceeds it, then create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(partitions, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
-          }
-        }
-      }
-      // if there were any blocks left over and their combined size is
-      // at least minSizeNode (and minSizeNode is set), then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely
-      // that they will be combined with other blocks from the
-      // same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(partitions, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts); // re-register so the rack pass below can still pick the block up
-        }
-      }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
-    }
-
-    // if blocks in a rack are below the specified minimum size, then keep them
-    // in 'overflow'. After the processing of all racks is complete, these
-    // overflow blocks will be combined into splits.
-    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
-    Set<String> racks = new HashSet<String>();
-
-    // Process all racks over and over again until there is no more work to do.
-    while (blockToNodes.size() > 0) {
-
-      // Create one split for this rack before moving over to the next rack.
-      // Come back to this rack after creating a single split for each of the
-      // remaining racks.
-      // Process one rack location at a time, Combine all possible blocks that
-      // reside on this rack as one split. (constrained by minimum and maximum
-      // split size).
-
-      // iterate over all racks
-      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
-           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
-
-        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-        racks.add(one.getKey());
-        List<OneBlockInfo> blocks = one.getValue();
-
-        // for each block, copy it into validBlocks. Delete it from
-        // blockToNodes so that the same block does not appear in
-        // two different splits.
-        boolean createdSplit = false;
-        for (OneBlockInfo oneblock : blocks) {
-          if (blockToNodes.containsKey(oneblock)) {
-            validBlocks.add(oneblock);
-            blockToNodes.remove(oneblock);
-            curSplitSize += oneblock.length;
-
-            // if a maximum is set and the accumulated split size reaches
-            // or exceeds it, then create this split.
-            if (maxSize != 0 && curSplitSize >= maxSize) {
-              // create an input split and add it to the splits array
-              addCreatedSplit(partitions, getHosts(racks), validBlocks);
-              createdSplit = true;
-              break; // at most one max-sized split per rack per pass of the outer while loop
-            }
-          }
-        }
-
-        // if we created a split, then just go to the next rack
-        if (createdSplit) {
-          curSplitSize = 0;
-          validBlocks.clear();
-          racks.clear();
-          continue;
-        }
-
-        if (!validBlocks.isEmpty()) {
-          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
-            // if there is a minimum size specified, then create a single split
-            // otherwise, store these blocks into overflow data structure
-            addCreatedSplit(partitions, getHosts(racks), validBlocks);
-          } else {
-            // There were a few blocks in this rack that
-            // remained to be processed. Keep them in 'overflow' block list.
-            // These will be combined later.
-            overflowBlocks.addAll(validBlocks);
-          }
-        }
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    assert blockToNodes.isEmpty(); // these four checks only run when the JVM has assertions enabled
-    assert curSplitSize == 0;
-    assert validBlocks.isEmpty();
-    assert racks.isEmpty();
-
-    // Process all overflow blocks
-    for (OneBlockInfo oneblock : overflowBlocks) {
-      validBlocks.add(oneblock);
-      curSplitSize += oneblock.length;
-
-      // This might cause an existing rack location to be re-added,
-      // but it should be ok.
-      for (int i = 0; i < oneblock.racks.length; i++) {
-        racks.add(oneblock.racks[i]);
-      }
-
-      // if a maximum is set and the accumulated split size reaches
-      // or exceeds it, then create this split.
-      if (maxSize != 0 && curSplitSize >= maxSize) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(partitions, getHosts(racks), validBlocks);
-        curSplitSize = 0;
-        validBlocks.clear();
-        racks.clear();
-      }
-    }
-
-    // Process any remaining blocks, if any.
-    if (!validBlocks.isEmpty()) {
-      addCreatedSplit(partitions, getHosts(racks), validBlocks);
-    }
-  }
-
-  private boolean isSplitable(Configuration conf, Path file) { // true only when no compression codec is resolved for the file
-    final CompressionCodec codec =
-        new CompressionCodecFactory(conf).getCodec(file);
-
-    // This method might be improved for SplittableCompression codec when we
-    // drop support for Hadoop 1.0
-    return null == codec; // any resolved codec, splittable or not, makes the file non-splitable here
-
-  }
-
-  /**
-   * Create a single HdfsExportPartition from the blocks in validBlocks
-   * and append it to partitions; validBlocks itself is not modified here.
-   */
-  private void addCreatedSplit(List<Partition> partitions,
-                               Collection<String> locations,
-                               ArrayList<OneBlockInfo> validBlocks) {
-    // create an input split
-    Path[] files = new Path[validBlocks.size()]; // three parallel arrays, one entry per block
-    long[] offsets = new long[validBlocks.size()];
-    long[] lengths = new long[validBlocks.size()];
-    for (int i = 0; i < validBlocks.size(); i++) {
-      files[i] = validBlocks.get(i).onepath;
-      offsets[i] = validBlocks.get(i).offset;
-      lengths[i] = validBlocks.get(i).length;
-    }
-
-     // add this split to the list that is returned
-    HdfsExportPartition partition = new HdfsExportPartition(
-        files, offsets, lengths, locations.toArray(new String[0])); // locations is copied into a fresh String[]
-    partitions.add(partition);
-  }
-
-  private Set<String> getHosts(Set<String> racks) { // union of the host sets recorded in rackToNodes for the given racks
-    Set<String> hosts = new HashSet<String>();
-    for (String rack : racks) {
-      if (rackToNodes.containsKey(rack)) { // racks with no entry contribute nothing
-        hosts.addAll(rackToNodes.get(rack));
-      }
-    }
-    return hosts; // a new set on every call; empty when no rack matched
-  }
-
-  private static void addHostToRack(HashMap<String, Set<String>> rackToNodes, // adds host to the set stored under rack
-      String rack, String host) {
-    Set<String> hosts = rackToNodes.get(rack);
-    if (hosts == null) { // first host seen for this rack: create and register its set
-      hosts = new HashSet<String>();
-      rackToNodes.put(rack, hosts);
-    }
-    hosts.add(host);
-  }
-
-  /**
-   * Block information for one file; the constructor also registers every block in the lookup maps passed to it.
-   */
-  private static class OneFileInfo {
-    private long fileSize;               // size of the file
-    private OneBlockInfo[] blocks;       // all blocks in this file
-
-    OneFileInfo(Path path, Configuration conf,
-                boolean isSplitable,
-                HashMap<String, List<OneBlockInfo>> rackToBlocks,
-                HashMap<OneBlockInfo, String[]> blockToNodes,
-                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
-                HashMap<String, Set<String>> rackToNodes,
-                long maxSize)
-                throws IOException {
-      this.fileSize = 0;
-
-      // get block locations from file system
-      FileSystem fs = path.getFileSystem(conf);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
-                                                           stat.getLen());
-      // create a list of all block and their locations
-      if (locations == null) {
-        blocks = new OneBlockInfo[0]; // no location data: nothing is registered in the maps
-      } else {
-        if (!isSplitable) {
-          // if the file is not splitable, just create the one block with
-          // full file length
-          blocks = new OneBlockInfo[1];
-          fileSize = stat.getLen();
-          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
-              .getHosts(), locations[0].getTopologyPaths()); // NOTE(review): locations[0] fails on an empty, non-null array; only null is checked above
-        } else {
-          ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
-              locations.length);
-          for (int i = 0; i < locations.length; i++) {
-            fileSize += locations[i].getLength();
-
-            // each split can be a maximum of maxSize
-            long left = locations[i].getLength();
-            long myOffset = locations[i].getOffset();
-            long myLength = 0;
-            do {
-              if (maxSize == 0) {
-                myLength = left; // no maximum: the whole location becomes one block
-              } else {
-                if (left > maxSize && left < 2 * maxSize) {
-                  // if remainder is between max and 2*max - then
-                  // instead of creating splits of size max, left-max we
-                  // create splits of size left/2 and left/2. This is
-                  // a heuristic to avoid creating really really small
-                  // splits.
-                  myLength = left / 2;
-                } else {
-                  myLength = Math.min(maxSize, left);
-                }
-              }
-              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
-                  myLength, locations[i].getHosts(), locations[i]
-                      .getTopologyPaths());
-              left -= myLength;
-              myOffset += myLength;
-
-              blocksList.add(oneblock);
-            } while (left > 0); // do/while: even a zero-length location yields one (empty) block
-          }
-          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
-        }
-
-        for (OneBlockInfo oneblock : blocks) {
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // For blocks that do not have host/rack information,
-          // assign to default  rack.
-          String[] racks = null;
-          if (oneblock.hosts.length == 0) {
-            racks = new String[]{NetworkTopology.DEFAULT_RACK};
-          } else {
-            racks = oneblock.racks;
-          }
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < racks.length; j++) {
-            String rack = racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]); // racks[j] and hosts[j] are used as parallel entries
-            }
-          }
-
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
-          }
-        }
-      }
-    }
-
-  }
-
-  /**
-   * One contiguous piece (path, offset, length) of a file plus the hosts and racks recorded for it.
-   */
-  private static class OneBlockInfo {
-    Path onepath;                // name of this file
-    long offset;                 // offset in file
-    long length;                 // length of this block
-    String[] hosts;              // nodes on which this block resides
-    String[] racks;              // network topology of hosts
-
-    OneBlockInfo(Path path, long offset, long len,
-                 String[] hosts, String[] topologyPaths) {
-      this.onepath = path;
-      this.offset = offset;
-      this.hosts = hosts;
-      this.length = len;
-      assert (hosts.length == topologyPaths.length ||
-              topologyPaths.length == 0); // only checked when the JVM has assertions enabled
-
-      // if the file system does not have any rack information, then
-      // use dummy rack location.
-      if (topologyPaths.length == 0) {
-        topologyPaths = new String[hosts.length]; // the parameter is reassigned locally; the caller's array is untouched
-        for (int i = 0; i < topologyPaths.length; i++) {
-          topologyPaths[i] = (new NodeBase(hosts[i],
-                              NetworkTopology.DEFAULT_RACK)).toString();
-        }
-      }
-
-      // The topology paths have the host name included as the last
-      // component. Strip it.
-      this.racks = new String[topologyPaths.length];
-      for (int i = 0; i < topologyPaths.length; i++) {
-        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
deleted file mode 100644
index d4ffb13..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsSequenceImportLoader extends Loader { // writes each text record from the DataReader into a SequenceFile as a Text key with a NullWritable value
-
-  public static final String EXTENSION = ".seq";
-
-  @Override
-  public void load(LoaderContext context, Object oc, Object oj) throws Exception {
-    DataReader reader = context.getDataReader();
-
-    Configuration conf = new Configuration(); // fresh Hadoop Configuration; the context-derived variant below is commented out
-//    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
-    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
-    CompressionCodec codec = null; // stays null when no codec name is configured
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
-      }
-
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
-        }
-      } catch (Exception e) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
-      }
-    }
-
-    filename += EXTENSION; // ".seq" is always appended, with or without a codec
-
-    try {
-      Path filepath = new Path(filename);
-      SequenceFile.Writer filewriter;
-      if (codec != null) {
-        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-          conf, filepath, Text.class, NullWritable.class,
-          CompressionType.BLOCK, codec);
-      } else {
-        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
-          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
-      }
-
-      String csv;
-      Text text = new Text(); // one Text instance is reused for every record
-      while ((csv = reader.readTextRecord()) != null) {
-        text.set(csv);
-        filewriter.append(text, NullWritable.get());
-      }
-      filewriter.close(); // NOTE(review): not in a finally block, so the writer stays open if the loop above throws
-
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
deleted file mode 100644
index 7b799ca..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import com.google.common.base.Charsets;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsTextImportLoader extends Loader { // writes each text record from the DataReader, followed by the record delimiter, to a UTF-8 text file
-
-  private final char recordDelimiter;
-
-  public HdfsTextImportLoader() {
-    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
-  }
-
-  @Override
-  public void load(LoaderContext context, Object oc, Object oj) throws Exception{
-    DataReader reader = context.getDataReader();
-
-    Configuration conf = new Configuration(); // fresh Hadoop Configuration; the context-derived variant below is commented out
-//    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
-    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
-    CompressionCodec codec = null; // stays null when no codec name is configured
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, codecname);
-      }
-
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
-        }
-      } catch (Exception e) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, codecname, e);
-      }
-
-      filename += codec.getDefaultExtension(); // an extension is appended only when a codec is configured
-    }
-
-    try {
-      Path filepath = new Path(filename);
-      FileSystem fs = filepath.getFileSystem(conf);
-
-      BufferedWriter filewriter;
-      DataOutputStream filestream = fs.create(filepath, false); // presumably false is Hadoop's overwrite flag, so an existing file is not replaced (confirm against FileSystem.create)
-      if (codec != null) {
-        filewriter = new BufferedWriter(new OutputStreamWriter(
-          codec.createOutputStream(filestream, codec.createCompressor()),
-          Charsets.UTF_8));
-      } else {
-        filewriter = new BufferedWriter(new OutputStreamWriter(
-            filestream, Charsets.UTF_8));
-      }
-
-      String csv;
-      while ((csv = reader.readTextRecord()) != null) {
-        filewriter.write(csv + recordDelimiter);
-      }
-      filewriter.close(); // NOTE(review): not in a finally block, so the stream stays open if the loop above throws
-
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, e);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 8e31ef5..59431f4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.schema.Schema;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index e96909a..1c1133a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/5c29a2a2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 6e2cfbf..1d60ba3 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,7 +31,7 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.etl.io.DataWriter;
@@ -66,7 +66,16 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
 
     // Propagate connector schema in every case for now
     // TODO: Change to coditional choosing between Connector schemas.
+
     Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+    if (schema==null) {
+      schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+    }
+
+    if (schema==null) {
+      LOG.info("setting an empty schema");
+    }
+
 
     String intermediateDataFormatName = conf.get(JobConstants
       .INTERMEDIATE_DATA_FORMAT);