You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by bl...@apache.org on 2012/11/06 21:53:35 UTC

[2/5] git commit: SQOOP-666 Introduce execution engine (Jarek Jarcec Cecho)

SQOOP-666 Introduce execution engine
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/06e054bc
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/06e054bc
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/06e054bc

Branch: refs/heads/sqoop2
Commit: 06e054bc8d5350f1f15d88a0c6e9b6b236820327
Parents: 25f3fd3
Author: Bilung Lee <bl...@apache.org>
Authored: Tue Nov 6 12:43:03 2012 -0800
Committer: Bilung Lee <bl...@apache.org>
Committed: Tue Nov 6 12:43:03 2012 -0800

----------------------------------------------------------------------
 core/pom.xml                                       |   14 +-
 .../apache/sqoop/framework/ExecutionEngine.java    |   72 +++
 .../apache/sqoop/framework/FrameworkConstants.java |    9 +
 .../org/apache/sqoop/framework/FrameworkError.java |    4 +
 .../apache/sqoop/framework/FrameworkManager.java   |  136 ++++--
 .../apache/sqoop/framework/SubmissionEngine.java   |    9 +
 .../apache/sqoop/framework/SubmissionRequest.java  |   60 +---
 .../java/org/apache/sqoop/job/JobConstants.java    |   82 ----
 .../java/org/apache/sqoop/job/PrefixContext.java   |   62 ---
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |  108 ----
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |  103 ----
 .../main/java/org/apache/sqoop/job/io/Data.java    |  378 ---------------
 .../java/org/apache/sqoop/job/io/FieldTypes.java   |   42 --
 .../apache/sqoop/job/mr/ConfigurationUtils.java    |   65 ---
 .../apache/sqoop/job/mr/SqoopFileOutputFormat.java |   69 ---
 .../org/apache/sqoop/job/mr/SqoopInputFormat.java  |  118 -----
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |  109 -----
 .../apache/sqoop/job/mr/SqoopNullOutputFormat.java |   77 ---
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |  228 ---------
 .../java/org/apache/sqoop/job/mr/SqoopReducer.java |   35 --
 .../java/org/apache/sqoop/job/mr/SqoopSplit.java   |   82 ----
 .../test/java/org/apache/sqoop/io/TestData.java    |   76 ---
 .../test/java/org/apache/sqoop/job/FileUtils.java  |   69 ---
 .../test/java/org/apache/sqoop/job/JobUtils.java   |   69 ---
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |  232 ---------
 .../java/org/apache/sqoop/job/TestJobEngine.java   |  196 --------
 .../java/org/apache/sqoop/job/TestMapReduce.java   |  229 ---------
 dist/src/main/server/conf/sqoop.properties         |    5 +
 execution/mapreduce/pom.xml                        |   67 +++
 .../execution/mapreduce/MRSubmissionRequest.java   |  110 +++++
 .../mapreduce/MapreduceExecutionEngine.java        |   74 +++
 .../java/org/apache/sqoop/job/JobConstants.java    |   82 ++++
 .../java/org/apache/sqoop/job/PrefixContext.java   |   62 +++
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |  108 ++++
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |  103 ++++
 .../main/java/org/apache/sqoop/job/io/Data.java    |  378 +++++++++++++++
 .../java/org/apache/sqoop/job/io/FieldTypes.java   |   42 ++
 .../apache/sqoop/job/mr/ConfigurationUtils.java    |   65 +++
 .../apache/sqoop/job/mr/SqoopFileOutputFormat.java |   69 +++
 .../org/apache/sqoop/job/mr/SqoopInputFormat.java  |  118 +++++
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |  109 +++++
 .../apache/sqoop/job/mr/SqoopNullOutputFormat.java |   77 +++
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |  228 +++++++++
 .../java/org/apache/sqoop/job/mr/SqoopReducer.java |   35 ++
 .../java/org/apache/sqoop/job/mr/SqoopSplit.java   |   82 ++++
 .../test/java/org/apache/sqoop/job/FileUtils.java  |   69 +++
 .../test/java/org/apache/sqoop/job/JobUtils.java   |   69 +++
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |  232 +++++++++
 .../java/org/apache/sqoop/job/TestJobEngine.java   |  196 ++++++++
 .../java/org/apache/sqoop/job/TestMapReduce.java   |  229 +++++++++
 .../java/org/apache/sqoop/job/io/TestData.java     |   75 +++
 execution/pom.xml                                  |   36 ++
 pom.xml                                            |    1 +
 submission/mapreduce/pom.xml                       |    6 +
 .../mapreduce/MapreduceSubmissionEngine.java       |   20 +-
 55 files changed, 2941 insertions(+), 2539 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 028c240..0732b2c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -36,29 +36,23 @@ limitations under the License.
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-spi</artifactId>
     </dependency>
+
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common</artifactId>
     </dependency>
+
     <dependency>
       <groupId>commons-dbcp</groupId>
       <artifactId>commons-dbcp</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-      <scope>provided</scope>
-    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
new file mode 100644
index 0000000..e1ccdf6
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Execution engine drives execution of sqoop submission (job). It's responsible
+ * for executing all defined steps in the import/export workflow.
+ */
+public abstract class ExecutionEngine {
+
+  /**
+   * Initialize execution engine
+   *
+   * @param context Configuration context
+   */
+  public void initialize(ImmutableContext context, String prefix) {
+  }
+
+  /**
+   * Destroy execution engine when stopping server
+   */
+  public void destroy() {
+  }
+
+  /**
+   * Return new SubmissionRequest class or any subclass if it's needed by
+   * execution and submission engine combination.
+   *
+   * @param summary Submission summary
+   * @param connector Appropriate connector structure
+   * @param connectorConnection Connector connection configuration
+   * @param connectorJob Connector job configuration
+   * @param frameworkConnection Framework connection configuration
+   * @param frameworkJob Framework job configuration
+   * @return New Submission request object
+   */
+  public SubmissionRequest createSubmissionRequest(MSubmission summary,
+                                                   SqoopConnector connector,
+                                                   Object connectorConnection,
+                                                   Object connectorJob,
+                                                   Object frameworkConnection,
+                                                   Object frameworkJob) {
+    return new SubmissionRequest(summary, connector,
+      connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+  }
+
+  /**
+   * Prepare given submission request for import submission.
+   *
+   * @param request Submission request
+   */
+  public abstract void prepareImportSubmission(SubmissionRequest request);
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
index d6e70ca..32da4e8 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
@@ -29,6 +29,9 @@ public final class FrameworkConstants {
   public static final String PREFIX_SUBMISSION_CONFIG =
     ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
 
+  public static final String PREFIX_EXECUTION_CONFIG =
+    ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
+
   public static final String SYSCFG_SUBMISSION_ENGINE =
     PREFIX_SUBMISSION_CONFIG + "engine";
 
@@ -50,6 +53,12 @@ public final class FrameworkConstants {
   public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
     PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
 
+  public static final String SYSCFG_EXECUTION_ENGINE =
+    PREFIX_EXECUTION_CONFIG + "engine";
+
+  public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
+    SYSCFG_EXECUTION_ENGINE + ".";
+
   // Connection/Job Configuration forms
 
   public static final String FORM_SECURITY =

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
index 19d0d87..4277311 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
@@ -38,6 +38,10 @@ public enum FrameworkError implements ErrorCode {
 
   FRAMEWORK_0006("Can't bootstrap job"),
 
+  FRAMEWORK_0007("Invalid execution engine"),
+
+  FRAMEWORK_0008("Invalid combination of submission and execution engines"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 604d403..7e10ddc 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.framework;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -27,11 +26,8 @@ import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.CallbackBase;
 import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
@@ -57,47 +53,93 @@ import java.util.ResourceBundle;
 /**
  * Manager for Sqoop framework itself.
  *
- * All Sqoop internals (job execution engine, metadata) should be handled
- * within this manager.
+ * All Sqoop internals are handled in this class:
+ * * Submission engine
+ * * Execution engine
+ * * Framework metadata
  *
  * Current implementation of entire submission engine is using repository
- * for keep of current track, so that server might be restarted at any time
- * without any affect on running jobs. This approach however might not be the
- * fastest way and we might want to introduce internal structures with running
- * jobs in case that this approach will be too slow.
+ * for keeping track of running submissions. Thus, server might be restarted at
+ * any time without any effect on running jobs. This approach however might not
+ * be the fastest way and we might want to introduce internal structures for
+ * running jobs in case that this approach will be too slow.
  */
 public final class FrameworkManager {
 
   private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
 
+  /**
+   * Default interval for purging old submissions from repository.
+   */
   private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000;
 
+  /**
+   * Default sleep interval for purge thread.
+   */
   private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000;
 
+  /**
+   * Default interval for update thread.
+   */
   private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000;
 
+  /**
+   * Framework metadata structures in MForm format
+   */
   private static MFramework mFramework;
 
+  /**
+   * Validator instance
+   */
   private static final Validator validator;
 
+  /**
+   * Configured submission engine instance
+   */
   private static SubmissionEngine submissionEngine;
 
+  /**
+   * Configured execution engine instance
+   */
+  private static ExecutionEngine executionEngine;
+
+  /**
+   * Purge thread that will periodically remove old submissions from repository.
+   */
   private static PurgeThread purgeThread = null;
 
+  /**
+   * Update thread that will periodically check status of running submissions.
+   */
   private static UpdateThread updateThread = null;
 
+  /**
+   * Synchronization variable between threads.
+   */
   private static boolean running = true;
 
+  /**
+   * Specifies how old submissions must be before they are removed from repository.
+   */
   private static long purgeThreshold;
 
+  /**
+   * Number of milliseconds for purge thread to sleep.
+   */
   private static long purgeSleep;
 
+  /**
+   * Number of milliseconds for update thread to sleep.
+   */
   private static long updateSleep;
 
+  /**
+   * Mutex for creating new submissions. We're not allowing more than one
+   * running submission for one job.
+   */
   private static final Object submissionMutex = new Object();
 
   static {
-
     MConnectionForms connectionForms = new MConnectionForms(
       FormUtils.toForms(getConnectionConfigurationClass())
     );
@@ -123,22 +165,31 @@ public final class FrameworkManager {
     String submissionEngineClassName =
       context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
 
-    Class<?> submissionEngineClass =
-        ClassUtils.loadClass(submissionEngineClassName);
-
-    if (submissionEngineClass == null) {
+    submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName);
+    if(submissionEngine == null) {
       throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-          submissionEngineClassName);
+        submissionEngineClassName);
     }
 
-    try {
-      submissionEngine = (SubmissionEngine)submissionEngineClass.newInstance();
-    } catch (Exception ex) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-          submissionEngineClassName, ex);
+    submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+
+    // Execution engine
+    String executionEngineClassName =
+      context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+
+    executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName);
+    if(executionEngine == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
+        executionEngineClassName);
     }
 
-    submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+    // We need to make sure that user has configured compatible combination of
+    // submission engine and execution engine
+    if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0008);
+    }
+
+    executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
 
     // Set up worker threads
     purgeThreshold = context.getLong(
@@ -161,7 +212,6 @@ public final class FrameworkManager {
     updateThread = new UpdateThread();
     updateThread.start();
 
-
     LOG.info("Submission manager initialized: OK");
   }
 
@@ -189,6 +239,10 @@ public final class FrameworkManager {
     if(submissionEngine != null) {
       submissionEngine.destroy();
     }
+
+    if(executionEngine != null) {
+      executionEngine.destroy();
+    }
   }
 
   public static Validator getValidator() {
@@ -253,22 +307,26 @@ public final class FrameworkManager {
 
     // Create request object
     MSubmission summary = new MSubmission(jobId);
-    SubmissionRequest request = new SubmissionRequest(summary, connector,
-      connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+    SubmissionRequest request = executionEngine.createSubmissionRequest(
+      summary, connector,
+      connectorConnection, connectorJob,
+      frameworkConnection, frameworkJob);
     request.setJobName(job.getName());
 
     // Let's register all important jars
     // sqoop-common
-    request.addJar(ClassUtils.jarForClass(MapContext.class));
+    request.addJarForClass(MapContext.class);
     // sqoop-core
-    request.addJar(ClassUtils.jarForClass(FrameworkManager.class));
+    request.addJarForClass(FrameworkManager.class);
     // sqoop-spi
-    request.addJar(ClassUtils.jarForClass(SqoopConnector.class));
-    // particular connector in use
-    request.addJar(ClassUtils.jarForClass(connector.getClass()));
+    request.addJarForClass(SqoopConnector.class);
+    // Execution engine jar
+    request.addJarForClass(executionEngine.getClass());
+    // Connector in use
+    request.addJarForClass(connector.getClass());
 
     // Extra libraries that Sqoop code requires
-    request.addJar(ClassUtils.jarForClass(JSONValue.class));
+    request.addJarForClass(JSONValue.class);
 
     switch (job.getType()) {
       case IMPORT:
@@ -308,7 +366,7 @@ public final class FrameworkManager {
     // Bootstrap job from framework perspective
     switch (job.getType()) {
       case IMPORT:
-        bootstrapImportSubmission(request);
+        prepareImportSubmission(request);
         break;
       case EXPORT:
         // TODO(jarcec): Implement export path
@@ -342,22 +400,14 @@ public final class FrameworkManager {
     return summary;
   }
 
-  private static void bootstrapImportSubmission(SubmissionRequest request) {
-    Importer importer = (Importer)request.getConnectorCallbacks();
+  private static void prepareImportSubmission(SubmissionRequest request) {
     ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob();
 
     // Initialize the map-reduce part (all sort of required classes, ...)
     request.setOutputDirectory(jobConfiguration.outputDirectory);
 
-    // Defaults for classes are mostly fine for now.
-
-
-    // Set up framework context
-    MutableMapContext context = request.getFrameworkContext();
-    context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
-    context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
-    context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareImportSubmission(request);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
index f4ad3f5..71e4ec9 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
@@ -42,6 +42,15 @@ public abstract class SubmissionEngine {
   }
 
   /**
+   * Callback to verify that configured submission engine and execution engine
+   * are compatible.
+   *
+   * @param executionEngineClass Configured execution engine class.
+   * @return True if such execution engine is supported
+   */
+  public abstract boolean isExecutionEngineSupported(Class executionEngineClass);
+
+  /**
    * Submit new job to remote (hadoop) cluster. This method *must* fill
    * submission.getSummary.setExternalId(), otherwise Sqoop framework won't
    * be able to track progress on this job!

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 27b0566..c70a5cc 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -17,15 +17,11 @@
  */
 package org.apache.sqoop.framework;
 
-import org.apache.hadoop.io.NullWritable;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
-import org.apache.sqoop.job.mr.SqoopInputFormat;
-import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -85,20 +81,6 @@ public class SubmissionRequest {
    */
   String outputDirectory;
 
-  /**
-   * Map-reduce specific options.
-   *
-   * I'm using strings so that this class won't have direct dependency on
-   * hadoop libraries.
-   */
-  Class inputFormatClass;
-  Class mapperClass;
-  Class mapOutputKeyClass;
-  Class mapOutputValueClass;
-  Class outputFormatClass;
-  Class outputKeyClass;
-  Class outputValueClass;
-
 
   public SubmissionRequest(MSubmission submission,
                            SqoopConnector connector,
@@ -115,15 +97,6 @@ public class SubmissionRequest {
     this.configConnectorJob = configConnectorJob;
     this.configFrameworkConnection = configFrameworkConnection;
     this.configFrameworkJob = configFrameworkJob;
-
-    // TODO(Jarcec): Move this to job execution engine
-    this.inputFormatClass = SqoopInputFormat.class;
-    this.mapperClass = SqoopMapper.class;
-    this.mapOutputKeyClass = Data.class;
-    this.mapOutputValueClass = NullWritable.class;
-    this.outputFormatClass = SqoopFileOutputFormat.class;
-    this.outputKeyClass = Data.class;
-    this.outputValueClass = NullWritable.class;
   }
 
   public MSubmission getSummary() {
@@ -150,6 +123,10 @@ public class SubmissionRequest {
     jars.add(jar);
   }
 
+  public void addJarForClass(Class klass) {
+    jars.add(ClassUtils.jarForClass(klass));
+  }
+
   public void addJars(List<String> jars) {
     this.jars.addAll(jars);
   }
@@ -193,31 +170,4 @@ public class SubmissionRequest {
   public void setOutputDirectory(String outputDirectory) {
     this.outputDirectory = outputDirectory;
   }
-  public Class getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  public Class getMapperClass() {
-    return mapperClass;
-  }
-
-  public Class getMapOutputKeyClass() {
-    return mapOutputKeyClass;
-  }
-
-  public Class getMapOutputValueClass() {
-    return mapOutputValueClass;
-  }
-
-  public Class getOutputFormatClass() {
-    return outputFormatClass;
-  }
-
-  public Class getOutputKeyClass() {
-    return outputKeyClass;
-  }
-
-  public Class getOutputValueClass() {
-    return outputValueClass;
-  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/JobConstants.java b/core/src/main/java/org/apache/sqoop/job/JobConstants.java
deleted file mode 100644
index 19ac91e..0000000
--- a/core/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.sqoop.core.ConfigurationConstants;
-
-public final class JobConstants extends Constants {
-  /**
-   * All job related configuration is prefixed with this:
-   * <tt>org.apache.sqoop.job.</tt>
-   */
-  public static final String PREFIX_JOB_CONFIG =
-      ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
-
-
-  public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
-      + "etl.partitioner";
-
-  public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
-      + "etl.extractor";
-
-  public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
-      + "etl.loader";
-
-  public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
-      + "etl.destroyer";
-
-
-  public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
-      + "mr.output.file";
-
-  public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
-      + "mr.output.codec";
-
-
-  public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.class.connector.connection";
-
-  public static final String JOB_CONFIG_CLASS_CONNECTOR_JOB =
-    PREFIX_JOB_CONFIG + "config.class.connector.job";
-
-  public static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.class.framework.connection";
-
-  public static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB =
-    PREFIX_JOB_CONFIG + "config.class.framework.job";
-
-  public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.connector.connection";
-
-  public static final String JOB_CONFIG_CONNECTOR_JOB =
-    PREFIX_JOB_CONFIG + "config.connector.job";
-
-  public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.framework.connection";
-
-  public static final String JOB_CONFIG_FRAMEWORK_JOB =
-    PREFIX_JOB_CONFIG + "config.framework.job";
-
-  public static final String PREFIX_CONNECTOR_CONTEXT =
-    PREFIX_JOB_CONFIG + "connector.context.";
-
-
-  private JobConstants() {
-    // Disable explicit object creation
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/PrefixContext.java b/core/src/main/java/org/apache/sqoop/job/PrefixContext.java
deleted file mode 100644
index 5488b46..0000000
--- a/core/src/main/java/org/apache/sqoop/job/PrefixContext.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.common.ImmutableContext;
-
-/**
- * Implementation of immutable context that is based on Hadoop configuration
- * object. Each context property is prefixed with special prefix and loaded
- * directly.
- */
-public class PrefixContext implements ImmutableContext {
-
-  Configuration configuration;
-  String prefix;
-
-  public PrefixContext(Configuration configuration, String prefix) {
-    this.configuration = configuration;
-    this.prefix = prefix;
-  }
-
-  @Override
-  public String getString(String key) {
-    return configuration.get(prefix + key);
-  }
-
-  @Override
-  public String getString(String key, String defaultValue) {
-    return configuration.get(prefix + key, defaultValue);
-  }
-
-  @Override
-  public long getLong(String key, long defaultValue) {
-    return configuration.getLong(prefix + key, defaultValue);
-  }
-
-  @Override
-  public int getInt(String key, int defaultValue) {
-    return  configuration.getInt(prefix + key, defaultValue);
-  }
-
-  @Override
-  public boolean getBoolean(String key, boolean defaultValue) {
-    return configuration.getBoolean(prefix + key, defaultValue);
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
deleted file mode 100644
index 1235d1d..0000000
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsSequenceImportLoader extends Loader {
-
-  public static final String EXTENSION = ".seq";
-
-  private final char fieldDelimiter;
-
-  public HdfsSequenceImportLoader() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
-  @Override
-  public void run(ImmutableContext context, DataReader reader) {
-    reader.setFieldDelimiter(fieldDelimiter);
-
-    Configuration conf = new Configuration();
-//    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename =
-        context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
-    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
-    CompressionCodec codec = null;
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(CoreError.CORE_0009, codecname);
-      }
-
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
-        }
-      } catch (Exception e) {
-        throw new SqoopException(CoreError.CORE_0010, codecname, e);
-      }
-    }
-
-    filename += EXTENSION;
-
-    try {
-      Path filepath = new Path(filename);
-      SequenceFile.Writer filewriter;
-      if (codec != null) {
-        filewriter = SequenceFile.createWriter(conf,
-            SequenceFile.Writer.file(filepath),
-            SequenceFile.Writer.keyClass(Text.class),
-            SequenceFile.Writer.valueClass(NullWritable.class),
-            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
-      } else {
-        filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.NONE));
-      }
-
-      String csv;
-      Text text = new Text();
-      while ((csv = reader.readCsvRecord()) != null) {
-        text.set(csv);
-        filewriter.append(text, NullWritable.get());
-      }
-      filewriter.close();
-
-    } catch (IOException e) {
-      throw new SqoopException(CoreError.CORE_0018, e);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
deleted file mode 100644
index 36aa11f..0000000
--- a/core/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class HdfsTextImportLoader extends Loader {
-
-  private final char fieldDelimiter;
-  private final char recordDelimiter;
-
-  public HdfsTextImportLoader() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
-  }
-
-  @Override
-  public void run(ImmutableContext context, DataReader reader) {
-    reader.setFieldDelimiter(fieldDelimiter);
-
-    Configuration conf = new Configuration();
-//    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
-    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
-
-    CompressionCodec codec = null;
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(CoreError.CORE_0009, codecname);
-      }
-
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
-        }
-      } catch (Exception e) {
-        throw new SqoopException(CoreError.CORE_0010, codecname, e);
-      }
-
-      filename += codec.getDefaultExtension();
-    }
-
-    try {
-      Path filepath = new Path(filename);
-      FileSystem fs = filepath.getFileSystem(conf);
-
-      BufferedWriter filewriter;
-      DataOutputStream filestream = fs.create(filepath, false);
-      if (codec != null) {
-        filewriter = new BufferedWriter(new OutputStreamWriter(
-            codec.createOutputStream(filestream, codec.createCompressor()),
-            Data.CHARSET_NAME));
-      } else {
-        filewriter = new BufferedWriter(new OutputStreamWriter(
-            filestream, Data.CHARSET_NAME));
-      }
-
-      String csv;
-      while ((csv = reader.readCsvRecord()) != null) {
-        filewriter.write(csv + recordDelimiter);
-      }
-      filewriter.close();
-
-    } catch (IOException e) {
-      throw new SqoopException(CoreError.CORE_0018, e);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/Data.java b/core/src/main/java/org/apache/sqoop/job/io/Data.java
deleted file mode 100644
index 4ddd132..0000000
--- a/core/src/main/java/org/apache/sqoop/job/io/Data.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-
-public class Data implements WritableComparable<Data> {
-
-  // The content is an Object to accommodate different kinds of data.
-  // For example, it can be:
-  // - Object[] for an array of object record
-  // - String for a text of CSV record
-  private Object content = null;
-
-  public static final int EMPTY_DATA = 0;
-  public static final int CSV_RECORD = 1;
-  public static final int ARRAY_RECORD = 2;
-  private int type = EMPTY_DATA;
-
-  public static final String CHARSET_NAME = "UTF-8";
-
-  public static final char DEFAULT_RECORD_DELIMITER = '\n';
-  public static final char DEFAULT_FIELD_DELIMITER = ',';
-  public static final char DEFAULT_STRING_DELIMITER = '\'';
-  public static final char DEFAULT_STRING_ESCAPE = '\\';
-  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
-  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
-  private char stringEscape = DEFAULT_STRING_ESCAPE;
-  private String escapedStringDelimiter = String.valueOf(new char[] {
-      stringEscape, stringDelimiter
-  });
-
-  public void setFieldDelimiter(char fieldDelimiter) {
-    this.fieldDelimiter = fieldDelimiter;
-  }
-
-  public void setContent(Object content, int type) {
-    switch (type) {
-    case EMPTY_DATA:
-    case CSV_RECORD:
-    case ARRAY_RECORD:
-      this.type = type;
-      this.content = content;
-      break;
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
-  }
-
-  public Object getContent(int targetType) {
-    switch (targetType) {
-    case CSV_RECORD:
-      return format();
-    case ARRAY_RECORD:
-      return parse();
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType));
-    }
-  }
-
-  public int getType() {
-    return type;
-  }
-
-  public boolean isEmpty() {
-    return (type == EMPTY_DATA);
-  }
-
-  @Override
-  public String toString() {
-    return (String)getContent(CSV_RECORD);
-  }
-
-  @Override
-  public int compareTo(Data other) {
-    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
-    byte[] otherBytes = other.toString().getBytes(
-        Charset.forName(CHARSET_NAME));
-    return WritableComparator.compareBytes(
-        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof Data)) {
-      return false;
-    }
-
-    Data data = (Data)other;
-    if (type != data.getType()) {
-      return false;
-    }
-
-    return toString().equals(data.toString());
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    switch (type) {
-    case CSV_RECORD:
-      result += 31 * content.hashCode();
-      return result;
-    case ARRAY_RECORD:
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        result += 31 * array[i].hashCode();
-      }
-      return result;
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    type = readType(in);
-    switch (type) {
-    case CSV_RECORD:
-      readCsv(in);
-      break;
-    case ARRAY_RECORD:
-      readArray(in);
-      break;
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    writeType(out, type);
-    switch (type) {
-    case CSV_RECORD:
-      writeCsv(out);
-      break;
-    case ARRAY_RECORD:
-      writeArray(out);
-      break;
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
-  }
-
-  private int readType(DataInput in) throws IOException {
-    return WritableUtils.readVInt(in);
-  }
-
-  private void writeType(DataOutput out, int type) throws IOException {
-    WritableUtils.writeVInt(out, type);
-  }
-
-  private void readCsv(DataInput in) throws IOException {
-    content = in.readUTF();
-  }
-
-  private void writeCsv(DataOutput out) throws IOException {
-    out.writeUTF((String)content);
-  }
-
-  private void readArray(DataInput in) throws IOException {
-    // read number of columns
-    int columns = in.readInt();
-    content = new Object[columns];
-    Object[] array = (Object[])content;
-    // read each column
-    for (int i = 0; i < array.length; i++) {
-      int type = readType(in);
-      switch (type) {
-      case FieldTypes.UTF:
-        array[i] = in.readUTF();
-        break;
-
-      case FieldTypes.BIN:
-        int length = in.readInt();
-        byte[] bytes = new byte[length];
-        in.readFully(bytes);
-        array[i] = bytes;
-        break;
-
-      case FieldTypes.DOUBLE:
-        array[i] = in.readDouble();
-        break;
-
-      case FieldTypes.FLOAT:
-        array[i] = in.readFloat();
-        break;
-
-      case FieldTypes.LONG:
-        array[i] = in.readLong();
-        break;
-
-      case FieldTypes.INT:
-        array[i] = in.readInt();
-        break;
-
-      case FieldTypes.SHORT:
-        array[i] = in.readShort();
-        break;
-
-      case FieldTypes.CHAR:
-        array[i] = in.readChar();
-        break;
-
-      case FieldTypes.BYTE:
-        array[i] = in.readByte();
-        break;
-
-      case FieldTypes.BOOLEAN:
-        array[i] = in.readBoolean();
-        break;
-
-      case FieldTypes.NULL:
-        array[i] = null;
-        break;
-
-      default:
-        throw new IOException(
-          new SqoopException(CoreError.CORE_0012, Integer.toString(type))
-        );
-      }
-    }
-  }
-
-  private void writeArray(DataOutput out) throws IOException {
-    Object[] array = (Object[])content;
-    // write number of columns
-    out.writeInt(array.length);
-    // write each column
-    for (int i = 0; i < array.length; i++) {
-      if (array[i] instanceof String) {
-        writeType(out, FieldTypes.UTF);
-        out.writeUTF((String)array[i]);
-
-      } else if (array[i] instanceof byte[]) {
-        writeType(out, FieldTypes.BIN);
-        out.writeInt(((byte[])array[i]).length);
-        out.write((byte[])array[i]);
-
-      } else if (array[i] instanceof Double) {
-        writeType(out, FieldTypes.DOUBLE);
-        out.writeDouble((Double)array[i]);
-
-      } else if (array[i] instanceof Float) {
-        writeType(out, FieldTypes.FLOAT);
-        out.writeFloat((Float)array[i]);
-
-      } else if (array[i] instanceof Long) {
-        writeType(out, FieldTypes.LONG);
-        out.writeLong((Long)array[i]);
-
-      } else if (array[i] instanceof Integer) {
-        writeType(out, FieldTypes.INT);
-        out.writeInt((Integer)array[i]);
-
-      } else if (array[i] instanceof Short) {
-        writeType(out, FieldTypes.SHORT);
-        out.writeShort((Short)array[i]);
-
-      } else if (array[i] instanceof Character) {
-        writeType(out, FieldTypes.CHAR);
-        out.writeChar((Character)array[i]);
-
-      } else if (array[i] instanceof Byte) {
-        writeType(out, FieldTypes.BYTE);
-        out.writeByte((Byte)array[i]);
-
-      } else if (array[i] instanceof Boolean) {
-        writeType(out, FieldTypes.BOOLEAN);
-        out.writeBoolean((Boolean)array[i]);
-
-      } else if (array[i] == null) {
-        writeType(out, FieldTypes.NULL);
-
-      } else {
-        throw new IOException(
-          new SqoopException(
-              CoreError.CORE_0012, array[i].getClass().getName()
-          )
-        );
-      }
-    }
-  }
-
-  private String format() {
-    switch (type) {
-    case EMPTY_DATA:
-      return null;
-
-    case CSV_RECORD:
-      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
-        return (String)content;
-      } else {
-        // TODO: need to exclude the case where comma is part of a string.
-        return ((String)content).replaceAll(
-            String.valueOf(DEFAULT_FIELD_DELIMITER),
-            String.valueOf(fieldDelimiter));
-      }
-
-    case ARRAY_RECORD:
-      StringBuilder sb = new StringBuilder();
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        if (i != 0) {
-          sb.append(fieldDelimiter);
-        }
-
-        if (array[i] instanceof String) {
-          sb.append(stringDelimiter);
-          sb.append(escape((String)array[i]));
-          sb.append(stringDelimiter);
-        } else if (array[i] instanceof byte[]) {
-          sb.append(Arrays.toString((byte[])array[i]));
-        } else {
-          sb.append(String.valueOf(array[i]));
-        }
-      }
-      return sb.toString();
-
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
-  }
-
-  private Object[] parse() {
-    switch (type) {
-    case EMPTY_DATA:
-      return null;
-
-    case CSV_RECORD:
-      ArrayList<Object> list = new ArrayList<Object>();
-      // todo: need to parse CSV into Array
-      return list.toArray();
-
-    case ARRAY_RECORD:
-      return (Object[])content;
-
-    default:
-      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
-    }
-  }
-
-  private String escape(String string) {
-    // TODO: Also need to escape those special characters as documented in:
-    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
-    String regex = String.valueOf(stringDelimiter);
-    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
-    return string.replaceAll(regex, replacement);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java b/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
deleted file mode 100644
index e96dc6e..0000000
--- a/core/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-public final class FieldTypes {
-
-  public static final int NULL    = 0;
-
-  public static final int BOOLEAN = 1;
-
-  public static final int BYTE    = 10;
-  public static final int CHAR    = 11;
-
-  public static final int SHORT   = 20;
-  public static final int INT     = 21;
-  public static final int LONG    = 22;
-
-  public static final int FLOAT   = 50;
-  public static final int DOUBLE  = 51;
-
-  public static final int BIN     = 100;
-  public static final int UTF     = 101;
-
-  private FieldTypes() {
-    // Disable explicit object creation
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
deleted file mode 100644
index 59baaf6..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * Helper class to load configuration specific objects from job configuration
- */
-public final class ConfigurationUtils {
-
-  public static Object getConnectorConnection(Configuration configuration) {
-    return loadConfiguration(configuration,
-      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
-      JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION);
-  }
-
-  public static Object getConnectorJob(Configuration configuration) {
-    return loadConfiguration(configuration,
-      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
-      JobConstants.JOB_CONFIG_CONNECTOR_JOB);
-  }
-
-  public static Object getFrameworkConnection(Configuration configuration) {
-    return loadConfiguration(configuration,
-      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
-      JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION);
-  }
-
-  public static Object getFrameworkJob(Configuration configuration) {
-    return loadConfiguration(configuration,
-      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
-      JobConstants.JOB_CONFIG_FRAMEWORK_JOB);
-  }
-
-  private static Object loadConfiguration(Configuration configuration,
-                                          String classProperty,
-                                          String valueProperty) {
-    Object object = ClassUtils.instantiate(configuration.get(classProperty));
-    FormUtils.fillValues(configuration.get(valueProperty), object);
-    return object;
-  }
-
-  private ConfigurationUtils() {
-    // Instantiation is prohibited
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
deleted file mode 100644
index c465f10..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.mr;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
-
-/**
- * An output format for MapReduce job.
- */
-public class SqoopFileOutputFormat
-    extends FileOutputFormat<Data, NullWritable> {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopFileOutputFormat.class.getName());
-
-  public static final Class<? extends CompressionCodec> DEFAULT_CODEC =
-      DefaultCodec.class;
-
-  @Override
-  public RecordWriter<Data, NullWritable> getRecordWriter(
-      TaskAttemptContext context) throws IOException {
-    Configuration conf = context.getConfiguration();
-
-    Path filepath = getDefaultWorkFile(context, "");
-    String filename = filepath.toString();
-    conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
-
-    boolean isCompressed = getCompressOutput(context);
-    if (isCompressed) {
-      String codecname =
-          conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName());
-      conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
-    }
-
-    SqoopOutputFormatLoadExecutor executor =
-        new SqoopOutputFormatLoadExecutor(context);
-    return executor.getRecordWriter();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
deleted file mode 100644
index 8fcdc99..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.job.etl.Partitioner;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * An InputFormat for MapReduce job.
- */
-public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopInputFormat.class.getName());
-
-  @Override
-  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
-      InputSplit split, TaskAttemptContext context) {
-    return new SqoopRecordReader();
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-
-    String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
-    Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
-
-    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
-    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
-
-    List<Partition> partitions = partitioner.getPartitions(connectorContext, connectorConnection, connectorJob);
-    List<InputSplit> splits = new LinkedList<InputSplit>();
-    for (Partition partition : partitions) {
-      LOG.debug("Partition: " + partition);
-      SqoopSplit split = new SqoopSplit();
-      split.setPartition(partition);
-      splits.add(split);
-    }
-
-    return splits;
-  }
-
-  public static class SqoopRecordReader
-      extends RecordReader<SqoopSplit, NullWritable> {
-
-    private boolean delivered = false;
-    private SqoopSplit split = null;
-
-    @Override
-    public boolean nextKeyValue() {
-      if (delivered) {
-        return false;
-      } else {
-        delivered = true;
-        return true;
-      }
-    }
-
-    @Override
-    public SqoopSplit getCurrentKey() {
-      return split;
-    }
-
-    @Override
-    public NullWritable getCurrentValue() {
-      return NullWritable.get();
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public float getProgress() {
-      return delivered ? 1.0f : 0.0f;
-    }
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext context) {
-      this.split = (SqoopSplit)split;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
deleted file mode 100644
index 6892b4b..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * A mapper to perform map function.
- */
-public class SqoopMapper
-    extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopMapper.class.getName());
-
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-
-    String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
-    Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
-
-    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
-    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
-
-    SqoopSplit split = context.getCurrentKey();
-
-    try {
-      extractor.run(connectorContext, connectorConnection, connectorJob, split.getPartition(),
-        new MapDataWriter(context));
-
-    } catch (Exception e) {
-      throw new SqoopException(CoreError.CORE_0017, e);
-    }
-  }
-
-  public class MapDataWriter extends DataWriter {
-    private Context context;
-    private Data data;
-
-    public MapDataWriter(Context context) {
-      this.context = context;
-    }
-
-    @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      if (data == null) {
-        data = new Data();
-      }
-
-      data.setFieldDelimiter(fieldDelimiter);
-    }
-
-    @Override
-    public void writeArrayRecord(Object[] array) {
-      writeContent(array, Data.ARRAY_RECORD);
-    }
-
-    @Override
-    public void writeCsvRecord(String csv) {
-      writeContent(csv, Data.CSV_RECORD);
-    }
-
-    @Override
-    public void writeContent(Object content, int type) {
-      if (data == null) {
-        data = new Data();
-      }
-
-      data.setContent(content, type);
-      try {
-        context.write(data, NullWritable.get());
-      } catch (Exception e) {
-        throw new SqoopException(CoreError.CORE_0013, e);
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
deleted file mode 100644
index 1242f90..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.mr;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.job.io.Data;
-
-/**
- * An output format for MapReduce job.
- */
-public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopNullOutputFormat.class.getName());
-
-  @Override
-  public void checkOutputSpecs(JobContext context) {
-    // do nothing
-  }
-
-  @Override
-  public RecordWriter<Data, NullWritable> getRecordWriter(
-      TaskAttemptContext context) {
-    SqoopOutputFormatLoadExecutor executor =
-        new SqoopOutputFormatLoadExecutor(context);
-    return executor.getRecordWriter();
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
-    // return an output committer that does nothing
-    return new NullOutputCommitter();
-  }
-
-  class NullOutputCommitter extends OutputCommitter {
-    @Override
-    public void setupJob(JobContext jobContext) { }
-
-    @Override
-    public void setupTask(TaskAttemptContext taskContext) { }
-
-    @Override
-    public void commitTask(TaskAttemptContext taskContext) { }
-
-    @Override
-    public void abortTask(TaskAttemptContext taskContext) { }
-
-    @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
-      return false;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
deleted file mode 100644
index 96e1533..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.sqoop.job.mr;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.utils.ClassUtils;
-
-public class SqoopOutputFormatLoadExecutor {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
-
-  private boolean readerFinished;
-  private boolean writerFinished;
-  private Data data;
-  private JobContext context;
-  private SqoopRecordWriter producer;
-  private ConsumerThread consumer;
-
-  public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
-    data = new Data();
-    context = jobctx;
-    producer = new SqoopRecordWriter();
-    consumer = new ConsumerThread();
-  }
-
-  public RecordWriter<Data, NullWritable> getRecordWriter() {
-    consumer.setDaemon(true);
-    consumer.start();
-    return producer;
-  }
-
-  public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
-    @Override
-    public void write(Data key, NullWritable value) {
-      synchronized (data) {
-        if (readerFinished) {
-          consumer.checkException();
-          return;
-        }
-
-        try {
-          if (!data.isEmpty()) {
-            // wait for reader to consume data
-            data.wait();
-          }
-
-          int type = key.getType();
-          data.setContent(key.getContent(type), type);
-
-          // notify reader that the data is ready
-          data.notify();
-
-        } catch (InterruptedException e) {
-          // inform reader that writer is finished
-          writerFinished = true;
-
-          // unlock reader so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(CoreError.CORE_0015, e);
-        }
-      }
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) {
-      synchronized (data) {
-        if (readerFinished) {
-          consumer.checkException();
-          return;
-        }
-
-        try {
-          if (!data.isEmpty()) {
-            // wait for reader to consume data
-            data.wait();
-          }
-
-          writerFinished = true;
-
-          data.notify();
-
-        } catch (InterruptedException e) {
-          // inform reader that writer is finished
-          writerFinished = true;
-
-          // unlock reader so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(CoreError.CORE_0015, e);
-        }
-      }
-    }
-  }
-
-  public class OutputFormatDataReader extends DataReader {
-    @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      data.setFieldDelimiter(fieldDelimiter);
-    }
-
-    @Override
-    public Object[] readArrayRecord() {
-      return (Object[])readContent(Data.ARRAY_RECORD);
-    }
-
-    @Override
-    public String readCsvRecord() {
-      return (String)readContent(Data.CSV_RECORD);
-    }
-
-    @Override
-    public Object readContent(int type) {
-      synchronized (data) {
-        if (writerFinished) {
-          return null;
-        }
-
-        try {
-          if (data.isEmpty()) {
-            // wait for writer to produce data
-            data.wait();
-          }
-
-          Object content = data.getContent(type);
-          data.setContent(null, Data.EMPTY_DATA);
-
-          // notify writer that data is consumed
-          data.notify();
-
-          return content;
-
-        } catch (InterruptedException e) {
-          // inform writer that reader is finished
-          readerFinished = true;
-
-          // unlock writer so it can continue
-          data.notify();
-
-          // throw exception
-          throw new SqoopException(CoreError.CORE_0016, e);
-        }
-      }
-    }
-  }
-
-  public class ConsumerThread extends Thread {
-    private SqoopException exception = null;
-
-    public void checkException() {
-      if (exception != null) {
-        throw exception;
-      }
-    }
-
-    @Override
-    public void run() {
-      DataReader reader = new OutputFormatDataReader();
-
-      Configuration conf = context.getConfiguration();
-
-
-      String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
-      Loader loader = (Loader) ClassUtils.instantiate(loaderName);
-
-      // Get together framework context as configuration prefix by nothing
-      PrefixContext frameworkContext = new PrefixContext(conf, "");
-
-      try {
-        loader.run(frameworkContext, reader);
-      } catch (Throwable t) {
-        throw new SqoopException(CoreError.CORE_0018, t);
-      }
-
-      synchronized (data) {
-        // inform writer that reader is finished
-        readerFinished = true;
-
-        // unlock writer so it can continue
-        data.notify();
-
-        // if no exception happens yet
-        if (exception == null && !writerFinished) {
-          // create exception if data are not all consumed
-          exception = new SqoopException(CoreError.CORE_0019);
-        }
-
-        // throw deferred exception if exist
-        if (exception != null) {
-          throw exception;
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
deleted file mode 100644
index d236148..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.sqoop.job.io.Data;
-
-/**
- * A reducer to perform reduce function.
- */
-public class SqoopReducer
-    extends Reducer<Data, NullWritable, Data, NullWritable> {
-
-  public static final Log LOG =
-      LogFactory.getLog(SqoopReducer.class.getName());
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
deleted file mode 100644
index 7dc9541..0000000
--- a/core/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.core.CoreError;
-import org.apache.sqoop.job.etl.Partition;
-import org.apache.sqoop.utils.ClassUtils;
-
-/**
- * An input split to be read.
- */
-public class SqoopSplit extends InputSplit implements Writable {
-
-  private Partition partition;
-
-  public void setPartition(Partition partition) {
-    this.partition = partition;
-  }
-
-  public Partition getPartition() {
-    return partition;
-  }
-
-  @Override
-  public long getLength() throws IOException {
-    return 0;
-  }
-
-  @Override
-  public String[] getLocations() throws IOException {
-    return new String[] {};
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    // read Partition class name
-    String className = in.readUTF();
-    // instantiate Partition object
-    Class<?> clz = ClassUtils.loadClass(className);
-    if (clz == null) {
-      throw new SqoopException(CoreError.CORE_0009, className);
-    }
-    try {
-      partition = (Partition) clz.newInstance();
-    } catch (Exception e) {
-      throw new SqoopException(CoreError.CORE_0010, className, e);
-    }
-    // read Partition object content
-    partition.readFields(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // write Partition class name
-    out.writeUTF(partition.getClass().getName());
-    // write Partition object content
-    partition.write(out);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/io/TestData.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/io/TestData.java b/core/src/test/java/org/apache/sqoop/io/TestData.java
deleted file mode 100644
index 9fe9d41..0000000
--- a/core/src/test/java/org/apache/sqoop/io/TestData.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.io;
-
-import java.util.Arrays;
-
-import junit.framework.TestCase;
-
-import org.apache.sqoop.job.io.Data;
-import org.junit.Test;
-
-public class TestData extends TestCase {
-
-  private static final double TEST_NUMBER = Math.PI + 100;
-  @Test
-  public void testArrayToCsv() throws Exception {
-    Data data = new Data();
-    String expected;
-    String actual;
-
-    // with special characters:
-    expected =
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-    data.setContent(new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        String.valueOf(TEST_NUMBER) + "',s",
-        new byte[] {1, 2, 3, 4, 5} },
-        Data.ARRAY_RECORD);
-    actual = (String)data.getContent(Data.CSV_RECORD);
-    assertEquals(expected, actual);
-
-    // with null characters:
-    expected =
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "null" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-    data.setContent(new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        null,
-        new byte[] {1, 2, 3, 4, 5} },
-        Data.ARRAY_RECORD);
-    actual = (String)data.getContent(Data.CSV_RECORD);
-    assertEquals(expected, actual);
-  }
-
-  public static void assertEquals(Object expected, Object actual) {
-    if (expected instanceof byte[]) {
-      assertEquals(Arrays.toString((byte[])expected),
-          Arrays.toString((byte[])actual));
-    } else {
-      TestCase.assertEquals(expected, actual);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/06e054bc/core/src/test/java/org/apache/sqoop/job/FileUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/sqoop/job/FileUtils.java b/core/src/test/java/org/apache/sqoop/job/FileUtils.java
deleted file mode 100644
index e685883..0000000
--- a/core/src/test/java/org/apache/sqoop/job/FileUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public class FileUtils {
-
-  public static boolean exists(String file) throws IOException {
-    Path path = new Path(file);
-    FileSystem fs = path.getFileSystem(new Configuration());
-    return fs.exists(path);
-  }
-
-  public static void delete(String file) throws IOException {
-    Path path = new Path(file);
-    FileSystem fs = path.getFileSystem(new Configuration());
-    if (fs.exists(path)) {
-      fs.delete(path, true);
-    }
-  }
-
-  public static void mkdirs(String directory) throws IOException {
-    Path path = new Path(directory);
-    FileSystem fs = path.getFileSystem(new Configuration());
-    if (!fs.exists(path)) {
-      fs.mkdirs(path);
-    }
-  }
-
-  public static InputStream open(String fileName)
-    throws IOException, ClassNotFoundException {
-    Path filepath = new Path(fileName);
-    FileSystem fs = filepath.getFileSystem(new Configuration());
-    return fs.open(filepath);
-  }
-
-  public static OutputStream create(String fileName) throws IOException {
-    Path filepath = new Path(fileName);
-    FileSystem fs = filepath.getFileSystem(new Configuration());
-    return fs.create(filepath, false);
-  }
-
-  private FileUtils() {
-    // Disable explicit object creation
-  }
-
-}