Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/12/14 21:40:13 UTC

sqoop git commit: SQOOP-1882: JobManager currently ignores the TO connector IDF and assumes all IDFs use String for the generic T

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 9df8a53dd -> 452791676


SQOOP-1882: JobManager currently ignores the TO connector IDF and assumes all IDFs use String for the generic T

(Veena Basavaraj via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/45279167
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/45279167
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/45279167

Branch: refs/heads/sqoop2
Commit: 4527916766bfadf10654d0c694db470ef4641724
Parents: 9df8a53
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sun Dec 14 12:39:41 2014 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sun Dec 14 12:39:41 2014 -0800

----------------------------------------------------------------------
 .../idf/CSVIntermediateDataFormat.java          | 26 -------------
 .../connector/idf/IntermediateDataFormat.java   |  6 +--
 .../idf/TestCSVIntermediateDataFormat.java      |  2 +-
 .../org/apache/sqoop/driver/JobManager.java     | 35 +++++++++--------
 .../org/apache/sqoop/driver/JobRequest.java     | 25 ++++++++----
 .../mapreduce/MapreduceExecutionEngine.java     |  7 +++-
 .../org/apache/sqoop/job/MRJobConstants.java    |  7 +++-
 .../org/apache/sqoop/job/io/SqoopWritable.java  | 23 ++++++-----
 .../org/apache/sqoop/job/mr/SqoopMapper.java    | 40 ++++++++++----------
 .../job/mr/SqoopOutputFormatLoadExecutor.java   | 25 ++++++------
 .../org/apache/sqoop/job/TestMapReduce.java     | 11 ++++--
 .../java/org/apache/sqoop/job/TestMatching.java |  4 +-
 .../mr/TestSqoopOutputFormatLoadExecutor.java   | 10 ++---
 13 files changed, 113 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index dbe193d..275321a 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -23,24 +23,16 @@ import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.FloatingPoint;
 import org.apache.sqoop.utils.ClassUtils;
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
 import org.joda.time.LocalDateTime;
 import org.joda.time.LocalTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-import org.json.simple.parser.JSONParser;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -319,20 +311,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return data.equals(((CSVIntermediateDataFormat) other).data);
   }
 
-  public int compareTo(IntermediateDataFormat<?> o) {
-    if (this == o) {
-      return 0;
-    }
-    if (this.equals(o)) {
-      return 0;
-    }
-    if (!(o instanceof CSVIntermediateDataFormat)) {
-      throw new IllegalStateException("Expected Data to be instance of "
-          + "CSVIntermediateFormat, but was an instance of " + o.getClass().getName());
-    }
-    return data.compareTo(o.getCSVTextData());
-  }
-
  /**
 * Encode to the Sqoop-prescribed CSV String for every element in the object array
   * @param objectArray
@@ -392,10 +370,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     }
   }
 
-  public String toString() {
-    return data;
-  }
-
   /**
    * {@inheritDoc}
    */

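The removed compareTo(...) and toString() are not replaced here: ordering is handled at the writable layer (see SqoopWritable later in this diff), which compares rows by their CSV text form regardless of the concrete IDF. A minimal sketch of that surviving comparison path; the two rows are made up:

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;

    public class CsvOrderingSketch {
      public static void main(String[] args) {
        IntermediateDataFormat<String> a = new CSVIntermediateDataFormat();
        IntermediateDataFormat<String> b = new CSVIntermediateDataFormat();
        a.setCSVTextData("1,'left'");    // hypothetical rows
        b.setCSVTextData("2,'right'");
        // Ordering derives from the CSV text form, not from an
        // IDF-specific compareTo override.
        System.out.println(a.getCSVTextData().compareTo(b.getCSVTextData()) < 0);
      }
    }
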
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 93698a8..b8c8042 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -63,10 +63,10 @@ public abstract class IntermediateDataFormat<T> {
    * Set one row of data. If validate is set to true, the data is validated
    * against the schema.
    *
-   * @param data - A single row of data to be moved.
+   * @param obj - A single row of data to be moved.
    */
-  public void setData(T data) {
-    this.data = data;
+  public void setData(T obj) {
+    this.data = obj;
   }
   /**
    * Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing

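The rename from data to obj just avoids shadowing the field; the important point is that setData stays generic in T, which is the premise of SQOOP-1882: an IDF's native type need not be String. A minimal sketch using the CSV IDF, where T happens to be String and the row value is made up:

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;

    public class SetDataSketch {
      public static void main(String[] args) {
        // For CSVIntermediateDataFormat the generic T is String; another
        // IDF could bind T to any native row representation.
        IntermediateDataFormat<String> idf = new CSVIntermediateDataFormat();
        idf.setData("10,'foo'");          // hypothetical row
        System.out.println(idf.getData());
      }
    }
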
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 83a95ec..f6852a0 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -51,7 +51,7 @@ import org.junit.Test;
 
 public class TestCSVIntermediateDataFormat {
 
-  private IntermediateDataFormat<?> dataFormat;
+  private CSVIntermediateDataFormat dataFormat;
 
   @Before
   public void setUp() {

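Narrowing the field from IntermediateDataFormat<?> to the concrete class is not cosmetic: with a wildcard type, calls such as setData(...) do not compile because no argument matches the capture of ?. A small illustration of the generics issue, with a made-up row:

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;

    public class WildcardCaptureSketch {
      public static void main(String[] args) {
        IntermediateDataFormat<?> wildcard = new CSVIntermediateDataFormat();
        // wildcard.setData("1,'a'");   // does not compile: capture-of-? mismatch
        CSVIntermediateDataFormat concrete = new CSVIntermediateDataFormat();
        concrete.setData("1,'a'");      // fine: T is fixed to String here
        System.out.println(concrete.getCSVTextData());
      }
    }
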
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 01073d4..ff263ae 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -332,7 +332,6 @@ public class JobManager implements Reconfigurable {
         .instantiate(Driver.getInstance().getDriverJobConfigurationClass());
     ConfigUtils.fromConfigs(job.getDriverConfig().getConfigs(), driverConfig);
 
-
     // Create a job request for submit/execution
     JobRequest jobRequest = executionEngine.createJobRequest();
     // Save important variables to the job request
@@ -350,19 +349,23 @@ public class JobManager implements Reconfigurable {
     jobRequest.setJobName(job.getName());
     jobRequest.setJobId(job.getPersistenceId());
     jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
-    Class<? extends IntermediateDataFormat<?>> dataFormatClass = fromConnector
-        .getIntermediateDataFormat();
-    jobRequest.setIntermediateDataFormat(dataFormatClass);
+    jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat(), Direction.FROM);
+    jobRequest.setIntermediateDataFormat(toConnector.getIntermediateDataFormat(), Direction.TO);
 
     jobRequest.setFrom(fromConnector.getFrom());
     jobRequest.setTo(toConnector.getTo());
 
     // set all the jars
     addStandardJars(jobRequest);
-    addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
+    addConnectorClass(jobRequest, fromConnector);
+    addConnectorClass(jobRequest, toConnector);
+    addConnectorIDFClass(jobRequest, fromConnector.getIntermediateDataFormat());
+    addConnectorIDFClass(jobRequest, toConnector.getIntermediateDataFormat());
+
     addConnectorInitializerJars(jobRequest, Direction.FROM);
     addConnectorInitializerJars(jobRequest, Direction.TO);
-    addIDFJars(jobRequest);
+    addIDFDependentJars(jobRequest, Direction.FROM);
+    addIDFDependentJars(jobRequest, Direction.TO);
 
    // call the initialize method
     initializeConnector(jobRequest, Direction.FROM);
@@ -375,11 +378,12 @@ public class JobManager implements Reconfigurable {
     return jobRequest;
   }
 
-  private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
-      SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
-    jobRequest.addJarForClass(fromConnector.getClass());
-    jobRequest.addJarForClass(toConnector.getClass());
-    jobRequest.addJarForClass(dataFormatClass);
+  private void addConnectorClass(final JobRequest jobRequest, final SqoopConnector connector) {
+    jobRequest.addJarForClass(connector.getClass());
+  }
+
+  private void addConnectorIDFClass(final JobRequest jobRequest, Class<? extends IntermediateDataFormat<?>>  idfClass) {
+    jobRequest.addJarForClass(idfClass);
   }
 
   private void addStandardJars(JobRequest jobRequest) {
@@ -455,11 +459,10 @@ public class JobManager implements Reconfigurable {
         jobRequest.getJobConfig(direction));
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  // TODO:SQOOP-1882 , should add the FROM and TO connector IDF jars
-  private void addIDFJars(JobRequest jobRequest) {
-    Class<? extends IntermediateDataFormat> idfClass = jobRequest.getIntermediateDataFormat();
-    IntermediateDataFormat idf = (IntermediateDataFormat) ClassUtils.instantiate(idfClass);
+  @SuppressWarnings("unchecked")
+  private void addIDFDependentJars(JobRequest jobRequest, Direction direction) {
+    Class<? extends IntermediateDataFormat<?>> idfClass = jobRequest.getIntermediateDataFormat(direction);
+    IntermediateDataFormat<?> idf = ((IntermediateDataFormat<?>) ClassUtils.instantiate(idfClass));
     jobRequest.addJars(idf.getJars());
   }
 

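addIDFDependentJars is now direction-aware: it looks up the IDF class registered for the given side, instantiates it, and ships whatever jars that IDF declares. A condensed sketch of that resolution, mirroring the method above:

    import org.apache.sqoop.common.Direction;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.driver.JobRequest;
    import org.apache.sqoop.utils.ClassUtils;

    public class IdfJarResolutionSketch {
      static void addIdfDependentJars(JobRequest request, Direction direction) {
        Class<? extends IntermediateDataFormat<?>> idfClass =
            request.getIntermediateDataFormat(direction);
        IntermediateDataFormat<?> idf =
            (IntermediateDataFormat<?>) ClassUtils.instantiate(idfClass);
        // Each IDF declares the extra jars it needs on the MR classpath,
        // so FROM and TO can now contribute different dependency sets.
        request.addJars(idf.getJars());
      }
    }
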
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
index 8c1cc95..c9377a7 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java
@@ -27,8 +27,10 @@ import org.apache.sqoop.job.etl.Transferable;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.utils.ClassUtils;
 
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Submission details class is used when creating new submission and contains
@@ -111,9 +113,14 @@ public class JobRequest {
   Integer loaders;
 
   /**
-   * The intermediate data format this submission should use.
+   * The intermediate data format this submission should use to read/extract.
    */
-  Class<? extends IntermediateDataFormat> intermediateDataFormat;
+  Class<? extends IntermediateDataFormat<?>> fromIDF;
+
+  /**
+   * The intermediate data format this submission should use to write/load.
+   */
+  Class<? extends IntermediateDataFormat<?>> toIDF;
 
   public JobRequest() {
     this.jars = new LinkedList<String>();
@@ -191,7 +198,7 @@ public class JobRequest {
     }
   }
 
-  public void addJarForClass(Class klass) {
+  public void addJarForClass(Class<?> klass) {
     addJar(ClassUtils.jarForClass(klass));
   }
 
@@ -318,12 +325,16 @@ public class JobRequest {
     this.loaders = loaders;
   }
 
-  public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
-    return intermediateDataFormat;
+  public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat(Direction direction) {
+    return direction.equals(Direction.FROM) ? fromIDF : toIDF;
   }
 
-  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
-    this.intermediateDataFormat = intermediateDataFormat;
+  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat<? extends Object>> intermediateDataFormat, Direction direction) {
+    if (direction.equals(Direction.FROM)) {
+      fromIDF = intermediateDataFormat;
+    } else if (direction.equals(Direction.TO)) {
+      toIDF = intermediateDataFormat;
+    }
   }
 
 }
\ No newline at end of file

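Usage-wise the request is now symmetric: callers register one IDF class per Direction and read it back the same way. A minimal sketch, pairing CSV on both sides as the tests below do:

    import org.apache.sqoop.common.Direction;
    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.driver.JobRequest;

    public class JobRequestIdfSketch {
      public static void main(String[] args) {
        JobRequest request = new JobRequest();
        request.setIntermediateDataFormat(CSVIntermediateDataFormat.class, Direction.FROM);
        request.setIntermediateDataFormat(CSVIntermediateDataFormat.class, Direction.TO);
        System.out.println(request.getIntermediateDataFormat(Direction.TO).getName());
      }
    }
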
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 9b3eb44..3f79325 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.execution.mapreduce;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.driver.ExecutionEngine;
 import org.apache.sqoop.driver.JobRequest;
@@ -69,8 +70,10 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     context.setString(MRJobConstants.JOB_ETL_LOADER, to.getLoader().getName());
     context.setString(MRJobConstants.JOB_ETL_FROM_DESTROYER, from.getDestroyer().getName());
     context.setString(MRJobConstants.JOB_ETL_TO_DESTROYER, to.getDestroyer().getName());
-    context.setString(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
-        mrJobRequest.getIntermediateDataFormat().getName());
+    context.setString(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT,
+        mrJobRequest.getIntermediateDataFormat(Direction.FROM).getName());
+    context.setString(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT,
+        mrJobRequest.getIntermediateDataFormat(Direction.TO).getName());
 
     if(mrJobRequest.getExtractors() != null) {
       context.setInteger(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());

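The execution engine publishes the two class names under separate keys, so the MR side can reconstruct a different IDF per direction. A sketch of that publication step, with the context and request passed in as they are inside prepareJob:

    import org.apache.sqoop.common.Direction;
    import org.apache.sqoop.common.MutableMapContext;
    import org.apache.sqoop.driver.JobRequest;
    import org.apache.sqoop.job.MRJobConstants;

    public class PublishIdfSketch {
      // One key per direction replaces the old single INTERMEDIATE_DATA_FORMAT key.
      static void publishIdfClasses(MutableMapContext context, JobRequest request) {
        context.setString(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT,
            request.getIntermediateDataFormat(Direction.FROM).getName());
        context.setString(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT,
            request.getIntermediateDataFormat(Direction.TO).getName());
      }
    }
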
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
index 67021a8..b7aa8c6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MRJobConstants.java
@@ -72,8 +72,11 @@ public final class MRJobConstants extends Constants {
   public static final String HADOOP_COMPRESS_CODEC =
     "mapred.output.compression.codec";
 
-  public static final String INTERMEDIATE_DATA_FORMAT =
-    DriverConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
+  public static final String FROM_INTERMEDIATE_DATA_FORMAT =
+    DriverConstants.PREFIX_EXECUTION_CONFIG + "from.intermediate.format";
+
+  public static final String TO_INTERMEDIATE_DATA_FORMAT =
+      DriverConstants.PREFIX_EXECUTION_CONFIG + "to.intermediate.format";
 
   private MRJobConstants() {
     // Disable explicit object creation

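From the MR job's point of view these are just two configuration entries, and the tests below set both. A minimal sketch of a configuration pairing the same CSV IDF on each side (any other IDF class name would work on either key):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.job.MRJobConstants;

    public class IdfConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT,
            CSVIntermediateDataFormat.class.getName());
        conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT,
            CSVIntermediateDataFormat.class.getName());
        System.out.println(conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT));
      }
    }
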
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
index 336ab97..ed182cb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
@@ -21,7 +21,6 @@ package org.apache.sqoop.job.io;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
-
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.utils.ClassUtils;
@@ -30,8 +29,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+/**
+ * Writable used to load the data into the TO side {@link Transferable} entity
+ */
+
 public class SqoopWritable implements Configurable, WritableComparable<SqoopWritable> {
-  private IntermediateDataFormat<?> dataFormat;
+  private IntermediateDataFormat<?> toIDF;
   private Configuration conf;
 
   public SqoopWritable() {
@@ -39,22 +42,22 @@ public class SqoopWritable implements Configurable, WritableComparable<SqoopWrit
   }
 
   public SqoopWritable(IntermediateDataFormat<?> dataFormat) {
-    this.dataFormat = dataFormat;
+    this.toIDF = dataFormat;
   }
 
   public void setString(String data) {
-    this.dataFormat.setCSVTextData(data);
+    this.toIDF.setCSVTextData(data);
   }
 
-  public String getString() { return dataFormat.getCSVTextData(); }
+  public String getString() { return toIDF.getCSVTextData(); }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeUTF(dataFormat.getCSVTextData());
+    out.writeUTF(toIDF.getCSVTextData());
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException { dataFormat.setCSVTextData(in.readUTF()); }
+  public void readFields(DataInput in) throws IOException { toIDF.setCSVTextData(in.readUTF()); }
 
   @Override
   public int compareTo(SqoopWritable o) { return getString().compareTo(o.getString()); }
@@ -68,9 +71,9 @@ public class SqoopWritable implements Configurable, WritableComparable<SqoopWrit
   public void setConf(Configuration conf) {
     this.conf = conf;
 
-    if (dataFormat == null) {
-      String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
-      this.dataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(intermediateDataFormatName);
+    if (toIDF == null) {
+      String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
+      this.toIDF = (IntermediateDataFormat<?>) ClassUtils.instantiate(toIDFClass);
     }
   }
 

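The rename to toIDF makes the contract explicit: the writable is a view over the TO-side IDF, and everything crossing the wire is that IDF's CSV text form (the writeUTF/readUTF pair above). A minimal usage sketch with a made-up row:

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.job.io.SqoopWritable;

    public class SqoopWritableSketch {
      public static void main(String[] args) {
        IntermediateDataFormat<?> toIDF = new CSVIntermediateDataFormat();
        SqoopWritable writable = new SqoopWritable(toIDF);
        writable.setString("1,'a'");      // hypothetical CSV row
        // compareTo and the shuffle serialization use this same CSV text form.
        System.out.println(writable.getString());
      }
    }
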
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 7434243..dee0011 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -56,8 +56,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
    * Service for reporting progress to mapreduce.
    */
   private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
-  private IntermediateDataFormat<String> fromDataFormat = null;
-  private IntermediateDataFormat<String> toDataFormat = null;
+  private IntermediateDataFormat<Object> fromIDF = null;
+  private IntermediateDataFormat<Object> toIDF = null;
   private Matcher matcher;
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -72,11 +72,12 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
     Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
     matcher = MatcherFactory.getMatcher(fromSchema, toSchema);
 
-    String intermediateDataFormatName = conf.get(MRJobConstants.INTERMEDIATE_DATA_FORMAT);
-    fromDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName);
-    fromDataFormat.setSchema(matcher.getFromSchema());
-    toDataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(intermediateDataFormatName);
-    toDataFormat.setSchema(matcher.getToSchema());
+    String fromIDFClass = conf.get(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT);
+    fromIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(fromIDFClass);
+    fromIDF.setSchema(matcher.getFromSchema());
+    String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
+    toIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(toIDFClass);
+    toIDF.setSchema(matcher.getToSchema());
 
     // Objects that should be passed to the Executor execution
     PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
@@ -107,45 +108,46 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
   }
 
   // There are two IDF objects we carry around in memory during the sqoop job execution.
-  // The fromDataFormat has the fromSchema in it, the toDataFormat has the toSchema in it.
-  // Before we do the writing to the toDatFormat object we do the matching process to negotiate between 
-  // the two schemas and their corresponding column types before we write the data to the toDataFormat object
+  // The fromIDF has the fromSchema in it, the toIDF has the toSchema in it.
+  // Before writing to the toIDF object, the matching process negotiates between
+  // the two schemas and their corresponding column types.
   private class SqoopMapDataWriter extends DataWriter {
     private Context context;
     private SqoopWritable writable;
 
     public SqoopMapDataWriter(Context context) {
       this.context = context;
-      this.writable = new SqoopWritable(toDataFormat);
+      this.writable = new SqoopWritable(toIDF);
     }
 
     @Override
     public void writeArrayRecord(Object[] array) {
-      fromDataFormat.setObjectData(array);
+      fromIDF.setObjectData(array);
       writeContent();
     }
 
     @Override
     public void writeStringRecord(String text) {
-      fromDataFormat.setCSVTextData(text);
+      fromIDF.setCSVTextData(text);
       writeContent();
     }
 
     @Override
     public void writeRecord(Object obj) {
-      fromDataFormat.setData(obj.toString());
+      fromIDF.setData(obj);
       writeContent();
     }
 
     private void writeContent() {
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Extracted data: " + fromDataFormat.getCSVTextData());
+          LOG.debug("Extracted data: " + fromIDF.getCSVTextData());
         }
-        // NOTE: The fromDataFormat and the corresponding fromSchema is used only for the matching process
-        // The output of the mappers is finally written to the toDataFormat object after the matching process
-        // since the writable encapsulates the toDataFormat ==> new SqoopWritable(toDataFormat)
-        toDataFormat.setObjectData(matcher.getMatchingData(fromDataFormat.getObjectData()));
+        // NOTE: The fromIDF and the corresponding fromSchema is used only for the matching process
+        // The output of the mappers is finally written to the toIDF object after the matching process
+        // since the writable encapsulates the toIDF ==> new SqoopWritable(toIDF)
+        toIDF.setObjectData(matcher.getMatchingData(fromIDF.getObjectData()));
+        // NOTE: We do not use the reducer to do the writing (a.k.a LOAD in ETL). Hence the mapper sets up the writable
         context.write(writable, NullWritable.get());
       } catch (Exception e) {
         throw new SqoopException(MRExecutionError.MAPRED_EXEC_0013, e);

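The writeRecord change is the crux of the fix: the old code forced every record through obj.toString(), which only worked when the IDF's generic T was String. A trimmed, hypothetical writer showing the corrected contract; it assumes DataWriter's three abstract methods from the connector SDK (org.apache.sqoop.etl.io), matching the overrides in SqoopMapDataWriter above:

    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.etl.io.DataWriter;

    // Records reach the IDF in their native form, with no toString()
    // coercion, so IntermediateDataFormat<T> works for any T.
    public class NativeFormWriter extends DataWriter {
      private final IntermediateDataFormat<Object> idf;

      public NativeFormWriter(IntermediateDataFormat<Object> idf) {
        this.idf = idf;
      }

      @Override
      public void writeArrayRecord(Object[] array) {
        idf.setObjectData(array);
      }

      @Override
      public void writeStringRecord(String text) {
        idf.setCSVTextData(text);
      }

      @Override
      public void writeRecord(Object obj) {
        idf.setData(obj);   // previously: setData(obj.toString())
      }
    }
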
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index d664337..aaf771c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.concurrent.*;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.matcher.Matcher;
 import org.apache.sqoop.connector.matcher.MatcherFactory;
@@ -53,7 +53,7 @@ public class SqoopOutputFormatLoadExecutor {
 
   private volatile boolean readerFinished = false;
   private volatile boolean writerFinished = false;
-  private volatile IntermediateDataFormat<String> dataFormat;
+  private volatile IntermediateDataFormat<? extends Object> toDataFormat;
   private Matcher matcher;
   private JobContext context;
   private SqoopRecordWriter writer;
@@ -63,11 +63,11 @@ public class SqoopOutputFormatLoadExecutor {
   private volatile boolean isTest = false;
   private String loaderName;
 
-  // NOTE: This method is only exposed for test cases and hence assumes CSVIntermediateDataFormat
-  SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
+  // NOTE: This constructor is only exposed for test cases
+  SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName, IntermediateDataFormat<?> idf) {
     this.isTest = isTest;
     this.loaderName = loaderName;
-    dataFormat = new CSVIntermediateDataFormat();
+    toDataFormat = idf;
     writer = new SqoopRecordWriter();
     matcher = null;
   }
@@ -78,10 +78,10 @@ public class SqoopOutputFormatLoadExecutor {
     matcher = MatcherFactory.getMatcher(
         MRConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration()),
         MRConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration()));
-    dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
-        .getConfiguration().get(MRJobConstants.INTERMEDIATE_DATA_FORMAT));
+    toDataFormat = (IntermediateDataFormat<?>) ClassUtils.instantiate(context
+        .getConfiguration().get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT));
     // Using the TO schema since the SqoopDataWriter in the SqoopMapper encapsulates the toDataFormat
-    dataFormat.setSchema(matcher.getToSchema());
+    toDataFormat.setSchema(matcher.getToSchema());
   }
 
   public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
@@ -102,7 +102,7 @@ public class SqoopOutputFormatLoadExecutor {
       free.acquire();
       checkIfConsumerThrew();
       // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat
-      dataFormat.setCSVTextData(key.getString());
+      toDataFormat.setCSVTextData(key.getString());
       filled.release();
     }
 
@@ -158,7 +158,7 @@ public class SqoopOutputFormatLoadExecutor {
         return null;
       }
       try {
-        return dataFormat.getObjectData();
+        return toDataFormat.getObjectData();
       } finally {
         releaseSema();
       }
@@ -172,7 +172,7 @@ public class SqoopOutputFormatLoadExecutor {
         return null;
       }
       try {
-        return dataFormat.getCSVTextData();
+        return toDataFormat.getCSVTextData();
       } finally {
         releaseSema();
       }
@@ -185,7 +185,7 @@ public class SqoopOutputFormatLoadExecutor {
         return null;
       }
       try {
-        return dataFormat.getData();
+        return toDataFormat.getData();
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Caught exception e while getting content ", t);
@@ -215,6 +215,7 @@ public class SqoopOutputFormatLoadExecutor {
 
   private class ConsumerThread implements Runnable {
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public void run() {
       LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");

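For tests, the constructor now takes the TO-side IDF to use instead of hard-coding CSVIntermediateDataFormat. A sketch of the injection; the constructor is package-private, so the caller sits in org.apache.sqoop.job.mr, and the loader name here is hypothetical:

    package org.apache.sqoop.job.mr;

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;

    public class LoadExecutorInjectionSketch {
      static SqoopOutputFormatLoadExecutor newTestExecutor() {
        // The TO-side IDF is injected rather than assumed to be CSV.
        return new SqoopOutputFormatLoadExecutor(
            true, "org.example.FakeLoader", new CSVIntermediateDataFormat());
      }
    }
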
http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 256c34d..47696cc 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -67,7 +67,9 @@ public class TestMapReduce {
   public void testSqoopInputFormat() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+
     Job job = new Job(conf);
 
     SqoopInputFormat inputformat = new SqoopInputFormat();
@@ -86,7 +88,9 @@ public class TestMapReduce {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+
     Job job = new Job(conf);
     // from and to have the same schema in this test case
     MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
@@ -106,7 +110,8 @@ public class TestMapReduce {
     conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
     conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
     // from and to have the same schema in this test case

http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 67c8525..1692ddb 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -128,8 +128,8 @@ public class TestMatching {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
-        CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
     MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/45279167/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index d897125..ec0e886 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -121,7 +121,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   @Before
   public void setUp() {
     conf = new Configuration();
-    conf.setIfUnset(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+    conf.setIfUnset(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
   }
 
@@ -129,7 +129,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   public void testWhenLoaderThrows() throws Throwable {
     conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
+        SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName(), new CSVIntermediateDataFormat());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -147,7 +147,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   public void testSuccessfulContinuousLoader() throws Throwable {
     conf.set(MRJobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
+        SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -168,7 +168,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   @Test (expected = SqoopException.class)
   public void testSuccessfulLoader() throws Throwable {
     SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
+        SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName(), new CSVIntermediateDataFormat());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);
@@ -192,7 +192,7 @@ public class TestSqoopOutputFormatLoadExecutor {
   public void testThrowingContinuousLoader() throws Throwable {
     conf.set(MRJobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
     SqoopOutputFormatLoadExecutor executor = new
-        SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
+        SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName(), new CSVIntermediateDataFormat());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
     IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable(dataFormat);