Posted to commits@sqoop.apache.org by hs...@apache.org on 2013/07/10 19:58:28 UTC

git commit: SQOOP-1121. Sqoop2: Serialize schemas and make them available in the MR job.

Updated Branches:
  refs/heads/sqoop2 251c8334f -> 1b2441d21


SQOOP-1121. Sqoop2: Serialize schemas and make them available in the MR job.

(Jarek Jarcec Cecho via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/1b2441d2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/1b2441d2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/1b2441d2

Branch: refs/heads/sqoop2
Commit: 1b2441d21de3eddfabb4091dac84b04f74f8e744
Parents: 251c833
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Jul 10 10:57:54 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Jul 10 10:57:54 2013 -0700

----------------------------------------------------------------------
 .../apache/sqoop/job/etl/DestroyerContext.java  |  15 +-
 .../apache/sqoop/job/etl/ExtractorContext.java  |  15 +-
 .../org/apache/sqoop/job/etl/LoaderContext.java |  16 +-
 .../sqoop/job/etl/PartitionerContext.java       |  15 +-
 .../sqoop/connector/jdbc/TestExportLoader.java  |   2 +-
 .../connector/jdbc/TestImportExtractor.java     |   4 +-
 .../connector/jdbc/TestImportPartitioner.java   |  36 ++--
 .../org/apache/sqoop/framework/JobManager.java  |   2 +-
 execution/mapreduce/pom.xml                     |   6 +
 .../java/org/apache/sqoop/job/JobConstants.java |  39 ----
 .../apache/sqoop/job/mr/ConfigurationUtils.java | 211 +++++++++++++++++--
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |  11 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |   8 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  15 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  16 +-
 .../org/apache/sqoop/job/TestHdfsExtract.java   |  12 +-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java |  10 +-
 .../org/apache/sqoop/job/TestMapReduce.java     |   8 +-
 .../sqoop/job/mr/TestConfigurationUtils.java    | 180 ++++++++++++++++
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |   7 +-
 .../mapreduce/MapreduceSubmissionEngine.java    |  30 +--
 21 files changed, 516 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
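
For orientation before the individual hunks: the patch serializes the schema on the submission side and deserializes it inside the MR job, using the new ConfigurationUtils helpers shown below. A minimal sketch of that round trip (the "connectorSchema" variable is assumed to come from the connector's initializer; the MapreduceSubmissionEngine hunk that does the actual wiring is truncated at the end of this message):

    // Submission side: persist job type and schema on the Job object.
    Job job = Job.getInstance();
    ConfigurationUtils.setJobType(job.getConfiguration(), MJob.Type.IMPORT);
    ConfigurationUtils.setConnectorSchema(job, connectorSchema);

    // MR side (SqoopInputFormat, SqoopMapper, ...): read it back. The getter
    // casts its argument to JobConf, so the runtime Configuration must be one.
    Schema schema = ConfigurationUtils.getConnectorSchema(configuration);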


http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
index 10cfb10..2f29de4 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.job.etl;
 
 import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Destroyer.
@@ -28,9 +29,12 @@ public class DestroyerContext extends ActorContext {
 
   private boolean success;
 
-  public DestroyerContext(ImmutableContext context, boolean success) {
+  private Schema schema;
+
+  public DestroyerContext(ImmutableContext context, boolean success, Schema schema) {
     super(context);
     this.success = success;
+    this.schema = schema;
   }
 
   /**
@@ -41,4 +45,13 @@ public class DestroyerContext extends ActorContext {
   public boolean isSuccess() {
     return success;
   }
+
+  /**
+   * Return the schema associated with this step.
+   *
+   * @return Schema associated with this step
+   */
+  public Schema getSchema() {
+    return schema;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index f9d7a8b..af03f0a 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.job.etl;
 
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Extractor.
@@ -29,9 +30,12 @@ public class ExtractorContext extends ActorContext {
 
   private DataWriter writer;
 
-  public ExtractorContext(ImmutableContext context, DataWriter writer) {
+  private Schema schema;
+
+  public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) {
     super(context);
     this.writer = writer;
+    this.schema = schema;
   }
 
   /**
@@ -42,4 +46,13 @@ public class ExtractorContext extends ActorContext {
   public DataWriter getDataWriter() {
     return writer;
   }
+
+  /**
+   * Return the schema associated with this step.
+   *
+   * @return Schema associated with this step
+   */
+  public Schema getSchema() {
+    return schema;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
index dad19f1..f2e6b97 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.job.etl;
 
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Loader.
@@ -27,11 +28,14 @@ import org.apache.sqoop.etl.io.DataReader;
  */
 public class LoaderContext extends ActorContext {
 
-  DataReader reader;
+  private DataReader reader;
 
-  public LoaderContext(ImmutableContext context, DataReader reader) {
+  private Schema schema;
+
+  public LoaderContext(ImmutableContext context, DataReader reader, Schema schema) {
     super(context);
     this.reader = reader;
+    this.schema = schema;
   }
 
   /**
@@ -43,4 +47,12 @@ public class LoaderContext extends ActorContext {
     return reader;
   }
 
+  /**
+   * Return the schema associated with this step.
+   *
+   * @return Schema associated with this step
+   */
+  public Schema getSchema() {
+    return schema;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
index 5e7cea7..e7daeee 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.job.etl;
 
 import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Partitioner.
@@ -28,9 +29,12 @@ public class PartitionerContext extends ActorContext {
 
   private long maxPartitions;
 
-  public PartitionerContext(ImmutableContext context, long maxPartitions) {
+  private Schema schema;
+
+  public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) {
     super(context);
     this.maxPartitions = maxPartitions;
+    this.schema = schema;
   }
 
   /**
@@ -44,4 +48,13 @@ public class PartitionerContext extends ActorContext {
   public long getMaxPartitions() {
     return maxPartitions;
   }
+
+  /**
+   * Return the schema associated with this step.
+   *
+   * @return Schema associated with this step
+   */
+  public Schema getSchema() {
+    return schema;
+  }
 }

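All four context classes above gain the same getSchema() accessor. As a rough illustration of how a connector could consume it, here is a hypothetical Loader body; it is a sketch only (the schema may legitimately be null, which is exactly what the updated tests below pass, and readArrayRecord() is assumed from the existing DataReader API):

    public void load(LoaderContext context, Object connectionConfig, Object jobConfig)
        throws Exception {
      Schema schema = context.getSchema();   // may be null for now
      DataReader reader = context.getDataReader();
      Object[] record;
      while ((record = reader.readArrayRecord()) != null) {
        // Write the record to the target store, optionally validating it
        // against the schema columns first (omitted).
      }
    }
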
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index 50a32d9..aa1c4ff 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -74,7 +74,7 @@ public class TestExportLoader extends TestCase {
 
     Loader loader = new GenericJdbcExportLoader();
     DummyReader reader = new DummyReader();
-    LoaderContext loaderContext = new LoaderContext(context, reader);
+    LoaderContext loaderContext = new LoaderContext(context, reader, null);
     loader.load(loaderContext, connectionConfig, jobConfig);
 
     int index = START;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 54ffe5b..a7ed6ba 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -81,7 +81,7 @@ public class TestImportExtractor extends TestCase {
 
     Extractor extractor = new GenericJdbcImportExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@@ -115,7 +115,7 @@ public class TestImportExtractor extends TestCase {
 
     Extractor extractor = new GenericJdbcImportExtractor();
     DummyWriter writer = new DummyWriter();
-    ExtractorContext extractorContext = new ExtractorContext(context, writer);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, null);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 0afec49..7ecc900 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -61,7 +61,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -92,7 +92,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -121,7 +121,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 13);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 13, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -157,7 +157,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -188,7 +188,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -209,7 +209,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -232,7 +232,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[]{
@@ -253,7 +253,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[]{
@@ -278,8 +278,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context,
-        3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
 
@@ -307,8 +306,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context,
-        3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[]{
@@ -333,8 +331,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context,
-        3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
     verifyResult(partitions, new String[]{
         "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'",
@@ -358,8 +355,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context,
-        3);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
     verifyResult(partitions, new String[]{
       "BCOL = TRUE",
@@ -382,8 +378,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context,
-        25);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 25, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
@@ -428,7 +423,7 @@ public class TestImportPartitioner extends TestCase {
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
     assertEquals(partitions.size(), 5);
   }
@@ -448,8 +443,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context,
-        5);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
 
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 58d6c10..a9645d0 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -443,7 +443,7 @@ public class JobManager implements Reconfigurable {
                    "Can't create destroyer instance: " + destroyerClass.getName());
        }
 
-       DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false);
+       DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false, request.getSummary().getConnectorSchema());
 
        // Initialize submission from connector perspective
        destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index 31da5f1..f9a2a0e 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -47,6 +47,12 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
 
+     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <!-- See profiles for Hadoop specific dependencies -->
 
   </dependencies>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index e2b3ce8..7fd9a01 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -17,7 +17,6 @@
  */
 package org.apache.sqoop.job;
 
-import org.apache.hadoop.io.Text;
 import org.apache.sqoop.core.ConfigurationConstants;
 
 public final class JobConstants extends Constants {
@@ -28,8 +27,6 @@ public final class JobConstants extends Constants {
   public static final String PREFIX_JOB_CONFIG =
       ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
 
-  public static final String JOB_TYPE = PREFIX_JOB_CONFIG + "type";
-
   public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
       + "etl.partitioner";
 
@@ -53,42 +50,6 @@ public final class JobConstants extends Constants {
   public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
     + "etl.extractor.count";
 
-  public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.class.connector.connection";
-
-  public static final String JOB_CONFIG_CLASS_CONNECTOR_JOB =
-    PREFIX_JOB_CONFIG + "config.class.connector.job";
-
-  public static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.class.framework.connection";
-
-  public static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB =
-    PREFIX_JOB_CONFIG + "config.class.framework.job";
-
-  public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.connector.connection";
-
-  public static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY =
-    new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
-
-  public static final String JOB_CONFIG_CONNECTOR_JOB =
-    PREFIX_JOB_CONFIG + "config.connector.job";
-
-  public static final Text JOB_CONFIG_CONNECTOR_JOB_KEY =
-    new Text(JOB_CONFIG_CONNECTOR_JOB);
-
-  public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
-    PREFIX_JOB_CONFIG + "config.framework.connection";
-
-  public static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY =
-    new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
-
-  public static final String JOB_CONFIG_FRAMEWORK_JOB =
-    PREFIX_JOB_CONFIG + "config.framework.job";
-
-  public static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY =
-    new Text(JOB_CONFIG_FRAMEWORK_JOB);
-
   public static final String PREFIX_CONNECTOR_CONTEXT =
     PREFIX_JOB_CONFIG + "connector.context.";
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index 64ec437..f5f6d8e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -20,42 +20,217 @@ package org.apache.sqoop.job.mr;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.json.util.SchemaSerialization;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
 
 /**
- * Helper class to load configuration specific objects from job configuration
+ * Helper class for storing and loading various pieces of information in and
+ * from the MapReduce Configuration object and the JobConf object.
  */
 public final class ConfigurationUtils {
 
+  private static final String JOB_TYPE = JobConstants.PREFIX_JOB_CONFIG + "type";
+
+  private static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.connection";
+
+  private static final String JOB_CONFIG_CLASS_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.job";
+
+  private static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =  JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.connection";
+
+  private static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.framework.job";
+
+  private static final String JOB_CONFIG_CONNECTOR_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.connector.connection";
+
+  private static final Text JOB_CONFIG_CONNECTOR_CONNECTION_KEY = new Text(JOB_CONFIG_CONNECTOR_CONNECTION);
+
+  private static final String JOB_CONFIG_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.connector.job";
+
+  private static final Text JOB_CONFIG_CONNECTOR_JOB_KEY = new Text(JOB_CONFIG_CONNECTOR_JOB);
+
+  private static final String JOB_CONFIG_FRAMEWORK_CONNECTION = JobConstants.PREFIX_JOB_CONFIG + "config.framework.connection";
+
+  private static final Text JOB_CONFIG_FRAMEWORK_CONNECTION_KEY = new Text(JOB_CONFIG_FRAMEWORK_CONNECTION);
+
+  private static final String JOB_CONFIG_FRAMEWORK_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.framework.job";
+
+  private static final Text JOB_CONFIG_FRAMEWORK_JOB_KEY = new Text(JOB_CONFIG_FRAMEWORK_JOB);
+
+  private static final String SCHEMA_CONNECTOR = JobConstants.PREFIX_JOB_CONFIG + "schema.connector";
+
+  private static final Text SCHEMA_CONNECTOR_KEY = new Text(SCHEMA_CONNECTOR);
+
+  private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
+
+  private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
+
+  /**
+   * Persist job type in the configuration object.
+   *
+   * @param configuration MapReduce configuration object
+   * @param type Job type
+   */
+  public static void setJobType(Configuration configuration, MJob.Type type) {
+    configuration.set(JOB_TYPE, type.name());
+  }
+
+  /**
+   * Retrieve job type.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Job type
+   */
   public static MJob.Type getJobType(Configuration configuration) {
-    return MJob.Type.valueOf(configuration.get(JobConstants.JOB_TYPE));
+    return MJob.Type.valueOf(configuration.get(JOB_TYPE));
+  }
+
+  /**
+   * Persist Connector configuration object for connection.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setConfigConnectorConnection(Job job, Object obj) {
+    job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, obj.getClass().getName());
+    job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
+  }
+
+  /**
+   * Persist Connector configuration object for job.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setConfigConnectorJob(Job job, Object obj) {
+    job.getConfiguration().set(JOB_CONFIG_CLASS_CONNECTOR_JOB, obj.getClass().getName());
+    job.getCredentials().addSecretKey(JOB_CONFIG_CONNECTOR_JOB_KEY, FormUtils.toJson(obj).getBytes());
   }
 
-  public static Object getConnectorConnection(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration,
-      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
-      JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
+  /**
+   * Persist Framework configuration object for connection.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setConfigFrameworkConnection(Job job, Object obj) {
+    job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, obj.getClass().getName());
+    job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_CONNECTION_KEY, FormUtils.toJson(obj).getBytes());
   }
 
-  public static Object getConnectorJob(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration,
-      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
-      JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY);
+  /**
+   * Persist Framework configuration object for job.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setConfigFrameworkJob(Job job, Object obj) {
+    job.getConfiguration().set(JOB_CONFIG_CLASS_FRAMEWORK_JOB, obj.getClass().getName());
+    job.getCredentials().addSecretKey(JOB_CONFIG_FRAMEWORK_JOB_KEY, FormUtils.toJson(obj).getBytes());
   }
 
-  public static Object getFrameworkConnection(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration,
-      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
-      JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
+  /**
+   * Retrieve Connector configuration object for connection.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getConfigConnectorConnection(Configuration configuration) {
+    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_CONNECTION, JOB_CONFIG_CONNECTOR_CONNECTION_KEY);
   }
 
-  public static Object getFrameworkJob(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration,
-      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
-      JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY);
+  /**
+   * Retrieve Connector configuration object for job.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getConfigConnectorJob(Configuration configuration) {
+    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_CONNECTOR_JOB, JOB_CONFIG_CONNECTOR_JOB_KEY);
+  }
+
+  /**
+   * Retrieve Framework configuration object for connection.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getConfigFrameworkConnection(Configuration configuration) {
+    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION, JOB_CONFIG_FRAMEWORK_CONNECTION_KEY);
+  }
+
+  /**
+   * Retrieve Framework configuration object for job.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getConfigFrameworkJob(Configuration configuration) {
+    return loadConfiguration((JobConf) configuration, JOB_CONFIG_CLASS_FRAMEWORK_JOB, JOB_CONFIG_FRAMEWORK_JOB_KEY);
+  }
+
+  /**
+   * Persist Connector generated schema.
+   *
+   * @param job MapReduce Job object
+   * @param schema Schema
+   */
+  public static void setConnectorSchema(Job job, Schema schema) {
+    if(schema != null) {
+      job.getCredentials().addSecretKey(SCHEMA_CONNECTOR_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+    }
+  }
+
+  /**
+   * Persist Framework generated schema.
+   *
+   * @param job MapReduce Job object
+   * @param schema Schema
+   */
+  public static void setHioSchema(Job job, Schema schema) {
+    if(schema != null) {
+      job.getCredentials().addSecretKey(SCHEMA_HIO_KEY, SchemaSerialization.extractSchema(schema).toJSONString().getBytes());
+    }
+  }
+
+  /**
+   * Retrieve Connector generated schema.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Schema
+   */
+  public static Schema getConnectorSchema(Configuration configuration) {
+    return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_CONNECTOR_KEY));
+  }
+
+  /**
+   * Retrieve Framework generated schema.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Schema
+   */
+  public static Schema getHioSchema(Configuration configuration) {
+    return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_HIO_KEY));
+  }
+
+  /**
+   * Deserialize schema from JSON encoded bytes.
+   *
+   * This method is null safe.
+   *
+   * @param bytes JSON encoded schema bytes, or null
+   * @return Deserialized Schema, or null when the input is null
+   */
+  private static Schema getSchemaFromBytes(byte[] bytes) {
+    if(bytes == null) {
+      return null;
+    }
+    return SchemaSerialization.restoreSchemna((JSONObject) JSONValue.parse(new String(bytes)));
   }
 
   /**

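Note that the schemas travel through the job credentials (addSecretKey/getSecretKey) rather than plain Configuration keys, the same channel the configuration objects already use; unlike Configuration entries, secret keys are not part of the serialized job configuration. The underlying Hadoop primitive, reduced to a standalone sketch (the alias name is illustrative):

    Job job = Job.getInstance();
    Text alias = new Text("example.alias");   // illustrative alias
    job.getCredentials().addSecretKey(alias, "payload".getBytes());
    byte[] restored = job.getCredentials().getSecretKey(alias);
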
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 4493a45..8cae18e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -24,6 +24,7 @@ import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
@@ -52,10 +53,14 @@ public class SqoopDestroyerExecutor {
 
    // Objects that should be passed to the Destroyer execution
     PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object configConnection = ConfigurationUtils.getConnectorConnection(configuration);
-    Object configJob = ConfigurationUtils.getConnectorJob(configuration);
+    Object configConnection = ConfigurationUtils.getConfigConnectorConnection(configuration);
+    Object configJob = ConfigurationUtils.getConfigConnectorJob(configuration);
 
-    DestroyerContext destroyerContext = new DestroyerContext(subContext, success);
+    // Propagate connector schema in every case for now
+    // TODO: Change to conditionally choosing between HIO and Connector schema
+    Schema schema = ConfigurationUtils.getConnectorSchema(configuration);
+
+    DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
 
     LOG.info("Executing destroyer class " + destroyer.getClass());
     destroyer.destroy(destroyerContext, configConnection, configJob);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 0721b7e..3dec782 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -37,6 +37,7 @@ import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
@@ -62,11 +63,12 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
     PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
-    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+    Object connectorConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
+    Object connectorJob = ConfigurationUtils.getConfigConnectorJob(conf);
+    Schema schema = ConfigurationUtils.getConnectorSchema(conf);
 
     long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
-    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions);
+    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
 
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
     List<InputSplit> splits = new LinkedList<InputSplit>();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 7715d5f..a20d28c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -35,6 +35,7 @@ import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -62,24 +63,28 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
     Object configConnection = null;
     Object configJob = null;
 
+    // Propagate connector schema in every case for now
+    // TODO: Change to conditionally choosing between HIO and Connector schema
+    Schema schema = ConfigurationUtils.getConnectorSchema(conf);
+
     // Executor is in connector space for IMPORT and in framework space for EXPORT
     switch (ConfigurationUtils.getJobType(conf)) {
       case IMPORT:
         subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-        configConnection = ConfigurationUtils.getConnectorConnection(conf);
-        configJob = ConfigurationUtils.getConnectorJob(conf);
+        configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
+        configJob = ConfigurationUtils.getConfigConnectorJob(conf);
         break;
       case EXPORT:
         subContext = new PrefixContext(conf, "");
-        configConnection = ConfigurationUtils.getFrameworkConnection(conf);
-        configJob = ConfigurationUtils.getFrameworkJob(conf);
+        configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
+        configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
         break;
       default:
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
     }
 
     SqoopSplit split = context.getCurrentKey();
-    ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context));
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);
 
     try {
       LOG.info("Starting progress service");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index d47f861..9232b76 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -39,6 +39,7 @@ import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataReader;
+import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class SqoopOutputFormatLoadExecutor {
@@ -191,18 +192,23 @@ public class SqoopOutputFormatLoadExecutor {
         PrefixContext subContext = null;
         Object configConnection = null;
         Object configJob = null;
+        Schema schema = null;
 
         if (!isTest) {
+          // Propagate connector schema in every case for now
+          // TODO: Change to conditionally choosing between HIO and Connector schema
+          schema = ConfigurationUtils.getConnectorSchema(conf);
+
           switch (ConfigurationUtils.getJobType(conf)) {
             case EXPORT:
               subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-              configConnection = ConfigurationUtils.getConnectorConnection(conf);
-              configJob = ConfigurationUtils.getConnectorJob(conf);
+              configConnection = ConfigurationUtils.getConfigConnectorConnection(conf);
+              configJob = ConfigurationUtils.getConfigConnectorJob(conf);
               break;
             case IMPORT:
               subContext = new PrefixContext(conf, "");
-              configConnection = ConfigurationUtils.getFrameworkConnection(conf);
-              configJob = ConfigurationUtils.getFrameworkJob(conf);
+              configConnection = ConfigurationUtils.getConfigFrameworkConnection(conf);
+              configJob = ConfigurationUtils.getConfigFrameworkJob(conf);
               break;
             default:
               throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
@@ -210,7 +216,7 @@ public class SqoopOutputFormatLoadExecutor {
         }
 
         // Create loader context
-        LoaderContext loaderContext = new LoaderContext(subContext, reader);
+        LoaderContext loaderContext = new LoaderContext(subContext, reader, schema);
 
         LOG.info("Running loader class " + loaderName);
         loader.load(loaderContext, configConnection, configJob);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 58c3068..b7079dd 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -42,7 +42,9 @@ import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.model.MJob;
 import org.junit.Test;
 
 public class TestHdfsExtract extends TestCase {
@@ -77,7 +79,7 @@ public class TestHdfsExtract extends TestCase {
     int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
 
     for(int maxPartitions : partitionValues) {
-      PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions);
+      PartitionerContext partCont = new PartitionerContext(prefixContext, maxPartitions, null);
       List<Partition> partitionList = partitioner.getPartitions(partCont, null, null);
       assertTrue(partitionList.size()<=maxPartitions);
     }
@@ -90,7 +92,7 @@ public class TestHdfsExtract extends TestCase {
     createTextInput(null);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
@@ -108,7 +110,7 @@ public class TestHdfsExtract extends TestCase {
     createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
@@ -139,7 +141,7 @@ public class TestHdfsExtract extends TestCase {
     createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
@@ -157,7 +159,7 @@ public class TestHdfsExtract extends TestCase {
     createSequenceInput(null);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index b9b0631..f849aae 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -42,7 +42,9 @@ import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.model.MJob;
 
 public class TestHdfsLoad extends TestCase {
 
@@ -62,7 +64,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -80,7 +82,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -123,7 +125,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
@@ -141,7 +143,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index ee03427..7b264c6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -42,10 +42,12 @@ import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 import org.apache.sqoop.job.mr.SqoopSplit;
+import org.apache.sqoop.model.MJob;
 
 public class TestMapReduce extends TestCase {
 
@@ -55,7 +57,7 @@ public class TestMapReduce extends TestCase {
 
   public void testInputFormat() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     Job job = new Job(conf);
 
@@ -72,7 +74,7 @@ public class TestMapReduce extends TestCase {
 
   public void testMapper() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
 
@@ -82,7 +84,7 @@ public class TestMapReduce extends TestCase {
 
   public void testOutputFormat() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_TYPE, "IMPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
new file mode 100644
index 0000000..5e2d099
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Form;
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * These tests use Mockito to propagate credentials from the Job object to the
+ * JobConf object. This approach was chosen because it is not clear how
+ * MapReduce converts one object into the other.
+ */
+public class TestConfigurationUtils {
+
+  Job job;
+  JobConf jobConf;
+
+  @Before
+  public void setUp() throws Exception {
+    setUpJob();
+    setUpJobConf();
+  }
+
+  public void setUpJob() throws Exception {
+    job = Job.getInstance();
+  }
+
+  public void setUpJobConf() throws Exception {
+    jobConf = spy(new JobConf(job.getConfiguration()));
+    when(jobConf.getCredentials()).thenReturn(job.getCredentials());
+  }
+
+  @Test
+  public void testJobType() throws Exception {
+    ConfigurationUtils.setJobType(job.getConfiguration(), MJob.Type.IMPORT);
+    setUpJobConf();
+    assertEquals(MJob.Type.IMPORT, ConfigurationUtils.getJobType(jobConf));
+  }
+
+  @Test
+  public void testConfigConnectorConnection() throws Exception {
+    ConfigurationUtils.setConfigConnectorConnection(job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConfigConnectorConnection(jobConf));
+  }
+
+  @Test
+  public void testConfigConnectorJob() throws Exception {
+    ConfigurationUtils.setConfigConnectorJob(job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConfigConnectorJob(jobConf));
+  }
+
+  @Test
+  public void testConfigFrameworkConnection() throws Exception {
+    ConfigurationUtils.setConfigFrameworkConnection(job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkConnection(jobConf));
+  }
+
+  @Test
+  public void testConfigFrameworkJob() throws Exception {
+    ConfigurationUtils.setConfigFrameworkJob(job, getConfig());
+    setUpJobConf();
+    assertEquals(getConfig(), ConfigurationUtils.getConfigFrameworkJob(jobConf));
+  }
+
+  @Test
+  public void testConnectorSchema() throws Exception {
+    ConfigurationUtils.setConnectorSchema(job, getSchema("a"));
+    assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(jobConf));
+  }
+
+  @Test
+  public void testConnectorSchemaNull() throws Exception {
+    ConfigurationUtils.setConnectorSchema(job, null);
+    assertNull(ConfigurationUtils.getConnectorSchema(jobConf));
+  }
+
+  @Test
+  public void testHioSchema() throws Exception {
+    ConfigurationUtils.setHioSchema(job, getSchema("a"));
+    assertEquals(getSchema("a"), ConfigurationUtils.getHioSchema(jobConf));
+  }
+
+  @Test
+  public void testHioSchemaNull() throws Exception {
+    ConfigurationUtils.setHioSchema(job, null);
+    assertNull(ConfigurationUtils.getHioSchema(jobConf));
+  }
+
+  private Schema getSchema(String name) {
+    return new Schema(name).addColumn(new Text("c1"));
+  }
+
+  private Config getConfig() {
+    Config c = new Config();
+    c.f.A = "This is secret text!";
+    return c;
+  }
+
+  @FormClass
+  public static class F {
+
+    @Input String A;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof F)) return false;
+
+      F f = (F) o;
+
+      if (A != null ? !A.equals(f.A) : f.A != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return A != null ? A.hashCode() : 0;
+    }
+  }
+
+  @ConfigurationClass
+  public static class Config {
+    @Form F f;
+
+    public Config() {
+      f = new F();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Config)) return false;
+
+      Config config = (Config) o;
+
+      if (f != null ? !f.equals(config.f) : config.f != null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return f != null ? f.hashCode() : 0;
+    }
+  }
+}
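
The round trip these tests exercise: a configuration object is serialized
to JSON, stored as a secret key in the job credentials (rather than in the
plain Configuration, so it stays out of job.xml), and read back from the
JobConf that MapReduce hands to the task. A minimal sketch of such a
setter/getter pair follows; the ConfigurationUtils internals are not part
of this diff, so the key name SKETCH_CONFIG_KEY and the use of
FormUtils.fillValues() as the JSON counterpart of FormUtils.toJson()
(toJson() is visible in the removed MapreduceSubmissionEngine code further
below) are assumptions.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.model.FormUtils;

public final class ConfigCredentialsSketch {

  // Hypothetical key name; the real constants live in ConfigurationUtils.
  private static final Text SKETCH_CONFIG_KEY =
    new Text("org.apache.sqoop.sketch.config");

  private ConfigCredentialsSketch() {
  }

  // Store the JSON form in the credentials cache so that potentially
  // sensitive values are kept out of the plain configuration.
  public static void setConfig(Job job, Object config) {
    if (config != null) {
      job.getCredentials().addSecretKey(SKETCH_CONFIG_KEY,
        FormUtils.toJson(config).getBytes());
    }
  }

  // Re-hydrate the configuration object from whatever JobConf the
  // framework created for the task.
  public static <T> T getConfig(JobConf configuration, Class<T> klass)
      throws InstantiationException, IllegalAccessException {
    byte[] bytes = configuration.getCredentials().getSecretKey(SKETCH_CONFIG_KEY);
    if (bytes == null) {
      return null;
    }
    T config = klass.newInstance();
    FormUtils.fillValues(new String(bytes), config); // assumed counterpart of toJson()
    return config;
  }
}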

http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index d2b501e..bee8ab7 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.model.MJob;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -123,7 +124,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
   @Test(expected = BrokenBarrierException.class)
   public void testWhenLoaderThrows() throws Throwable {
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
@@ -141,7 +142,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
   @Test
   public void testSuccessfulContinuousLoader() throws Throwable {
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
@@ -184,7 +185,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
   @Test(expected = ConcurrentModificationException.class)
   public void testThrowingContinuousLoader() throws Throwable {
-    conf.set(JobConstants.JOB_TYPE, "EXPORT");
+    ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
     conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());

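The three hunks above swap raw string writes of JobConstants.JOB_TYPE for
a typed setter. Assuming the setter simply wraps that same property (the
enum-to-name conversion is visible in the MapreduceSubmissionEngine hunk
below), the accessor pair would look roughly like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.model.MJob;

public final class JobTypeAccessorSketch {

  private JobTypeAccessorSketch() {
  }

  // Same property the old raw conf.set() calls wrote, now behind a
  // typed API so callers cannot misspell the enum name.
  public static void setJobType(Configuration configuration, MJob.Type type) {
    configuration.set(JobConstants.JOB_TYPE, type.name());
  }

  public static MJob.Type getJobType(Configuration configuration) {
    String value = configuration.get(JobConstants.JOB_TYPE);
    return value == null ? null : MJob.Type.valueOf(value);
  }
}
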
http://git-wip-us.apache.org/repos/asf/sqoop/blob/1b2441d2/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 0e8c9f7..6fc485b 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.submission.mapreduce;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
@@ -27,7 +26,6 @@ import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.security.Credentials;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
@@ -36,7 +34,7 @@ import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
 import org.apache.sqoop.framework.SubmissionRequest;
 import org.apache.sqoop.framework.SubmissionEngine;
 import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.job.mr.ConfigurationUtils;
 import org.apache.sqoop.submission.counter.Counter;
 import org.apache.sqoop.submission.counter.CounterGroup;
 import org.apache.sqoop.submission.counter.Counters;
@@ -55,7 +53,6 @@ import java.util.Map;
  */
 public class MapreduceSubmissionEngine extends SubmissionEngine {
 
-
   private static Logger LOG = Logger.getLogger(MapreduceSubmissionEngine.class);
 
   /**
@@ -158,7 +155,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     Configuration configuration = new Configuration(globalConfiguration);
 
     // Serialize job type as it will be needed by underlying execution engine
-    configuration.set(JobConstants.JOB_TYPE, request.getJobType().name());
+    ConfigurationUtils.setJobType(configuration, request.getJobType());
 
     // Serialize framework context into job configuration
     for(Map.Entry<String, String> entry: request.getFrameworkContext()) {
@@ -180,16 +177,6 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
         entry.getValue());
     }
 
-    // Serialize configuration objects - Firstly configuration classes
-    configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
-      request.getConfigConnectorConnection().getClass().getName());
-    configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
-      request.getConfigConnectorJob().getClass().getName());
-    configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
-      request.getConfigFrameworkConnection().getClass().getName());
-    configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
-      request.getConfigFrameworkJob().getClass().getName());
-
     // Set up notification URL if it's available
     if(request.getNotificationUrl() != null) {
       configuration.set("job.end.notification.url", request.getNotificationUrl());
@@ -217,15 +204,10 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       Job job = new Job(configuration);
 
       // And finally put all configuration objects to credentials cache
-      Credentials credentials = job.getCredentials();
-      credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION_KEY,
-        FormUtils.toJson(request.getConfigConnectorConnection()).getBytes());
-      credentials.addSecretKey(JobConstants.JOB_CONFIG_CONNECTOR_JOB_KEY,
-        FormUtils.toJson(request.getConfigConnectorJob()).getBytes());
-      credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION_KEY,
-        FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
-      credentials.addSecretKey(JobConstants.JOB_CONFIG_FRAMEWORK_JOB_KEY,
-        FormUtils.toJson(request.getConfigFrameworkConnection()).getBytes());
+      ConfigurationUtils.setConfigConnectorConnection(job, request.getConfigConnectorConnection());
+      ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
+      ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
+      ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());
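
One detail worth calling out in the removed block above: the old code
filled JOB_CONFIG_FRAMEWORK_JOB_KEY from
request.getConfigFrameworkConnection() instead of
request.getConfigFrameworkJob(), a copy-and-paste slip that the typed
setters rule out. On the task side, the payoff of the credentials cache is
that the serialized objects remain reachable after the Job-to-JobConf
conversion; a hypothetical mapper-side read-back, under the same
assumptions as the sketches above, might look like this:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.model.MJob;

public class ReadBackSketchMapper
    extends Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // Wrap the task configuration in a JobConf; the unit tests mimic
    // this conversion with a Mockito spy precisely because its exact
    // mechanics inside MapReduce are unclear.
    JobConf jobConf = new JobConf(context.getConfiguration());

    MJob.Type type = ConfigurationUtils.getJobType(jobConf);
    Object connectorJob = ConfigurationUtils.getConfigConnectorJob(jobConf);
    // ... hand both to the connector's extractor or loader.
  }
}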