Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/14 23:03:54 UTC

[1/2] SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR (no functionality change)

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 68577fbf7 -> cb8214806


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
new file mode 100644
index 0000000..972b555
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.model.ConfigurationClass;
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * These tests use Mockito to propagate credentials from the Hadoop Job object
+ * to the Hadoop JobConf object. This approach was chosen because it is not
+ * clear how MapReduce converts one object into the other.
+ */
+public class TestMRConfigurationUtils {
+
+  Job job;
+  JobConf jobConfSpy;
+
+  @Before
+  public void setUp() throws Exception {
+    setUpHadoopJob();
+    setUpHadoopJobConf();
+  }
+
+  public void setUpHadoopJob() throws Exception {
+    job = new Job();
+  }
+
+  public void setUpHadoopJobConf() throws Exception {
+    jobConfSpy = spy(new JobConf(job.getConfiguration()));
+    when(jobConfSpy.getCredentials()).thenReturn(job.getCredentials());
+  }
+
+  @Test
+  public void testLinkConfiguration() throws Exception {
+    MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
+    setUpHadoopJobConf();
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
+
+    MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
+    setUpHadoopJobConf();
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
+  }
+
+  @Test
+  public void testJobConfiguration() throws Exception {
+    MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
+    setUpHadoopJobConf();
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
+
+    MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
+    setUpHadoopJobConf();
+    assertEquals(getConfig(), MRConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
+  }
+
+  @Test
+  public void testDriverConfiguration() throws Exception {
+    MRConfigurationUtils.setDriverConfig(job, getConfig());
+    setUpHadoopJobConf();
+    assertEquals(getConfig(), MRConfigurationUtils.getDriverConfig(jobConfSpy));
+  }
+
+  @Test
+  public void testConnectorSchema() throws Exception {
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
+    assertEquals(getSchema("a"), MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
+
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
+    assertEquals(getSchema("b"), MRConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
+  }
+
+  @Test
+  public void testConnectorSchemaNull() throws Exception {
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
+    assertNull(MRConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
+
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
+    assertNull(MRConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
+  }
+
+  private Schema getSchema(String name) {
+    return new Schema(name).addColumn(new Text("c1"));
+  }
+
+  private TestConfiguration getConfig() {
+    TestConfiguration c = new TestConfiguration();
+    c.c.A = "This is secret text!";
+    return c;
+  }
+
+  @ConfigClass
+  public static class C {
+
+    @Input String A;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof C)) return false;
+
+      C c = (C) o;
+
+      if (A != null ? !A.equals(c.A) : c.A != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return A != null ? A.hashCode() : 0;
+    }
+  }
+
+  @ConfigurationClass
+  public static class TestConfiguration {
+    @Config C c;
+
+    public TestConfiguration() {
+      c = new C();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof TestConfiguration)) return false;
+
+      TestConfiguration config = (TestConfiguration) o;
+
+      if (c != null ? !c.equals(config.c) : config.c != null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return c != null ? c.hashCode() : 0;
+    }
+  }
+}
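
A minimal sketch of the credential round trip the spy above enables, assuming
the same Hadoop and Mockito imports as the test (here Text is Hadoop's
org.apache.hadoop.io.Text, not the Sqoop column type imported above, and the
key name is invented for illustration):

    Job job = new Job();
    job.getCredentials().addSecretKey(new Text("example.key"), "secret".getBytes());

    // A JobConf built from the Job's Configuration does not automatically see
    // the Job's Credentials, so the spy redirects getCredentials() to them.
    JobConf jobConfSpy = spy(new JobConf(job.getConfiguration()));
    when(jobConfSpy.getCredentials()).thenReturn(job.getCredentials());

    // Reads back "secret".getBytes() through the JobConf side.
    byte[] secret = jobConfSpy.getCredentials().getSecretKey(new Text("example.key"));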

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 1f411d2..5bd11f0 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.SqoopWritable;
@@ -120,13 +120,13 @@ public class TestSqoopOutputFormatLoadExecutor {
   @Before
   public void setUp() {
     conf = new Configuration();
-    conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
   }
 
   @Test(expected = BrokenBarrierException.class)
   public void testWhenLoaderThrows() throws Throwable {
-    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
@@ -145,7 +145,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
   @Test
   public void testSuccessfulContinuousLoader() throws Throwable {
-    conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
@@ -192,7 +192,7 @@ public class TestSqoopOutputFormatLoadExecutor {
 
   @Test(expected = ConcurrentModificationException.class)
   public void testThrowingContinuousLoader() throws Throwable {
-    conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 0c492ef..646e8cb 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -39,8 +39,8 @@ import org.apache.sqoop.driver.SubmissionEngine;
 import org.apache.sqoop.execution.mapreduce.MRJobRequest;
 import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
 import org.apache.sqoop.driver.JobRequest;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.submission.SubmissionStatus;
 import org.apache.sqoop.submission.counter.Counter;
 import org.apache.sqoop.submission.counter.CounterGroup;
@@ -172,7 +172,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
         continue;
       }
       configuration.set(
-        JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
+        MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + entry.getKey(),
         entry.getValue());
     }
 
@@ -182,7 +182,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
         continue;
       }
       configuration.set(
-          JobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
+          MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT + entry.getKey(),
           entry.getValue());
     }
 
@@ -202,17 +202,17 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
       Job job = new Job(configuration);
 
       // link configs
-      ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM));
-      ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
+      MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, request.getConnectorLinkConfig(Direction.FROM));
+      MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, request.getConnectorLinkConfig(Direction.TO));
 
       // from and to configs
-      ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM));
-      ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO));
+      MRConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, request.getJobConfig(Direction.FROM));
+      MRConfigurationUtils.setConnectorJobConfig(Direction.TO, job, request.getJobConfig(Direction.TO));
 
-      ConfigurationUtils.setDriverConfig(job, request.getDriverConfig());
+      MRConfigurationUtils.setDriverConfig(job, request.getDriverConfig());
       // @TODO(Abe): Persist TO schema.
-      ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
-      ConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
+      MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
+      MRConfigurationUtils.setConnectorSchema(Direction.TO, job, request.getSummary().getToSchema());
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());
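
An illustration of the prefix scheme in the two loops above, with invented
key names: the FROM and TO connector contexts share one Hadoop Configuration,
so each entry is namespaced under its direction's prefix to keep the two
sides from colliding:

    configuration.set(MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + "table", "users");
    configuration.set(MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT + "table", "users_archive");

    // Each side reads its own entries back through the same prefix.
    String fromTable = configuration.get(MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT + "table");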


[2/2] git commit: SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR (no functionality change)

Posted by ja...@apache.org.
SQOOP-1585: Sqoop2: Prefix mapreduce classes with MR (no functionality change)

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cb821480
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cb821480
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cb821480

Branch: refs/heads/sqoop2
Commit: cb8214806b4e47dc3ad30d5bff0a42b04a412a06
Parents: 68577fb
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Oct 14 17:03:12 2014 -0400
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Oct 14 17:03:12 2014 -0400

----------------------------------------------------------------------
 .../apache/sqoop/job/etl/ExtractorContext.java  |   1 -
 .../mapreduce/MapreduceExecutionEngine.java     |  16 +-
 .../java/org/apache/sqoop/job/JobConstants.java |  81 ------
 .../org/apache/sqoop/job/MRExecutionError.java  |  97 +++++++
 .../org/apache/sqoop/job/MRJobConstants.java    |  81 ++++++
 .../sqoop/job/MapreduceExecutionError.java      |  97 -------
 .../main/java/org/apache/sqoop/job/io/Data.java |  26 +-
 .../apache/sqoop/job/mr/ConfigurationUtils.java | 278 -------------------
 .../sqoop/job/mr/MRConfigurationUtils.java      | 278 +++++++++++++++++++
 .../apache/sqoop/job/mr/ProgressRunnable.java   |  45 ---
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |  16 +-
 .../sqoop/job/mr/SqoopFileOutputFormat.java     |   8 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |  18 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  26 +-
 .../sqoop/job/mr/SqoopNullOutputFormat.java     |   2 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  28 +-
 .../sqoop/job/mr/SqoopProgressRunnable.java     |  45 +++
 .../org/apache/sqoop/job/mr/SqoopReducer.java   |   4 +-
 .../org/apache/sqoop/job/mr/SqoopSplit.java     |   6 +-
 .../org/apache/sqoop/job/TestMapReduce.java     |  32 +--
 .../java/org/apache/sqoop/job/TestMatching.java |  12 +-
 .../apache/sqoop/job/io/SqoopWritableTest.java  |   2 +-
 .../sqoop/job/mr/TestConfigurationUtils.java    | 168 -----------
 .../sqoop/job/mr/TestMRConfigurationUtils.java  | 168 +++++++++++
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |  10 +-
 .../mapreduce/MapreduceSubmissionEngine.java    |  22 +-
 26 files changed, 783 insertions(+), 784 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index 3272b56..4875ed0 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.job.etl;
 
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.etl.io.DataWriter;
-import org.apache.sqoop.schema.Schema;
 
 /**
  * Context implementation for Extractor.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 47f8478..9b3eb44 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.driver.ExecutionEngine;
 import org.apache.sqoop.driver.JobRequest;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
 import org.apache.sqoop.job.io.SqoopWritable;
@@ -64,16 +64,16 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     From from = (From) mrJobRequest.getFrom();
     To to = (To) mrJobRequest.getTo();
     MutableMapContext context = mrJobRequest.getDriverContext();
-    context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
-    context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
-    context.setString(JobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
-    context.setString(JobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
-    context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    context.setString(MRJobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
+    context.setString(MRJobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
+    context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName());
+    context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
+    context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
+    context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         mrJobRequest.getIntermediateDataFormat().getName());
 
     if(mrJobRequest.getExtractors() != null) {
-      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
+      context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
deleted file mode 100644
index 349bb60..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.sqoop.core.ConfigurationConstants;
-import org.apache.sqoop.driver.DriverConstants;
-
-public final class JobConstants extends Constants {
-  /**
-   * All job related configuration is prefixed with this:
-   * <tt>org.apache.sqoop.job.</tt>
-   */
-  public static final String PREFIX_JOB_CONFIG =
-      ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
-
-  public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
-      + "etl.partitioner";
-
-  public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
-      + "etl.extractor";
-
-  public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
-      + "etl.loader";
-
-  public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
-      + "etl.from.destroyer";
-
-  public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
-      + "etl.to.destroyer";
-
-  public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
-      + "mr.output.file";
-
-  public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
-      + "mr.output.codec";
-
-
-  public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
-    + "etl.extractor.count";
-
-  public static final String PREFIX_CONNECTOR_FROM_CONTEXT =
-    PREFIX_JOB_CONFIG + "connector.from.context.";
-
-  public static final String PREFIX_CONNECTOR_TO_CONTEXT =
-      PREFIX_JOB_CONFIG + "connector.to.context.";
-
-  // Hadoop specific constants
-  // We're using constants from Hadoop 1. Hadoop 2 has different names, but
-  // provides backward compatibility layer for those names as well.
-
-  public static final String HADOOP_INPUTDIR = "mapred.input.dir";
-
-  public static final String HADOOP_OUTDIR = "mapred.output.dir";
-
-  public static final String HADOOP_COMPRESS = "mapred.output.compress";
-
-  public static final String HADOOP_COMPRESS_CODEC =
-    "mapred.output.compression.codec";
-
-  public static final String INTERMEDIATE_DATA_FORMAT =
-    DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
-
-  private JobConstants() {
-    // Disable explicit object creation
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
new file mode 100644
index 0000000..e70b7e2
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRExecutionError.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ * Error codes raised by the MapReduce execution engine.
+ */
+public enum MRExecutionError implements ErrorCode {
+
+  MAPRED_EXEC_0000("Unknown error"),
+
+  /** Error occurs during job execution. */
+  MAPRED_EXEC_0008("Error occurs during job execution"),
+
+  /** The system was unable to load the specified class. */
+  MAPRED_EXEC_0009("Unable to load the specified class"),
+
+  /** The system was unable to instantiate the specified class. */
+  MAPRED_EXEC_0010("Unable to instantiate the specified class"),
+
+  /** The parameter already exists in the context */
+  MAPRED_EXEC_0011("The parameter already exists in the context"),
+
+  /** The type is not supported */
+  MAPRED_EXEC_0012("The type is not supported"),
+
+  /** Cannot write to the data writer */
+  MAPRED_EXEC_0013("Cannot write to the data writer"),
+
+  /** Cannot read from the data reader */
+  MAPRED_EXEC_0014("Cannot read from the data reader"),
+
+  /** Unable to write data due to interrupt */
+  MAPRED_EXEC_0015("Unable to write data due to interrupt"),
+
+  /** Unable to read data due to interrupt */
+  MAPRED_EXEC_0016("Unable to read data due to interrupt"),
+
+  /** Error occurs during extractor run */
+  MAPRED_EXEC_0017("Error occurs during extractor run"),
+
+  /** Error occurs during loader run */
+  MAPRED_EXEC_0018("Error occurs during loader run"),
+
+  /** Data have not been completely consumed yet */
+  MAPRED_EXEC_0019("Data have not been completely consumed yet"),
+
+  /** The required option has not been set yet */
+  MAPRED_EXEC_0020("The required option has not been set yet"),
+
+  /** Error occurs during partitioner run */
+  MAPRED_EXEC_0021("Error occurs during partitioner run"),
+
+  /** Unable to parse because it is not properly delimited */
+  MAPRED_EXEC_0022("Unable to parse because it is not properly delimited"),
+
+  /** Unknown job type */
+  MAPRED_EXEC_0023("Unknown job type"),
+
+  /** Unsupported output format type found **/
+  MAPRED_EXEC_0024("Unknown output format type"),
+
+  /** Got invalid number of partitions from Partitioner */
+  MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
+
+  ;
+
+  private final String message;
+
+  private MRExecutionError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}
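
How these codes are consumed, taken from the Data.java hunks later in this
mail: each constant is raised through SqoopException together with optional
detail text.

    // Unsupported data type encountered at runtime (from Data.java below):
    throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));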

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
new file mode 100644
index 0000000..67021a8
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.driver.DriverConstants;
+
+public final class MRJobConstants extends Constants {
+  /**
+   * All job related configuration is prefixed with this:
+   * <tt>org.apache.sqoop.job.</tt>
+   */
+  public static final String PREFIX_JOB_CONFIG =
+      ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
+
+  public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+      + "etl.partitioner";
+
+  public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
+      + "etl.extractor";
+
+  public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+      + "etl.loader";
+
+  public static final String JOB_ETL_FROM_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.from.destroyer";
+
+  public static final String JOB_ETL_TO_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.to.destroyer";
+
+  public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+      + "mr.output.file";
+
+  public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
+      + "mr.output.codec";
+
+
+  public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
+    + "etl.extractor.count";
+
+  public static final String PREFIX_CONNECTOR_FROM_CONTEXT =
+    PREFIX_JOB_CONFIG + "connector.from.context.";
+
+  public static final String PREFIX_CONNECTOR_TO_CONTEXT =
+      PREFIX_JOB_CONFIG + "connector.to.context.";
+
+  // Hadoop specific constants
+  // We're using constants from Hadoop 1. Hadoop 2 has different names, but
+  // provides backward compatibility layer for those names as well.
+
+  public static final String HADOOP_INPUTDIR = "mapred.input.dir";
+
+  public static final String HADOOP_OUTDIR = "mapred.output.dir";
+
+  public static final String HADOOP_COMPRESS = "mapred.output.compress";
+
+  public static final String HADOOP_COMPRESS_CODEC =
+    "mapred.output.compression.codec";
+
+  public static final String INTERMEDIATE_DATA_FORMAT =
+    DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
+
+  private MRJobConstants() {
+    // Disable explicit object creation
+  }
+}
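
Per the class javadoc above, PREFIX_JOB_CONFIG expands to
"org.apache.sqoop.job.", so the composed constants resolve to dotted property
names; for example:

    // MRJobConstants.JOB_ETL_LOADER == "org.apache.sqoop.job.etl.loader"
    String loaderClassName = configuration.get(MRJobConstants.JOB_ETL_LOADER);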

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
deleted file mode 100644
index 1dc12d1..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import org.apache.sqoop.common.ErrorCode;
-
-/**
- *
- */
-public enum MapreduceExecutionError implements ErrorCode {
-
-  MAPRED_EXEC_0000("Unknown error"),
-
-  /** Error occurs during job execution. */
-  MAPRED_EXEC_0008("Error occurs during job execution"),
-
-  /** The system was unable to load the specified class. */
-  MAPRED_EXEC_0009("Unable to load the specified class"),
-
-  /** The system was unable to instantiate the specified class. */
-  MAPRED_EXEC_0010("Unable to instantiate the specified class"),
-
-  /** The parameter already exists in the context */
-  MAPRED_EXEC_0011("The parameter already exists in the context"),
-
-  /** The type is not supported */
-  MAPRED_EXEC_0012("The type is not supported"),
-
-  /** Cannot write to the data writer */
-  MAPRED_EXEC_0013("Cannot write to the data writer"),
-
-  /** Cannot read from the data reader */
-  MAPRED_EXEC_0014("Cannot read to the data reader"),
-
-  /** Unable to write data due to interrupt */
-  MAPRED_EXEC_0015("Unable to write data due to interrupt"),
-
-  /** Unable to read data due to interrupt */
-  MAPRED_EXEC_0016("Unable to read data due to interrupt"),
-
-  /** Error occurs during extractor run */
-  MAPRED_EXEC_0017("Error occurs during extractor run"),
-
-  /** Error occurs during loader run */
-  MAPRED_EXEC_0018("Error occurs during loader run"),
-
-  MAPRED_EXEC_0019("Data have not been completely consumed yet"),
-
-  /** The required option has not been set yet */
-  MAPRED_EXEC_0020("The required option has not been set yet"),
-
-  /** Error occurs during partitioner run */
-  MAPRED_EXEC_0021("Error occurs during partitioner run"),
-
-  /** Unable to parse because it is not properly delimited */
-  MAPRED_EXEC_0022("Unable to parse because it is not properly delimited"),
-
-  /** Unknown job type */
-  MAPRED_EXEC_0023("Unknown job type"),
-
-  /** Unsupported output format type found **/
-  MAPRED_EXEC_0024("Unknown output format type"),
-
-  /** Got invalid number of partitions from Partitioner */
-  MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
-
-  ;
-
-  private final String message;
-
-  private MapreduceExecutionError(String message) {
-    this.message = message;
-  }
-
-  public String getCode() {
-    return name();
-  }
-
-  public String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
index 5423b7b..139883e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRExecutionError;
 
 public class Data implements WritableComparable<Data> {
 
@@ -76,7 +76,7 @@ public class Data implements WritableComparable<Data> {
       this.content = content;
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
 
@@ -87,7 +87,7 @@ public class Data implements WritableComparable<Data> {
     case ARRAY_RECORD:
       return parse();
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
     }
   }
 
@@ -141,7 +141,7 @@ public class Data implements WritableComparable<Data> {
       }
       return result;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
 
@@ -156,7 +156,7 @@ public class Data implements WritableComparable<Data> {
       readArray(in);
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
 
@@ -171,7 +171,7 @@ public class Data implements WritableComparable<Data> {
       writeArray(out);
       break;
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
 
@@ -249,7 +249,7 @@ public class Data implements WritableComparable<Data> {
 
       default:
         throw new IOException(
-          new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
+          new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
         );
       }
     }
@@ -307,7 +307,7 @@ public class Data implements WritableComparable<Data> {
 
       } else {
         throw new IOException(
-          new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012,
+          new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
             array[i].getClass().getName()
           )
         );
@@ -351,7 +351,7 @@ public class Data implements WritableComparable<Data> {
       return sb.toString();
 
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
 
@@ -399,7 +399,7 @@ public class Data implements WritableComparable<Data> {
       return (Object[])content;
 
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
     }
   }
 
@@ -418,7 +418,7 @@ public class Data implements WritableComparable<Data> {
     case FieldTypes.UTF:
       if (field.charAt(0) != stringDelimiter ||
           field.charAt(field.length()-1) != stringDelimiter) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
       }
       list.add(index, unescape(field.substring(1, field.length()-1)));
       break;
@@ -426,7 +426,7 @@ public class Data implements WritableComparable<Data> {
     case FieldTypes.BIN:
       if (field.charAt(0) != '[' ||
           field.charAt(field.length()-1) != ']') {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0022);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
       }
       String[] splits =
           field.substring(1, field.length()-1).split(String.valueOf(','));
@@ -474,7 +474,7 @@ public class Data implements WritableComparable<Data> {
       break;
 
     default:
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
     }
 
     return ++index;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
deleted file mode 100644
index 0fa07f7..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.json.util.SchemaSerialization;
-import org.apache.sqoop.model.ConfigUtils;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.utils.ClassUtils;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import java.io.InputStream;
-import java.util.Properties;
-
-/**
- * Helper class to store and load various information in/from MapReduce configuration
- * object and JobConf object.
- */
-public final class ConfigurationUtils {
-
-  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
-
-  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
-
-  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
-
-  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = JobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
-
-  private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = JobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
-
-  private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
-
-  private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK);
-
-  private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
-
-  private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK);
-
-  private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
-
-  private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG);
-
-  private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
-
-  private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG);
-
-  private static final String MR_JOB_CONFIG_DRIVER_CONFIG = JobConstants.PREFIX_JOB_CONFIG + "config.driver";
-
-  private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG);
-
-  private static final String SCHEMA_FROM = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
-
-  private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
-
-  private static final String SCHEMA_TO = JobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
-
-  private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
-
-
-  /**
-   * Persist Connector configuration object for link.
-   *
-   * @param job MapReduce job object
-   * @param obj Configuration object
-   */
-  public static void setConnectorLinkConfig(Direction type, Job job, Object obj) {
-    switch (type) {
-      case FROM:
-        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName());
-        job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
-        break;
-
-      case TO:
-        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, obj.getClass().getName());
-        job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
-        break;
-    }
-  }
-
-  /**
-   * Persist Connector configuration objects for job.
-   *
-   * @param job MapReduce job object
-   * @param obj Configuration object
-   */
-  public static void setConnectorJobConfig(Direction type, Job job, Object obj) {
-    switch (type) {
-      case FROM:
-        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName());
-        job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
-        break;
-
-      case TO:
-        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName());
-        job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
-        break;
-    }
-  }
-
-
-  /**
-   * Persist driver configuration object for job.
-   *
-   * @param job MapReduce job object
-   * @param obj Configuration object
-   */
-  public static void setDriverConfig(Job job, Object obj) {
-    job.getConfiguration().set(MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, obj.getClass().getName());
-    job.getCredentials().addSecretKey(MR_JOB_CONFIG_DRIVER_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
-  }
-
-  /**
-   * Persist Connector generated schema.
-   *
-   * @param type  Direction of schema we are persisting
-   * @param job MapReduce Job object
-   * @param schema Schema
-   */
-  public static void setConnectorSchema(Direction type, Job job, Schema schema) {
-    if(schema != null) {
-      String jsonSchema =  SchemaSerialization.extractSchema(schema).toJSONString();
-      switch (type) {
-        case FROM:
-          job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes());
-          return;
-        case TO:
-          job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
-          return;
-      }
-    }
-  }
-
-  /**
-   * Retrieve Connector configuration object for connection.
-   * @param configuration MapReduce configuration object
-   * @return Configuration object
-   */
-  public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) {
-    switch (type) {
-      case FROM:
-        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);
-
-      case TO:
-        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY);
-    }
-
-    return null;
-  }
-
-  /**
-   * Retrieve Connector configuration object for job.
-   *
-   * @param configuration MapReduce configuration object
-   * @return Configuration object
-   */
-  public static Object getConnectorJobConfig(Direction type, Configuration configuration) {
-    switch (type) {
-      case FROM:
-        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY);
-
-      case TO:
-        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, MR_JOB_CONFIG_TO_JOB_CONFIG_KEY);
-    }
-
-    return null;
-  }
-
-  /**
-   * Retrieve Framework configuration object for job.
-   *
-   * @param configuration MapReduce configuration object
-   * @return Configuration object
-   */
-  public static Object getDriverConfig(Configuration configuration) {
-    return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, MR_JOB_CONFIG_DRIVER_CONFIG_KEY);
-  }
-
-
-
-  /**
-   * Retrieve Connector generated schema.
-   *
-   * @param type The FROM or TO connector
-   * @param configuration MapReduce configuration object
-   */
-  public static Schema getConnectorSchema(Direction type, Configuration configuration) {
-    switch (type) {
-      case FROM:
-        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY));
-
-      case TO:
-        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY));
-    }
-
-    return null;
-  }
-
-  /**
-   * Deserialize schema from JSON encoded bytes.
-   *
-   * This method is null safe.
-   *
-   * @param bytes
-   * @return
-   */
-  private static Schema getSchemaFromBytes(byte[] bytes) {
-    if(bytes == null) {
-      return null;
-    }
-
-    JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes));
-    return SchemaSerialization.restoreSchema(jsonSchema);
-  }
-
-  /**
-   * Load configuration instance serialized in Hadoop credentials cache.
-   *
-   * @param configuration JobConf object associated with the job
-   * @param classProperty Property with stored configuration class name
-   * @param valueProperty Property with stored JSON representation of the
-   *                      configuration object
-   * @return New instance with loaded data
-   */
-  private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) {
-    // Create new instance of configuration class
-    Object object = ClassUtils.instantiate(configuration.get(classProperty));
-    if(object == null) {
-      return null;
-    }
-
-    String json = new String(configuration.getCredentials().getSecretKey(valueProperty));
-
-    // Fill it with JSON data
-    ConfigUtils.fillValues(json, object);
-
-    // And give it back
-    return object;
-  }
-
-  private ConfigurationUtils() {
-    // Instantiation is prohibited
-  }
-
-  public static void configureLogging() {
-    try {
-      Properties props = new Properties();
-      InputStream resourceAsStream =
-          SqoopMapper.class.getResourceAsStream("/META-INF/log4j.properties");
-      props.load(resourceAsStream);
-      PropertyConfigurator.configure(props);
-    } catch (Exception e) {
-      System.err.println("Encountered exception while configuring logging " +
-        "for sqoop: " + e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
new file mode 100644
index 0000000..03a1dec
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.json.util.SchemaSerialization;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * Helper class to store and load various pieces of information in and from the
+ * MapReduce Configuration object and the JobConf object.
+ */
+public final class MRConfigurationUtils {
+
+  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.link";
+
+  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.link";
+
+  private static final String MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.from.job";
+
+  private static final String MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.connector.to.job";
+
+  private static final String MR_JOB_CONFIG_DRIVER_CONFIG_CLASS = MRJobConstants.PREFIX_JOB_CONFIG + "config.class.driver";
+
+  private static final String MR_JOB_CONFIG_FROM_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.link";
+
+  private static final Text MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_FROM_CONNECTOR_LINK);
+
+  private static final String MR_JOB_CONFIG_TO_CONNECTOR_LINK = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.link";
+
+  private static final Text MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY = new Text(MR_JOB_CONFIG_TO_CONNECTOR_LINK);
+
+  private static final String MR_JOB_CONFIG_FROM_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.from.job";
+
+  private static final Text MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_FROM_JOB_CONFIG);
+
+  private static final String MR_JOB_CONFIG_TO_JOB_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.connector.to.job";
+
+  private static final Text MR_JOB_CONFIG_TO_JOB_CONFIG_KEY = new Text(MR_JOB_CONFIG_TO_JOB_CONFIG);
+
+  private static final String MR_JOB_CONFIG_DRIVER_CONFIG = MRJobConstants.PREFIX_JOB_CONFIG + "config.driver";
+
+  private static final Text MR_JOB_CONFIG_DRIVER_CONFIG_KEY = new Text(MR_JOB_CONFIG_DRIVER_CONFIG);
+
+  private static final String SCHEMA_FROM = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.from";
+
+  private static final Text SCHEMA_FROM_KEY = new Text(SCHEMA_FROM);
+
+  private static final String SCHEMA_TO = MRJobConstants.PREFIX_JOB_CONFIG + "schema.connector.to";
+
+  private static final Text SCHEMA_TO_KEY = new Text(SCHEMA_TO);
+
+
+  /**
+   * Persist Connector configuration object for link.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setConnectorLinkConfig(Direction type, Job job, Object obj) {
+    switch (type) {
+      case FROM:
+        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName());
+        job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
+        break;
+
+      case TO:
+        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, obj.getClass().getName());
+        job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
+        break;
+    }
+  }
+
+  /**
+   * Persist Connector configuration objects for job.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setConnectorJobConfig(Direction type, Job job, Object obj) {
+    switch (type) {
+      case FROM:
+        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, obj.getClass().getName());
+        job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
+        break;
+
+      case TO:
+        job.getConfiguration().set(MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, obj.getClass().getName());
+        job.getCredentials().addSecretKey(MR_JOB_CONFIG_TO_JOB_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
+        break;
+    }
+  }
+
+
+  /**
+   * Persist driver configuration object for job.
+   *
+   * @param job MapReduce job object
+   * @param obj Configuration object
+   */
+  public static void setDriverConfig(Job job, Object obj) {
+    job.getConfiguration().set(MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, obj.getClass().getName());
+    job.getCredentials().addSecretKey(MR_JOB_CONFIG_DRIVER_CONFIG_KEY, ConfigUtils.toJson(obj).getBytes());
+  }
+
+  /**
+   * Persist Connector generated schema.
+   *
+   * @param type  Direction of schema we are persisting
+   * @param job MapReduce Job object
+   * @param schema Schema
+   */
+  public static void setConnectorSchema(Direction type, Job job, Schema schema) {
+    if(schema != null) {
+      String jsonSchema =  SchemaSerialization.extractSchema(schema).toJSONString();
+      switch (type) {
+        case FROM:
+          job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes());
+          return;
+        case TO:
+          job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
+          return;
+      }
+    }
+  }
+
+  /**
+   * Retrieve Connector configuration object for link.
+   *
+   * @param type Direction (FROM or TO) of the link configuration
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);
+
+      case TO:
+        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_LINK, MR_JOB_CONFIG_TO_CONNECTOR_LINK_KEY);
+    }
+
+    return null;
+  }
+
+  /**
+   * Retrieve Connector configuration object for job.
+   *
+   * @param type Direction (FROM or TO) of the job configuration
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getConnectorJobConfig(Direction type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_JOB, MR_JOB_CONFIG_FROM_JOB_CONFIG_KEY);
+
+      case TO:
+        return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_TO_CONNECTOR_JOB, MR_JOB_CONFIG_TO_JOB_CONFIG_KEY);
+    }
+
+    return null;
+  }
+
+  /**
+   * Retrieve driver configuration object for job.
+   *
+   * @param configuration MapReduce configuration object
+   * @return Configuration object
+   */
+  public static Object getDriverConfig(Configuration configuration) {
+    return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_DRIVER_CONFIG_CLASS, MR_JOB_CONFIG_DRIVER_CONFIG_KEY);
+  }
+
+
+
+  /**
+   * Retrieve Connector generated schema.
+   *
+   * @param type The FROM or TO connector
+   * @param configuration MapReduce configuration object
+   * @return Schema for the given direction, or null when none was stored
+   */
+  public static Schema getConnectorSchema(Direction type, Configuration configuration) {
+    switch (type) {
+      case FROM:
+        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_FROM_KEY));
+
+      case TO:
+        return getSchemaFromBytes(((JobConf) configuration).getCredentials().getSecretKey(SCHEMA_TO_KEY));
+    }
+
+    return null;
+  }
+
+  /**
+   * Deserialize schema from JSON encoded bytes.
+   *
+   * This method is null safe.
+   *
+   * @param bytes JSON encoded schema bytes, possibly null
+   * @return Restored Schema instance, or null when bytes is null
+   */
+  private static Schema getSchemaFromBytes(byte[] bytes) {
+    if(bytes == null) {
+      return null;
+    }
+
+    JSONObject jsonSchema = (JSONObject) JSONValue.parse(new String(bytes));
+    return SchemaSerialization.restoreSchema(jsonSchema);
+  }
+
+  /**
+   * Load configuration instance serialized in Hadoop credentials cache.
+   *
+   * @param configuration JobConf object associated with the job
+   * @param classProperty Property with stored configuration class name
+   * @param valueProperty Property with stored JSON representation of the
+   *                      configuration object
+   * @return New instance with loaded data
+   */
+  private static Object loadConfiguration(JobConf configuration, String classProperty, Text valueProperty) {
+    // Create new instance of configuration class
+    Object object = ClassUtils.instantiate(configuration.get(classProperty));
+    if(object == null) {
+      return null;
+    }
+
+    String json = new String(configuration.getCredentials().getSecretKey(valueProperty));
+
+    // Fill it with JSON data
+    ConfigUtils.fillValues(json, object);
+
+    // And give it back
+    return object;
+  }
+
+  private MRConfigurationUtils() {
+    // Instantiation is prohibited
+  }
+
+  public static void configureLogging() {
+    try {
+      Properties props = new Properties();
+      InputStream resourceAsStream =
+          SqoopMapper.class.getResourceAsStream("/META-INF/log4j.properties");
+      props.load(resourceAsStream);
+      PropertyConfigurator.configure(props);
+    } catch (Exception e) {
+      System.err.println("Encountered exception while configuring logging " +
+        "for sqoop: " + e);
+    }
+  }
+}
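
For orientation, a minimal sketch of the round trip this utility class provides.
MRConfigRoundTrip and MyLinkConfig below are hypothetical, annotated the same way
as the TestConfiguration class in the accompanying test. Class names travel in the
plain Configuration while the JSON payloads travel in the credentials cache, which
is why credentials have to be carried from Job to JobConf by hand anywhere outside
a real cluster (the test achieves the same with a mockito spy).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.job.mr.MRConfigurationUtils;
import org.apache.sqoop.model.Config;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.ConfigurationClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;

public class MRConfigRoundTrip {

  @ConfigClass
  public static class Conn {
    @Input String url;
  }

  @ConfigurationClass
  public static class MyLinkConfig {
    @Config Conn conn = new Conn();
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());

    MyLinkConfig link = new MyLinkConfig();
    link.conn.url = "jdbc:hsqldb:mem:example";

    // Class name goes into the Configuration, the JSON payload into the
    // credentials cache, so sensitive inputs stay out of plain-text config.
    MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, link);
    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job,
        new Schema("example").addColumn(new Text("c1")));

    // On the task side MapReduce hands over a JobConf carrying the
    // credentials; here we copy them across explicitly.
    JobConf taskConf = new JobConf(job.getConfiguration());
    taskConf.getCredentials().addAll(job.getCredentials());

    Object restoredLink =
        MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, taskConf);
    Schema restoredSchema =
        MRConfigurationUtils.getConnectorSchema(Direction.FROM, taskConf);
  }
}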

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
deleted file mode 100644
index 4c2e206..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ProgressRunnable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.log4j.Logger;
-
-
-/**
-  * Runnable that will ping mapreduce context about progress.
-  */
-public class ProgressRunnable implements Runnable {
-
-  public static final Logger LOG = Logger.getLogger(ProgressRunnable.class);
-
-  /**
-   * Context class that we should use for reporting progress.
-   */
-  private final TaskInputOutputContext<?,?,?,?> context;
-
-  public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) {
-    this.context = ctxt;
-  }
-
-  @Override
-  public void run() {
-    LOG.debug("Auto-progress thread reporting progress");
-    this.context.progress();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index 8d2a1da..b385926 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -20,7 +20,7 @@ package org.apache.sqoop.job.mr;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
@@ -47,13 +47,13 @@ public class SqoopDestroyerExecutor {
     switch (direction) {
       default:
       case FROM:
-        destroyerPropertyName = JobConstants.JOB_ETL_FROM_DESTROYER;
-        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
+        destroyerPropertyName = MRJobConstants.JOB_ETL_FROM_DESTROYER;
+        prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT;
         break;
 
       case TO:
-        destroyerPropertyName = JobConstants.JOB_ETL_TO_DESTROYER;
-        prefixPropertyName = JobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
+        destroyerPropertyName = MRJobConstants.JOB_ETL_TO_DESTROYER;
+        prefixPropertyName = MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT;
         break;
     }
 
@@ -66,11 +66,11 @@ public class SqoopDestroyerExecutor {
 
     // Objects that should be passed to the Destroyer execution
     PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName);
-    Object configConnection = ConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
-    Object configJob = ConfigurationUtils.getConnectorJobConfig(direction, configuration);
+    Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration);
+    Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration);
 
     // Propagate connector schema in every case for now
-    Schema schema = ConfigurationUtils.getConnectorSchema(direction, configuration);
+    Schema schema = MRConfigurationUtils.getConnectorSchema(direction, configuration);
 
     DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
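
For context, the renamed constants above are the same properties a job submitter
sets when wiring destroyers; the tests further down in this commit do exactly this
with dummy classes. A minimal sketch, using hypothetical connector class names:

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.job.MRJobConstants;

public class DestroyerWiring {
  // The com.example.* names are placeholders for connector-supplied Destroyer
  // implementations; only the class names travel in the configuration, and
  // SqoopDestroyerExecutor resolves and instantiates them reflectively.
  static Configuration wireDestroyers(Configuration conf) {
    conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, "com.example.MyFromDestroyer");
    conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, "com.example.MyToDestroyer");
    return conf;
  }
}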
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index ca77e16..f451044 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
 /**
@@ -56,13 +56,13 @@ public class SqoopFileOutputFormat
 
     Path filepath = getDefaultWorkFile(context, "");
     String filename = filepath.toString();
-    conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
+    conf.set(MRJobConstants.JOB_MR_OUTPUT_FILE, filename);
 
     boolean isCompressed = getCompressOutput(context);
     if (isCompressed) {
       String codecname =
-          conf.get(JobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
-      conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
+          conf.get(MRJobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
+      conf.set(MRJobConstants.JOB_MR_OUTPUT_CODEC, codecname);
     }
 
     return  new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
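
The codec handling above only fires when compressed output was requested. With
stock Hadoop APIs that request looks like the sketch below; the assumption is that
MRJobConstants.HADOOP_COMPRESS_CODEC names the standard output-compression codec
key, so the submitter-side call and the task-side conf.get() meet on the same
property.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressedOutputSetup {
  static Job requestGzipOutput() throws Exception {
    Job job = new Job(new Configuration());
    // getCompressOutput(context) above reads this flag...
    FileOutputFormat.setCompressOutput(job, true);
    // ...and this codec class name is what gets copied into JOB_MR_OUTPUT_CODEC.
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    return job;
  }
}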

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 1c1133a..d2cf5e4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -59,15 +59,15 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
 
-    String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+    String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
     Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
 
-    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
-    Object connectorJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
-    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
+    PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
 
-    long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
+    long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
     PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, schema);
 
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
@@ -80,7 +80,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     }
 
     if(splits.size() > maxPartitions) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025,
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
         String.format("Got %d, max was %d", splits.size(), maxPartitions));
     }
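
For orientation, a toy Partitioner shaped like the getPartitions call above. The
Object-typed parameters mirror how this InputFormat invokes it; RangePartition is
invented for the sketch, and getMaxPartitions() is assumed to be the accessor for
the limit that PartitionerContext is constructed with.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;

public class RangePartitioner extends Partitioner {

  // Minimal Partition carrying one long; Partition subclasses must serialize
  // themselves because SqoopSplit ships them to the tasks.
  public static class RangePartition extends Partition {
    long start;
    @Override public void readFields(DataInput in) throws IOException { start = in.readLong(); }
    @Override public void write(DataOutput out) throws IOException { out.writeLong(start); }
    @Override public String toString() { return "start=" + start; }
  }

  @Override
  public List<Partition> getPartitions(PartitionerContext context,
      Object linkConfiguration, Object jobConfiguration) {
    List<Partition> partitions = new LinkedList<Partition>();
    // Stay at or under the limit: getSplits() above fails the job with
    // MAPRED_EXEC_0025 when more partitions come back than were allowed.
    for (long i = 0; i < context.getMaxPartitions(); i++) {
      RangePartition p = new RangePartition();
      p.start = i;
      partitions.add(p);
    }
    return partitions;
  }
}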
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 03d84d4..d31aa20 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -31,8 +31,8 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.matcher.Matcher;
 import org.apache.sqoop.connector.matcher.MatcherFactory;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -47,7 +47,7 @@ import org.apache.sqoop.utils.ClassUtils;
 public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
 
   static {
-    ConfigurationUtils.configureLogging();
+    MRConfigurationUtils.configureLogging();
   }
   public static final Logger LOG = Logger.getLogger(SqoopMapper.class);
 
@@ -63,14 +63,14 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
   public void run(Context context) throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
 
-    String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
+    String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
     matcher = MatcherFactory.getMatcher(
-        ConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
-        ConfigurationUtils.getConnectorSchema(Direction.TO, conf));
+        MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf),
+        MRConfigurationUtils.getConnectorSchema(Direction.TO, conf));
 
-    String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
+    String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
     fromDataFormat = (IntermediateDataFormat<String>) ClassUtils
         .instantiate(intermediateDataFormatName);
     fromDataFormat.setSchema(matcher.getFromSchema());
@@ -79,16 +79,16 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     toDataFormat.setSchema(matcher.getToSchema());
 
     // Objects that should be passed to the Executor execution
-    PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
-    Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
-    Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
+    PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
+    Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
+    Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
 
     SqoopSplit split = context.getCurrentKey();
     ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context));
 
     try {
       LOG.info("Starting progress service");
-      progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
+      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
 
       LOG.info("Running extractor class " + extractorName);
       extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
@@ -96,7 +96,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
       context.getCounter(SqoopCounters.ROWS_READ)
               .increment(extractor.getRowsRead());
     } catch (Exception e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
     } finally {
       LOG.info("Stopping progress service");
       progressService.shutdown();
@@ -145,7 +145,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
         writable.setString(toDataFormat.getTextData());
         context.write(writable, NullWritable.get());
       } catch (Exception e) {
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 594b5e9..1148c4a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.io.SqoopWritable;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 1ebd3e4..9108981 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -36,8 +36,8 @@ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.matcher.Matcher;
 import org.apache.sqoop.connector.matcher.MatcherFactory;
-import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRJobConstants;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
@@ -75,10 +75,10 @@ public class SqoopOutputFormatLoadExecutor {
     context = jobctx;
     writer = new SqoopRecordWriter();
     matcher = MatcherFactory.getMatcher(
-        ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
-        ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
+        MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
+        MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
     dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
-        .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+        .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
     dataFormat.setSchema(matcher.getToSchema());
   }
 
@@ -141,7 +141,7 @@ public class SqoopOutputFormatLoadExecutor {
       //In the rare case, it was not a SqoopException
       Throwables.propagate(t);
     } catch (Exception ex) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019, ex);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
     }
   }
 
@@ -186,7 +186,7 @@ public class SqoopOutputFormatLoadExecutor {
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Caught exception e while getting content ", t);
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
       } finally {
         releaseSema();
       }
@@ -221,7 +221,7 @@ public class SqoopOutputFormatLoadExecutor {
         Configuration conf = null;
         if (!isTest) {
           conf = context.getConfiguration();
-          loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+          loaderName = conf.get(MRJobConstants.JOB_ETL_LOADER);
         }
         Loader loader = (Loader) ClassUtils.instantiate(loaderName);
 
@@ -233,11 +233,11 @@ public class SqoopOutputFormatLoadExecutor {
 
         if (!isTest) {
           // Using the TO schema since the IDF returns data in TO schema
-          schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
+          schema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
 
-          subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
-          configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
-          configJob = ConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
+          subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
+          configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf);
+          configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
         }
 
         // Create loader context
@@ -252,7 +252,7 @@ public class SqoopOutputFormatLoadExecutor {
         // Release so that the writer can tell Sqoop something went
         // wrong.
         free.release();
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
       }
 
       // if no exception happens yet and reader finished before writer,
@@ -264,7 +264,7 @@ public class SqoopOutputFormatLoadExecutor {
         // Release so that the writer can tell Sqoop something went
         // wrong.
         free.release();
-        throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0019);
+        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
 
       }
       // inform writer that reader is finished
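
For the other side of this executor, a skeletal Loader. The Object-typed signature
mirrors how the code above assembles configConnection and configJob, and
getDataReader()/readTextRecord() are assumed to be this version's DataReader
accessors; treat this as a sketch, not the connector API verbatim.

import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

public class StdoutLoader extends Loader {
  @Override
  public void load(LoaderContext context, Object linkConfiguration,
      Object jobConfiguration) throws Exception {
    // Records arrive through the reader half of the reader/writer handoff
    // that SqoopOutputFormatLoadExecutor coordinates with semaphores.
    String record;
    while ((record = context.getDataReader().readTextRecord()) != null) {
      System.out.println(record);
    }
  }
}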

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
new file mode 100644
index 0000000..cd4f8b9
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopProgressRunnable.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Runnable that pings the MapReduce context to report progress, so that
+ * long-running extract/load calls are not timed out for inactivity.
+ */
+public class SqoopProgressRunnable implements Runnable {
+
+  public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);
+
+  /**
+   * Context class that we should use for reporting progress.
+   */
+  private final TaskInputOutputContext<?,?,?,?> context;
+
+  public SqoopProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctx) {
+    this.context = ctx;
+  }
+
+  @Override
+  public void run() {
+    LOG.debug("Auto-progress thread reporting progress");
+    this.context.progress();
+  }
+}
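
As the SqoopMapper hunk above already shows, this runnable is driven from a
single-threaded scheduler wrapped around the main work loop. In isolation the
pattern looks like this sketch (runWithAutoProgress is invented for illustration):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

public class AutoProgress {
  // Same cadence SqoopMapper and SqoopReducer use: ping immediately, then
  // every two minutes, and always shut the scheduler down afterwards.
  static void runWithAutoProgress(TaskInputOutputContext<?, ?, ?, ?> context, Runnable work) {
    ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
    progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
    try {
      work.run();
    } finally {
      progressService.shutdown();
    }
  }
}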

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
index a55534a..cf023c3 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit;
 public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
 
   static {
-    ConfigurationUtils.configureLogging();
+    MRConfigurationUtils.configureLogging();
   }
   public static final Logger LOG = Logger.getLogger(SqoopReducer.class);
 
@@ -46,7 +46,7 @@ public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWrit
   public void run(Context context) throws IOException, InterruptedException {
     try {
       LOG.info("Starting progress service");
-      progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
+      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
 
       // Delegating all functionality to our parent
       super.run(context);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
index dca4c90..c2f5756 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopSplit.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.MRExecutionError;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -60,12 +60,12 @@ public class SqoopSplit extends InputSplit implements Writable {
     // instantiate Partition object
     Class<?> clz = ClassUtils.loadClass(className);
     if (clz == null) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0009, className);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0009, className);
     }
     try {
       partition = (Partition) clz.newInstance();
     } catch (Exception e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0010, className, e);
+      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0010, className, e);
     }
     // read Partition object content
     partition.readFields(in);
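
readFields() above implies the split's wire format: the concrete Partition class
name first, then the partition's own fields. The matching write() is untouched by
this commit and not shown here; a symmetric sketch, with writeUTF as an assumed
encoding for the class name, would be:

  // Hedged counterpart to readFields(); not the actual committed method.
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(partition.getClass().getName());
    partition.write(out);
  }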

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index e3b68e2..6d0dcb4 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -45,7 +45,7 @@ import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -68,8 +68,8 @@ public class TestMapReduce {
   @Test
   public void testInputFormat() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
       CSVIntermediateDataFormat.class.getName());
     Job job = new Job(conf);
 
@@ -87,17 +87,17 @@ public class TestMapReduce {
   @Test
   public void testMapper() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
       CSVIntermediateDataFormat.class.getName());
     Schema schema = new Schema("Test");
     schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
       .addColumn(new org.apache.sqoop.schema.type.Text("3"));
 
     Job job = new Job(conf);
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
     boolean success = JobUtils.runJob(job.getConfiguration(),
         SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
     Assert.assertEquals("Job failed!", true, success);
@@ -106,20 +106,20 @@ public class TestMapReduce {
   @Test
   public void testOutputFormat() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
-    conf.set(JobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
-    conf.set(JobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
       CSVIntermediateDataFormat.class.getName());
     Schema schema = new Schema("Test");
     schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
       .addColumn(new Text("3"));
 
     Job job = new Job(conf);
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
     boolean success = JobUtils.runJob(job.getConfiguration(),
         SqoopInputFormat.class, SqoopMapper.class,
         SqoopNullOutputFormat.class);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 7f9a147..665a65b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -42,7 +42,7 @@ import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.ConfigurationUtils;
+import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.schema.Schema;
@@ -123,14 +123,14 @@ public class TestMatching {
   @Test
   public void testSchemaMatching() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+    conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+    conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
         CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
     JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
     boolean success = JobUtils.runJob(job.getConfiguration(),

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
index f5742a2..68ce5ed 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
@@ -31,7 +31,7 @@ import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MRJobConstants;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cb821480/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
deleted file mode 100644
index 501e32c..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestConfigurationUtils.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.mr;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Config;
-import org.apache.sqoop.model.ConfigClass;
-import org.apache.sqoop.model.Input;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Text;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Current tests are using mockito to propagate credentials from hadoop Job object
- * to hadoop JobConf object. This implementation was chosen because it's not clear
- * how MapReduce is converting one object to another.
- */
-public class TestConfigurationUtils {
-
-  Job job;
-  JobConf jobConfSpy;
-
-  @Before
-  public void setUp() throws Exception {
-    setUpHadoopJob();
-    setUpHadoopJobConf();
-  }
-
-  public void setUpHadoopJob() throws Exception {
-    job = new Job();
-  }
-
-  public void setUpHadoopJobConf() throws Exception {
-    jobConfSpy = spy(new JobConf(job.getConfiguration()));
-    when(jobConfSpy.getCredentials()).thenReturn(job.getCredentials());
-  }
-
-  @Test
-  public void testLinkConfiguration() throws Exception {
-    ConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig());
-    setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy));
-
-    ConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig());
-    setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy));
-  }
-
-  @Test
-  public void testJobConfiguration() throws Exception {
-    ConfigurationUtils.setConnectorJobConfig(Direction.FROM, job, getConfig());
-    setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.FROM, jobConfSpy));
-
-    ConfigurationUtils.setConnectorJobConfig(Direction.TO, job, getConfig());
-    setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getConnectorJobConfig(Direction.TO, jobConfSpy));
-  }
-
-  @Test
-  public void testDriverConfiguration() throws Exception {
-    ConfigurationUtils.setDriverConfig(job, getConfig());
-    setUpHadoopJobConf();
-    assertEquals(getConfig(), ConfigurationUtils.getDriverConfig(jobConfSpy));
-  }
-
-  @Test
-  public void testConnectorSchema() throws Exception {
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, getSchema("a"));
-    assertEquals(getSchema("a"), ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
-
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, getSchema("b"));
-    assertEquals(getSchema("b"), ConfigurationUtils.getConnectorSchema(Direction.TO, jobConfSpy));
-  }
-
-  @Test
-  public void testConnectorSchemaNull() throws Exception {
-    ConfigurationUtils.setConnectorSchema(Direction.FROM, job, null);
-    assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
-
-    ConfigurationUtils.setConnectorSchema(Direction.TO, job, null);
-    assertNull(ConfigurationUtils.getConnectorSchema(Direction.FROM, jobConfSpy));
-  }
-
-  private Schema getSchema(String name) {
-    return new Schema(name).addColumn(new Text("c1"));
-  }
-
-  private TestConfiguration getConfig() {
-    TestConfiguration c = new TestConfiguration();
-    c.c.A = "This is secret text!";
-    return c;
-  }
-
-  @ConfigClass
-  public static class C {
-
-    @Input String A;
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (!(o instanceof C)) return false;
-
-      C c = (C) o;
-
-      if (A != null ? !A.equals(c.A) : c.A != null) return false;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      return A != null ? A.hashCode() : 0;
-    }
-  }
-
-  @ConfigurationClass
-  public static class TestConfiguration {
-    @Config C c;
-
-    public TestConfiguration() {
-      c = new C();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (!(o instanceof TestConfiguration)) return false;
-
-      TestConfiguration config = (TestConfiguration) o;
-
-      if (c != null ? !c.equals(config.c) : config.c != null)
-        return false;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      return c != null ? c.hashCode() : 0;
-    }
-  }
-}