Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/08/12 00:01:06 UTC

[03/19] git commit: SQOOP-777: Sqoop2: Implement intermediate data format representation policy

SQOOP-777: Sqoop2: Implement intermediate data format representation policy

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3c93930b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3c93930b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3c93930b

Branch: refs/heads/SQOOP-1367
Commit: 3c93930bf3d35a3910541ec3099b44c32bf7adf7
Parents: 17c7219
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Sat Jul 26 11:37:50 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Sat Jul 26 11:37:50 2014 -0700

----------------------------------------------------------------------
 common/pom.xml                                  |   5 +
 .../org/apache/sqoop/etl/io/DataReader.java     |  25 +-
 .../org/apache/sqoop/etl/io/DataWriter.java     |  20 +-
 .../org/apache/sqoop/schema/type/Column.java    |   5 +
 .../connector/jdbc/GenericJdbcConnector.java    |   4 +-
 .../jdbc/GenericJdbcConnectorError.java         |   2 +
 .../jdbc/GenericJdbcExportInitializer.java      |  53 ++-
 .../jdbc/GenericJdbcImportInitializer.java      |   7 +-
 .../sqoop/connector/jdbc/TestExportLoader.java  |  10 +-
 .../connector/jdbc/TestImportExtractor.java     |   9 +-
 .../connector/jdbc/TestImportInitializer.java   |   2 +-
 connector/connector-sdk/pom.xml                 |   6 +
 .../idf/CSVIntermediateDataFormat.java          | 355 +++++++++++++++++++
 .../connector/idf/IntermediateDataFormat.java   | 143 ++++++++
 .../idf/IntermediateDataFormatError.java        |  57 +++
 .../idf/CSVIntermediateDataFormatTest.java      | 222 ++++++++++++
 .../org/apache/sqoop/framework/JobManager.java  |   8 +
 .../sqoop/framework/SubmissionRequest.java      |  15 +
 execution/mapreduce/pom.xml                     |   4 +
 .../mapreduce/MapreduceExecutionEngine.java     |  61 ++--
 .../java/org/apache/sqoop/job/JobConstants.java |   4 +
 .../sqoop/job/etl/HdfsExportExtractor.java      |  12 +-
 .../sqoop/job/etl/HdfsSequenceImportLoader.java |  10 +-
 .../sqoop/job/etl/HdfsTextImportLoader.java     |  12 +-
 .../org/apache/sqoop/job/io/SqoopWritable.java  |  59 +++
 .../sqoop/job/mr/SqoopFileOutputFormat.java     |   7 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |  47 +--
 .../sqoop/job/mr/SqoopNullOutputFormat.java     |   6 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |  85 +++--
 .../org/apache/sqoop/job/mr/SqoopReducer.java   |   4 +-
 .../mapreduce/MapreduceExecutionEngineTest.java |   3 +
 .../java/org/apache/sqoop/job/JobUtils.java     |  14 +-
 .../org/apache/sqoop/job/TestHdfsExtract.java   | 121 +++----
 .../java/org/apache/sqoop/job/TestHdfsLoad.java |  58 ++-
 .../org/apache/sqoop/job/TestMapReduce.java     |  47 ++-
 .../apache/sqoop/job/io/SqoopWritableTest.java  |  91 +++++
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |  54 +--
 pom.xml                                         |  11 +-
 spi/pom.xml                                     |   5 +
 .../sqoop/connector/spi/SqoopConnector.java     |  12 +
 .../mapreduce/MapreduceSubmissionEngine.java    |   1 +
 41 files changed, 1408 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index db11b5b..9bfa07d 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -45,6 +45,11 @@ limitations under the License.
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
index 3e1adc7..a34dfb4 100644
--- a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
+++ b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
@@ -18,17 +18,32 @@
 package org.apache.sqoop.etl.io;
 
 /**
- * An intermediate layer for passing data from the MR framework
+ * An intermediate layer for passing data from the execution framework
  * to the ETL framework.
  */
 public abstract class DataReader {
 
+  /**
+   * Read data from the execution framework as an object array.
+   * @return - array of objects with each column represented as an object
+   * @throws Exception
+   */
   public abstract Object[] readArrayRecord() throws Exception;
 
-  public abstract String readCsvRecord() throws Exception;
+  /**
+   * Read data from the execution framework as text - as a CSV record.
+   * @return - CSV formatted data.
+   * @throws Exception
+   */
+  public abstract String readTextRecord() throws Exception;
 
-  public abstract Object readContent(int type) throws Exception;
-
-  public abstract void setFieldDelimiter(char fieldDelimiter);
+  /**
+   * Read data from the execution framework in its native format.
+   * @return - the content in the native format of the intermediate data
+   * format being used.
+   * @throws Exception
+   */
+  public abstract Object readContent() throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
index d81364e..2166b09 100644
--- a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
+++ b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
@@ -23,12 +23,24 @@ package org.apache.sqoop.etl.io;
  */
 public abstract class DataWriter {
 
+  /**
+   * Write an array of objects into the execution framework.
+   * @param array - data to be written
+   */
   public abstract void writeArrayRecord(Object[] array);
 
-  public abstract void writeCsvRecord(String csv);
+  /**
+   * Write data to the execution framework as text. The intermediate data
+   * format may convert the text to another representation, depending on
+   * how it is implemented.
+   * @param text - data represented as CSV text.
+   */
+  public abstract void writeStringRecord(String text);
 
-  public abstract void writeContent(Object content, int type);
-
-  public abstract void setFieldDelimiter(char fieldDelimiter);
+  /**
+   * Write data in the intermediate data format's native format.
+   * @param obj - data to be written
+   */
+  public abstract void writeRecord(Object obj);
 
 }
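
For readers following the API change: the setFieldDelimiter() hook is gone
because the CSV intermediate data format now fixes its own separator. As a
purely illustrative sketch (not part of this commit; the class name is
invented, and it assumes sqoop-common on the classpath), here is a trivial
in-memory DataWriter showing the three write paths that remain:

    import java.util.LinkedList;
    import java.util.Queue;

    import org.apache.sqoop.etl.io.DataWriter;

    public class BufferingDataWriter extends DataWriter {

      // Every record lands in this queue, whichever write path is used.
      private final Queue<Object> buffer = new LinkedList<Object>();

      @Override
      public void writeArrayRecord(Object[] array) {
        // One object per column; the IDF handles any conversion.
        buffer.add(array);
      }

      @Override
      public void writeStringRecord(String text) {
        // CSV text, e.g. as produced by fast dump tools.
        buffer.add(text);
      }

      @Override
      public void writeRecord(Object obj) {
        // The intermediate data format's native representation.
        buffer.add(obj);
      }

      public Object poll() {
        return buffer.poll();
      }
    }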

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/type/Column.java b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
index 8b630b2..30c26a3 100644
--- a/common/src/main/java/org/apache/sqoop/schema/type/Column.java
+++ b/common/src/main/java/org/apache/sqoop/schema/type/Column.java
@@ -98,4 +98,9 @@ public abstract class Column {
     result = 31 * result + (nullable != null ? nullable.hashCode() : 0);
     return result;
   }
+
+  public boolean validate(Object o) {
+    // TODO: Implement this in all subclasses!
+    return true;
+  }
 }
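
The validate() stub above returns true unconditionally; the TODO asks each
Column subclass to override it. A hypothetical sketch of what such an
override might look like for a text column (assumes a getNullable() accessor
for the nullable field used in hashCode() above; ValidatingText is an
invented name):

    import org.apache.sqoop.schema.type.Text;

    public class ValidatingText extends Text {

      public ValidatingText(String name) {
        super(name);
      }

      @Override
      public boolean validate(Object o) {
        // Null is acceptable only for nullable columns (hypothetical sketch).
        if (o == null) {
          return Boolean.TRUE.equals(getNullable());
        }
        // A text column should only carry String values.
        return o instanceof String;
      }
    }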

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index e0da80f..298288e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -21,6 +21,8 @@ import java.util.Locale;
 import java.util.ResourceBundle;
 
 import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
@@ -61,7 +63,7 @@ public class GenericJdbcConnector extends SqoopConnector {
   @Override
   public ResourceBundle getBundle(Locale locale) {
     return ResourceBundle.getBundle(
-        GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
+      GenericJdbcConnectorConstants.RESOURCE_BUNDLE_NAME, locale);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
index 2b1a0ad..c374750 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorError.java
@@ -78,6 +78,8 @@ public enum GenericJdbcConnectorError implements ErrorCode {
 
   GENERIC_JDBC_CONNECTOR_0018("Error occurred while transferring data from " +
     "stage table to destination table."),
+
+  GENERIC_JDBC_CONNECTOR_0019("Table name extraction not supported.")
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index ef39cdc..80253be 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -17,6 +17,9 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -26,9 +29,11 @@ import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
@@ -58,7 +63,53 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
 
   @Override
   public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ExportJobConfiguration exportJobConfiguration) {
-    return null;
+    configureJdbcProperties(context.getContext(), connectionConfiguration, exportJobConfiguration);
+
+    String schemaName = exportJobConfiguration.table.tableName;
+
+    if (schemaName == null) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0019,
+          "Table name extraction not supported yet.");
+    }
+
+    if(exportJobConfiguration.table.schemaName != null) {
+      schemaName = exportJobConfiguration.table.schemaName + "." + schemaName;
+    }
+
+    Schema schema = new Schema(schemaName);
+    ResultSet rs = null;
+    ResultSetMetaData rsmt = null;
+    try {
+      rs = executor.executeQuery("SELECT * FROM " + schemaName + " WHERE 1 = 0");
+
+      rsmt = rs.getMetaData();
+      for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
+        Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+
+        String columnName = rsmt.getColumnName(i);
+        if (columnName == null || columnName.equals("")) {
+          columnName = rsmt.getColumnLabel(i);
+          if (null == columnName) {
+            columnName = "Column " + i;
+          }
+        }
+
+        column.setName(columnName);
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch (SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+    } finally {
+      if(rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.info("Ignoring exception while closing ResultSet", e);
+        }
+      }
+    }
   }
 
   private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig, ExportJobConfiguration jobConfig) {
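
The getSchema() implementation above uses a standard JDBC trick: a query with
an always-false predicate ("WHERE 1 = 0") returns zero rows but still carries
complete ResultSetMetaData, so the schema can be derived without transferring
any data. A standalone sketch of the same pattern (connection URL and table
name are placeholders, and the table is assumed to exist):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class MetadataProbe {
      public static void main(String[] args) throws SQLException {
        Connection conn = DriverManager.getConnection(
            "jdbc:derby:memory:probe;create=true");
        Statement stmt = conn.createStatement();
        try {
          // No rows match, but the driver still describes every column.
          ResultSet rs = stmt.executeQuery(
              "SELECT * FROM MY_TABLE WHERE 1 = 0");
          ResultSetMetaData md = rs.getMetaData();
          for (int i = 1; i <= md.getColumnCount(); i++) {
            System.out.println(md.getColumnName(i) + " -> "
                + md.getColumnType(i));
          }
          rs.close();
        } finally {
          stmt.close();
          conn.close();
        }
      }
    }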

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 96818ba..2ad3cb2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -71,16 +71,17 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
     String schemaName = importJobConfiguration.table.tableName;
     if(schemaName == null) {
       schemaName = "Query";
+    } else if(importJobConfiguration.table.schemaName != null) {
+      schemaName = importJobConfiguration.table.schemaName + "." + schemaName;
     }
 
     Schema schema = new Schema(schemaName);
-
     ResultSet rs = null;
     ResultSetMetaData rsmt = null;
     try {
       rs = executor.executeQuery(
-        context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
-          .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
+          context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
+              .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
       );
 
       rsmt = rs.getMetaData();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index d4c4565..fc3ddd0 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -113,11 +113,6 @@ public class TestExportLoader {
     int index = 0;
 
     @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      // do nothing and use default delimiter
-    }
-
-    @Override
     public Object[] readArrayRecord() {
       if (index < numberOfRows) {
         Object[] array = new Object[] {
@@ -132,16 +127,17 @@ public class TestExportLoader {
     }
 
     @Override
-    public String readCsvRecord() {
+    public String readTextRecord() {
       fail("This method should not be invoked.");
       return null;
     }
 
     @Override
-    public Object readContent(int type) {
+    public Object readContent() throws Exception {
       fail("This method should not be invoked.");
       return null;
     }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index a7ed6ba..30d0b9a 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -134,11 +134,6 @@ public class TestImportExtractor extends TestCase {
     int indx = START;
 
     @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      // do nothing and use default delimiter
-    }
-
-    @Override
     public void writeArrayRecord(Object[] array) {
       for (int i = 0; i < array.length; i++) {
         if (array[i] instanceof Integer) {
@@ -153,12 +148,12 @@ public class TestImportExtractor extends TestCase {
     }
 
     @Override
-    public void writeCsvRecord(String csv) {
+    public void writeStringRecord(String text) {
       fail("This method should not be invoked.");
     }
 
     @Override
-    public void writeContent(Object content, int type) {
+    public void writeRecord(Object content) {
       fail("This method should not be invoked.");
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index a33fa36..cd05e30 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -327,7 +327,7 @@ public class TestImportInitializer extends TestCase {
     Initializer initializer = new GenericJdbcImportInitializer();
     initializer.initialize(initializerContext, connConf, jobConf);
     Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
-    assertEquals(getSchema(tableName), schema);
+    assertEquals(getSchema(jobConf.table.schemaName + "." + tableName), schema);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 4056e14..f54837d 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -38,6 +38,12 @@ limitations under the License.
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-common</artifactId>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
new file mode 100644
index 0000000..39d48c7
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
+
+  public static final char SEPARATOR_CHARACTER = ',';
+  public static final char ESCAPE_CHARACTER = '\\';
+  public static final char QUOTE_CHARACTER = '\'';
+
+  private static final Logger LOG = Logger.getLogger
+    (CSVIntermediateDataFormat.class);
+
+  private static final char[] originals = {
+    0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
+  };
+
+  private static final String[] replacements = {
+    new String(new char[] { ESCAPE_CHARACTER, '\\'}),
+    new String(new char[] { ESCAPE_CHARACTER, '0'}),
+    new String(new char[] { ESCAPE_CHARACTER, 'n'}),
+    new String(new char[] { ESCAPE_CHARACTER, 'r'}),
+    new String(new char[] { ESCAPE_CHARACTER, 'Z'}),
+    new String(new char[] { ESCAPE_CHARACTER, '\"'}),
+    new String(new char[] { ESCAPE_CHARACTER, '\''})
+  };
+
+  // ISO-8859-1 is an 8-bit charset that every Java implementation is required to support.
+  public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
+
+  private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
+  private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
+
+  private Schema schema;
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getTextData() {
+    return data;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setTextData(String text) {
+    this.data = text;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setSchema(Schema schema) {
+    if(schema == null) {
+      return;
+    }
+    this.schema = schema;
+    List<Column> columns = schema.getColumns();
+    int i = 0;
+    for(Column col : columns) {
+      if(col.getType() == Type.TEXT) {
+        stringFieldIndices.add(i);
+      } else if(col.getType() == Type.BINARY) {
+        byteFieldIndices.add(i);
+      }
+      i++;
+    }
+  }
+
+  /**
+   * Custom CSV parser that honors quoting and escaped quotes.
+   * All other escaping is handled elsewhere.
+   *
+   * @return String[]
+   */
+  private String[] getFields() {
+    if (data == null) {
+      return null;
+    }
+
+    boolean quoted = false;
+    boolean escaped = false;
+    List<String> parsedData = new LinkedList<String>();
+    StringBuffer buffer = new StringBuffer();
+    for (int i = 0; i < data.length(); ++i) {
+      char c = data.charAt(i);
+      switch(c) {
+        case QUOTE_CHARACTER:
+          buffer.append(c);
+          if (escaped) {
+            escaped = false;
+          } else {
+            quoted = !quoted;
+          }
+          break;
+
+        case ESCAPE_CHARACTER:
+          buffer.append(ESCAPE_CHARACTER);
+          escaped = !escaped;
+          break;
+
+        case SEPARATOR_CHARACTER:
+          if (quoted) {
+            buffer.append(c);
+          } else {
+            parsedData.add(buffer.toString());
+            buffer = new StringBuffer();
+          }
+          break;
+
+        default:
+          if (escaped) {
+            escaped = false;
+          }
+          buffer.append(c);
+          break;
+      }
+    }
+    parsedData.add(buffer.toString());
+
+    return parsedData.toArray(new String[parsedData.size()]);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Object[] getObjectData() {
+    String[] fields = getFields();
+    if (fields == null) {
+      return null;
+    }
+
+    if (fields.length != schema.getColumns().size()) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+        "The data " + getTextData() + " has the wrong number of fields.");
+    }
+
+    Object[] out = new Object[fields.length];
+    Column[] cols = schema.getColumns().toArray(new Column[fields.length]);
+    for (int i = 0; i < fields.length; i++) {
+      Type colType = cols[i].getType();
+      if (fields[i].equals("NULL")) {
+        out[i] = null;
+        continue;
+      }
+      if (colType == Type.TEXT) {
+        out[i] = unescapeStrings(fields[i]);
+      } else if (colType == Type.BINARY) {
+        out[i] = unescapeByteArray(fields[i]);
+      } else if (colType == Type.FIXED_POINT) {
+        Long byteSize = ((FixedPoint) cols[i]).getByteSize();
+        if (byteSize != null && byteSize <= (Integer.SIZE / Byte.SIZE)) {
+          out[i] = Integer.valueOf(fields[i]);
+        } else {
+          out[i] = Long.valueOf(fields[i]);
+        }
+      } else if (colType == Type.FLOATING_POINT) {
+        Long byteSize = ((FloatingPoint) cols[i]).getByteSize();
+        if (byteSize != null && byteSize <= (Float.SIZE / Byte.SIZE)) {
+          out[i] = Float.valueOf(fields[i]);
+        } else {
+          out[i] = Double.valueOf(fields[i]);
+        }
+      } else if (colType == Type.DECIMAL) {
+        out[i] = new BigDecimal(fields[i]);
+      } else {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, "Column type from schema was not recognized for " + colType);
+      }
+    }
+    return out;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public void setObjectData(Object[] data) {
+    escapeArray(data);
+    this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void read(DataInput in) throws IOException {
+    data = in.readUTF();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean equals(Object other) {
+    if(this == other) {
+      return true;
+    }
+    if(other == null || !(other instanceof CSVIntermediateDataFormat)) {
+      return false;
+    }
+    return data.equals(((CSVIntermediateDataFormat)other).data);
+  }
+
+  public int compareTo(IntermediateDataFormat<?> o) {
+    if(this == o) {
+      return 0;
+    }
+    if(this.equals(o)) {
+      return 0;
+    }
+    if(!(o instanceof CSVIntermediateDataFormat)) {
+      throw new IllegalStateException("Expected Data to be an instance of " +
+        "CSVIntermediateDataFormat, but was an instance of " + o.getClass()
+        .getName());
+    }
+    return data.compareTo(o.getTextData());
+  }
+
+  /**
+   * Escape the string and byte-array fields of the given row in place,
+   * so the row can safely be joined into a CSV record.
+   *
+   * @param array - the row whose fields should be escaped
+   */
+  private void escapeArray(Object[] array) {
+    for (int i : stringFieldIndices) {
+      array[i] = escapeStrings((String) array[i]);
+    }
+    for (int i : byteFieldIndices) {
+      array[i] = escapeByteArrays((byte[]) array[i]);
+    }
+  }
+
+  private String escapeByteArrays(byte[] bytes) {
+    try {
+      return escapeStrings(new String(bytes, BYTE_FIELD_CHARSET));
+    } catch (UnsupportedEncodingException e) {
+      // We should never hit this case.
+      // This character set should be distributed with Java.
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available.");
+    }
+  }
+
+  private String getRegExp(char orig) {
+    return getRegExp(String.valueOf(orig));
+  }
+
+  private String getRegExp(String orig) {
+    return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
+  }
+
+  private String escapeStrings(String orig) {
+    int j = 0;
+    String replacement = orig;
+    try {
+      for (j = 0; j < replacements.length; j++) {
+        replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
+      }
+    } catch (Exception e) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002, orig + "  " + replacement + "  " + String.valueOf(j) + "  " + e.getMessage());
+    }
+    StringBuilder  builder = new StringBuilder();
+    builder.append(QUOTE_CHARACTER).append(replacement).append(QUOTE_CHARACTER);
+    return builder.toString();
+  }
+
+  private String unescapeStrings(String orig) {
+    // Remove the leading and trailing quotes.
+    orig = orig.substring(1, orig.length() - 1);
+    int j = 0;
+    try {
+      for (j = 0; j < replacements.length; j++) {
+        orig = orig.replaceAll(getRegExp(replacements[j]),
+          Matcher.quoteReplacement(String.valueOf(originals[j])));
+      }
+    } catch (Exception e) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, orig + "  " + String.valueOf(j) + e.getMessage());
+    }
+
+    return orig;
+  }
+
+  private byte[] unescapeByteArray(String orig) {
+    // Always encoded in BYTE_FIELD_CHARSET.
+    try {
+      return unescapeStrings(orig).getBytes(BYTE_FIELD_CHARSET);
+    } catch (UnsupportedEncodingException e) {
+      // Should never hit this case.
+      // This character set should be distributed with Java.
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The character set " + BYTE_FIELD_CHARSET + " is not available.");
+    }
+  }
+
+  public String toString() {
+    return data;
+  }
+}
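
A minimal usage sketch of the class above (hypothetical demo class; assumes
the connector-sdk and sqoop-common jars are on the classpath, and the column
names are invented):

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    public class CsvIdfDemo {
      public static void main(String[] args) {
        IntermediateDataFormat<String> idf = new CSVIntermediateDataFormat();
        idf.setSchema(new Schema("demo")
            .addColumn(new FixedPoint("id"))
            .addColumn(new Text("name")));

        // Object form in: text fields are quoted and escaped on the way out.
        idf.setObjectData(new Object[] { 1L, "O'Brien" });
        System.out.println(idf.getTextData());   // 1,'O\'Brien'

        // CSV text in: fields are parsed back into objects per the schema.
        idf.setTextData("2,'plain'");
        Object[] row = idf.getObjectData();      // { 2L, "plain" }
        System.out.println(row[0] + " / " + row[1]);
      }
    }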

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
new file mode 100644
index 0000000..91b594e
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.Type;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Abstract class representing a pluggable intermediate data format the Sqoop
+ * framework will use to move data to/from the connector. All intermediate
+ * data formats are expected to have an internal/native implementation,
+ * but also should minimally be able to return a text (CSV) version of the
+ * data. The data format should also be able to return the data as an object
+ * array - each array representing one row.
+ * <p/>
+ * Why a "native" internal format and then return text too?
+ * Imagine a connector that moves data from a system that stores data as a
+ * serialization format called FooFormat. If I also need the data to be
+ * written into HDFS as FooFormat, the additional cycles burnt in converting
+ * the FooFormat to text and back are wasted - so plugging in an intermediate
+ * format that can store the data as FooFormat saves those cycles!
+ * <p/>
+ * Most fast access mechanisms, like mysqldump or pgsqldump, write the data
+ * out as CSV, and the destination data is most often represented as CSV as
+ * well - so minimal CSV support is important: it lets us easily pull the
+ * data out as text.
+ * <p/>
+ * Any conversion to the final format from the native or text format is to be
+ * done by the connector or OutputFormat classes.
+ *
+ * @param <T> - Each data format may have a native representation of the
+ *            data, represented by the parameter.
+ */
+public abstract class IntermediateDataFormat<T> {
+
+  protected volatile T data;
+
+  public int hashCode() {
+    return data.hashCode();
+  }
+
+  /**
+   * Set one row of data in the intermediate data format's native format.
+   *
+   * @param data - A single row of data to be moved.
+   */
+  public void setData(T data) {
+    this.data = data;
+  }
+
+  /**
+   * Get one row of data.
+   *
+   * @return - One row of data, represented in the internal/native format of
+   *         the intermediate data format implementation.
+   */
+  public T getData() {
+    return data;
+  }
+
+  /**
+   * Get one row of data as CSV.
+   *
+   * @return - String representing the data in CSV
+   */
+  public abstract String getTextData();
+
+  /**
+   * Set one row of data as CSV.
+   *
+   */
+  public abstract void setTextData(String text);
+
+  /**
+   * Get one row of data as an Object array.
+   *
+   * @return - the data represented as an Object array, one entry per column
+   */
+  public abstract Object[] getObjectData();
+
+  /**
+   * Set one row of data as an Object array.
+   *
+   */
+  public abstract void setObjectData(Object[] data);
+
+  /**
+   * Set the schema to be used.
+   *
+   * @param schema - the schema to be used
+   */
+  public abstract void setSchema(Schema schema);
+
+  /**
+   * Get the schema of the data.
+   *
+   * @return - The schema of the data.
+   */
+  public abstract Schema getSchema();
+
+  /**
+   * Serialize the fields of this object to <code>out</code>.
+   *
+   * @param out <code>DataOutput</code> to serialize this object into.
+   * @throws IOException
+   */
+  public abstract void write(DataOutput out) throws IOException;
+
+  /**
+   * Deserialize the fields of this object from <code>in</code>.
+   *
+   * <p>For efficiency, implementations should attempt to re-use storage in the
+   * existing object where possible.</p>
+   *
+   * @param in <code>DataInput</code> to deserialize this object from.
+   * @throws IOException
+   */
+  public abstract void read(DataInput in) throws IOException;
+}
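
To make the contract concrete, here is a deliberately naive illustration of
an alternative implementation whose "native" representation is a String[]
(purely hypothetical, not part of this commit; it does no quoting, so it
only handles fields that contain no commas):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.commons.lang.StringUtils;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;

    public class ArrayIntermediateDataFormat extends IntermediateDataFormat<String[]> {

      private Schema schema;

      @Override
      public String getTextData() {
        // Naive CSV view of the native format (no quoting or escaping).
        return StringUtils.join(data, ',');
      }

      @Override
      public void setTextData(String text) {
        // The -1 limit keeps trailing empty fields.
        data = text.split(",", -1);
      }

      @Override
      public Object[] getObjectData() {
        return data;
      }

      @Override
      public void setObjectData(Object[] objects) {
        data = new String[objects.length];
        for (int i = 0; i < objects.length; i++) {
          data[i] = String.valueOf(objects[i]);
        }
      }

      @Override
      public void setSchema(Schema schema) {
        this.schema = schema;
      }

      @Override
      public Schema getSchema() {
        return schema;
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeUTF(getTextData());
      }

      @Override
      public void read(DataInput in) throws IOException {
        setTextData(in.readUTF());
      }
    }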

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
new file mode 100644
index 0000000..9219074
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum IntermediateDataFormatError implements ErrorCode {
+  /** An unknown error has occurred. */
+  INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+  /** An encoding is missing in the Java native libraries. */
+  INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),
+
+  /** Error while escaping a row. */
+  INTERMEDIATE_DATA_FORMAT_0002("An error has occurred while escaping a row."),
+
+  /** Error while unescaping a row. */
+  INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),
+
+  /** Column type isn't known by Intermediate Data Format. */
+  INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
+
+  /** The row has the wrong number of fields. */
+  INTERMEDIATE_DATA_FORMAT_0005("Wrong number of fields.")
+
+  ;
+
+  private final String message;
+
+  private IntermediateDataFormatError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
new file mode 100644
index 0000000..df6d30f
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class CSVIntermediateDataFormatTest {
+
+  private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
+
+  private IntermediateDataFormat<?> data;
+
+  @Before
+  public void setUp() {
+    data = new CSVIntermediateDataFormat();
+  }
+
+  private String getByteFieldString(byte[] byteFieldData) {
+    try {
+      return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_ENCODING)).append("'").toString();
+    } catch(UnsupportedEncodingException e) {
+      // Should never get to this point because ISO-8859-1 is a standard codec.
+      return null;
+    }
+  }
+
+  @Test
+  public void testStringInStringOut() {
+    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+      + ",'" + String.valueOf(0x0A) + "'";
+    data.setTextData(testData);
+    assertEquals(testData, data.getTextData());
+  }
+
+  @Test
+  public void testNullStringInObjectOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+    data.setTextData(null);
+
+    Object[] out = data.getObjectData();
+
+    assertNull(out);
+  }
+
+  @Test(expected=SqoopException.class)
+  public void testEmptyStringInObjectOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+    data.setTextData("");
+
+    data.getObjectData();
+  }
+
+  @Test
+  public void testStringInObjectOut() {
+
+    // byte[0] = -112, byte[1] = 54 - 2's complement
+    String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+      + ",'\\n'";
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+    data.setTextData(testData);
+
+    Object[] out = data.getObjectData();
+
+    assertEquals(new Long(10),out[0]);
+    assertEquals(new Long(34),out[1]);
+    assertEquals("54",out[2]);
+    assertEquals("random data",out[3]);
+    assertEquals(-112, ((byte[])out[4])[0]);
+    assertEquals(54, ((byte[])out[4])[1]);
+    assertEquals("\n", out[5].toString());
+  }
+
+  @Test
+  public void testObjectInStringOut() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+
+    byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
+    Object[] in = new Object[6];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = "54";
+    in[3] = "random data";
+    in[4] = byteFieldData;
+    in[5] = new String(new char[] { 0x0A });
+
+    data.setObjectData(in);
+
+    // byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complement
+    String testData = "10,34,'54','random data'," +
+        getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
+    assertEquals(testData, data.getTextData());
+  }
+
+  @Test
+  public void testObjectInObjectOut() {
+    //Test escapable sequences too.
+    // byte[0] = -112, byte[1] = 54 - 2's complement
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2"))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"));
+    data.setSchema(schema);
+
+    Object[] in = new Object[6];
+    in[0] = new Long(10);
+    in[1] = new Long(34);
+    in[2] = "54";
+    in[3] = "random data";
+    in[4] = new byte[] { (byte) -112, (byte) 54};
+    in[5] = new String(new char[] { 0x0A });
+    Object[] inCopy = new Object[6];
+    System.arraycopy(in,0,inCopy,0,in.length);
+
+    // Modifies the input array, so we use the copy to confirm
+    data.setObjectData(in);
+
+    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+  }
+
+  @Test
+  public void testStringFullRangeOfCharacters() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new Text("1"));
+    data.setSchema(schema);
+
+    char[] allCharArr = new char[256];
+    for(int i = 0; i < allCharArr.length; ++i) {
+      allCharArr[i] = (char)i;
+    }
+    String strData = new String(allCharArr);
+
+    Object[] in = {strData};
+    Object[] inCopy = new Object[1];
+    System.arraycopy(in,0,inCopy,0,in.length);
+
+    // Modifies the input array, so we use the copy to confirm
+    data.setObjectData(in);
+
+    assertEquals(strData, data.getObjectData()[0]);
+    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+  }
+
+  @Test
+  public void testByteArrayFullRangeOfCharacters() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new Binary("1"));
+    data.setSchema(schema);
+
+    byte[] allCharByteArr = new byte[256];
+    for(int i = 0; i < allCharByteArr.length; ++i) {
+      allCharByteArr[i] = (byte)i;
+    }
+
+    Object[] in = {allCharByteArr};
+    Object[] inCopy = new Object[1];
+    System.arraycopy(in,0,inCopy,0,in.length);
+
+    // Modifies the input array, so we use the copy to confirm
+    data.setObjectData(in);
+    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index e052584..1700432 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -22,6 +22,7 @@ import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.request.HttpEventContext;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.core.Reconfigurable;
 import org.apache.sqoop.core.SqoopConfiguration;
@@ -327,6 +328,10 @@ public class JobManager implements Reconfigurable {
     request.setJobName(job.getName());
     request.setJobId(job.getPersistenceId());
     request.setNotificationUrl(notificationBaseUrl + jobId);
+    Class<? extends IntermediateDataFormat<?>> dataFormatClass =
+      connector.getIntermediateDataFormat();
+    request.setIntermediateDataFormat(dataFormatClass);
 
     // Let's register all important jars
     // sqoop-common
@@ -343,6 +348,9 @@ public class JobManager implements Reconfigurable {
     // Extra libraries that Sqoop code requires
     request.addJarForClass(JSONValue.class);
 
+    // The IDF is used in the ETL process.
+    request.addJarForClass(dataFormatClass);
+
     // Get connector callbacks
     switch (job.getType()) {
       case IMPORT:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index a138db5..7900eee 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.framework;
 
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.CallbackBase;
 import org.apache.sqoop.model.MJob;
@@ -107,6 +108,11 @@ public class SubmissionRequest {
    */
   Integer loaders;
 
+  /**
+   * The intermediate data format this submission should use.
+   */
+  Class<? extends IntermediateDataFormat> intermediateDataFormat;
+
   public SubmissionRequest() {
     this.jars = new LinkedList<String>();
     this.connectorContext = new MutableMapContext();
@@ -252,4 +258,13 @@ public class SubmissionRequest {
   public void setLoaders(Integer loaders) {
     this.loaders = loaders;
   }
+
+  public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
+    return intermediateDataFormat;
+  }
+
+  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
+    this.intermediateDataFormat = intermediateDataFormat;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index f9a2a0e..b23b905 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -52,6 +52,10 @@ limitations under the License.
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
 
     <!-- See profiles for Hadoop specific dependencies -->
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 5c0a027..84f6213 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -34,7 +34,7 @@ import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
 import org.apache.sqoop.job.etl.HdfsTextImportLoader;
 import org.apache.sqoop.job.etl.Importer;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
@@ -53,14 +53,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     return new MRSubmissionRequest();
   }
 
-  /**
-   *  {@inheritDoc}
-   */
-  @Override
-  public void prepareImportSubmission(SubmissionRequest gRequest) {
-    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
-    ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
-
+  public void prepareSubmission(MRSubmissionRequest request) {
     // Add jar dependencies
     addDependencies(request);
 
@@ -68,13 +61,35 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     request.setInputFormatClass(SqoopInputFormat.class);
 
     request.setMapperClass(SqoopMapper.class);
-    request.setMapOutputKeyClass(Data.class);
+    request.setMapOutputKeyClass(SqoopWritable.class);
     request.setMapOutputValueClass(NullWritable.class);
 
-    request.setOutputFormatClass(SqoopFileOutputFormat.class);
-    request.setOutputKeyClass(Data.class);
+    request.setOutputFormatClass(SqoopNullOutputFormat.class);
+    request.setOutputKeyClass(SqoopWritable.class);
     request.setOutputValueClass(NullWritable.class);
 
+    // Set up framework context
+    MutableMapContext context = request.getFrameworkContext();
+    context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
+      request.getIntermediateDataFormat().getName());
+
+    if(request.getExtractors() != null) {
+      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+    }
+  }
+
+  /**
+   *  {@inheritDoc}
+   */
+  @Override
+  public void prepareImportSubmission(SubmissionRequest gRequest) {
+    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
+
+    prepareSubmission(request);
+    request.setOutputFormatClass(SqoopFileOutputFormat.class);
+
+    ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
+
     Importer importer = (Importer)request.getConnectorCallbacks();
 
     // Set up framework context
@@ -83,10 +98,6 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
     context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
 
-    if(request.getExtractors() != null) {
-      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
-    }
-
     // TODO: This settings should be abstracted to core module at some point
     if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
       context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -137,19 +148,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
 
-    // Add jar dependencies
-    addDependencies(request);
-
-    // Configure map-reduce classes for import
-    request.setInputFormatClass(SqoopInputFormat.class);
-
-    request.setMapperClass(SqoopMapper.class);
-    request.setMapOutputKeyClass(Data.class);
-    request.setMapOutputValueClass(NullWritable.class);
-
-    request.setOutputFormatClass(SqoopNullOutputFormat.class);
-    request.setOutputKeyClass(Data.class);
-    request.setOutputValueClass(NullWritable.class);
+    prepareSubmission(request);
 
     Exporter exporter = (Exporter)request.getConnectorCallbacks();
 
@@ -162,10 +161,6 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     // Extractor that will be able to read all supported file types
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
     context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
-
-    if(request.getExtractors() != null) {
-      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
-    }
   }
 
   /**
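
The refactoring above folds the MapReduce wiring that was duplicated across the
import and export paths into a single prepareSubmission(), which also records
the intermediate data format class in the framework context. The resulting call
shape, as a minimal sketch (the isImport flag is hypothetical, used only to
contrast the two paths):

    prepareSubmission(request);   // shared wiring: SqoopInputFormat, SqoopMapper,
                                  // SqoopWritable keys, SqoopNullOutputFormat
    if (isImport) {
      // import is the only path that swaps the output format
      request.setOutputFormatClass(SqoopFileOutputFormat.class);
    }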

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 7fd9a01..b2fa15d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.job;
 
 import org.apache.sqoop.core.ConfigurationConstants;
+import org.apache.sqoop.framework.FrameworkConstants;
 
 public final class JobConstants extends Constants {
   /**
@@ -66,6 +67,9 @@ public final class JobConstants extends Constants {
   public static final String HADOOP_COMPRESS_CODEC =
     "mapred.output.compression.codec";
 
+  public static final String INTERMEDIATE_DATA_FORMAT =
+    FrameworkConstants.PREFIX_EXECUTION_CONFIG + "intermediate.format";
+
   private JobConstants() {
     // Disable explicit object creation
   }
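
The new constant is written once by the execution engine and read back inside
each task, where the recorded class name is turned into a live object. A sketch
of that round trip, assuming ClassUtils.instantiate() amounts to plain
reflection:

    // engine side: record the intermediate data format class name
    context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
      request.getIntermediateDataFormat().getName());

    // task side: resolve the name back to an instance
    String idfName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
    IntermediateDataFormat data =
      (IntermediateDataFormat) Class.forName(idfName).newInstance();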

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
index 1978ec6..43e6463 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -36,7 +36,6 @@ import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
 
 /**
  * Extract from HDFS.
@@ -50,12 +49,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
   private DataWriter dataWriter;
   private long rowRead = 0;
 
-  private final char fieldDelimiter;
-
-  public HdfsExportExtractor() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
   @Override
   public void extract(ExtractorContext context,
       ConnectionConfiguration connectionConfiguration,
@@ -63,7 +56,6 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
 
     conf = ((PrefixContext) context.getContext()).getConfiguration();
     dataWriter = context.getDataWriter();
-    dataWriter.setFieldDelimiter(fieldDelimiter);
 
     try {
       HdfsExportPartition p = partition;
@@ -113,7 +105,7 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
     boolean hasNext = filereader.next(line);
     while (hasNext) {
       rowRead++;
-      dataWriter.writeCsvRecord(line.toString());
+      dataWriter.writeStringRecord(line.toString());
       line = new Text();
       hasNext = filereader.next(line);
       if (filereader.getPosition() >= end && filereader.syncSeen()) {
@@ -173,7 +165,7 @@ public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, Expo
         next = fileseeker.getPos();
       }
       rowRead++;
-      dataWriter.writeCsvRecord(line.toString());
+      dataWriter.writeStringRecord(line.toString());
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
     filestream.close();
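
With the delimiter plumbing gone, the extractor hands every raw line to the
framework as an opaque string via writeStringRecord(); CSV semantics now live
entirely in the intermediate data format. Condensed, the reading loop above
boils down to this sketch (position and sync handling omitted):

    Text line = new Text();
    while (filereader.next(line)) {
      rowRead++;
      dataWriter.writeStringRecord(line.toString()); // no delimiter logic here
      line = new Text();
    }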

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index a07c511..d4ffb13 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -38,16 +37,9 @@ public class HdfsSequenceImportLoader extends Loader {
 
   public static final String EXTENSION = ".seq";
 
-  private final char fieldDelimiter;
-
-  public HdfsSequenceImportLoader() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
   @Override
   public void load(LoaderContext context, Object oc, Object oj) throws Exception {
     DataReader reader = context.getDataReader();
-    reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();
 //    Configuration conf = ((EtlContext)context).getConfiguration();
@@ -87,7 +79,7 @@ public class HdfsSequenceImportLoader extends Loader {
 
       String csv;
       Text text = new Text();
-      while ((csv = reader.readCsvRecord()) != null) {
+      while ((csv = reader.readTextRecord()) != null) {
         text.set(csv);
         filewriter.append(text, NullWritable.get());
       }
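
The loader writes each text record as the key of a SequenceFile entry with a
NullWritable value. Reading such a file back is symmetric; a sketch using the
classic SequenceFile.Reader API:

    SequenceFile.Reader filereader = new SequenceFile.Reader(fs, filepath, conf);
    Text key = new Text();
    while (filereader.next(key, NullWritable.get())) {
      System.out.println(key);   // one CSV record per entry
    }
    filereader.close();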

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index 4621942..7b799ca 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 
+import com.google.common.base.Charsets;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,18 +37,15 @@ import org.apache.sqoop.utils.ClassUtils;
 
 public class HdfsTextImportLoader extends Loader {
 
-  private final char fieldDelimiter;
   private final char recordDelimiter;
 
   public HdfsTextImportLoader() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
     recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
   }
 
   @Override
   public void load(LoaderContext context, Object oc, Object oj) throws Exception{
     DataReader reader = context.getDataReader();
-    reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();
 //    Configuration conf = ((EtlContext)context).getConfiguration();
@@ -81,15 +79,15 @@ public class HdfsTextImportLoader extends Loader {
       DataOutputStream filestream = fs.create(filepath, false);
       if (codec != null) {
         filewriter = new BufferedWriter(new OutputStreamWriter(
-            codec.createOutputStream(filestream, codec.createCompressor()),
-            Data.CHARSET_NAME));
+          codec.createOutputStream(filestream, codec.createCompressor()),
+          Charsets.UTF_8));
       } else {
         filewriter = new BufferedWriter(new OutputStreamWriter(
-            filestream, Data.CHARSET_NAME));
+            filestream, Charsets.UTF_8));
       }
 
       String csv;
-      while ((csv = reader.readCsvRecord()) != null) {
+      while ((csv = reader.readTextRecord()) != null) {
         filewriter.write(csv + recordDelimiter);
       }
       filewriter.close();
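
Switching from a charset name to Guava's Charsets.UTF_8 (a
java.nio.charset.Charset constant) has a small API benefit as well: the Charset
overload of OutputStreamWriter cannot throw the checked
UnsupportedEncodingException that the String-named overload declares. A minimal
sketch:

    // checked-exception free, unlike new OutputStreamWriter(filestream, "UTF-8")
    Writer filewriter = new BufferedWriter(
        new OutputStreamWriter(filestream, Charsets.UTF_8));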

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
new file mode 100644
index 0000000..ed118d2
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/SqoopWritable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SqoopWritable implements WritableComparable<SqoopWritable> {
+  private String strData;
+
+  public SqoopWritable() {}
+
+  public void setString(String data) {
+    strData = data;
+  }
+
+  public String getString() {
+    return strData;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(strData);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    strData = in.readUTF();
+  }
+
+  @Override
+  public int compareTo(SqoopWritable o) {
+    return strData.compareTo(o.getString());
+  }
+
+  @Override
+  public String toString() {
+    return getString();
+  }
+}
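
SqoopWritable is a thin WritableComparable wrapper around the intermediate
format's text representation. A self-contained round trip (a sketch, not part
of the commit) shows the contract; note that writeUTF() caps a single record at
65535 bytes of modified UTF-8 and throws NullPointerException if no string was
set:

    SqoopWritable in = new SqoopWritable();
    in.setString("1,'foo',42");

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    in.write(new DataOutputStream(buffer));          // serializes via writeUTF

    SqoopWritable out = new SqoopWritable();
    out.readFields(new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray())));
    assert in.compareTo(out) == 0;                   // same string restored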

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index 356ae8a..bbf7342 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -34,13 +34,13 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 
 /**
  * An output format for a MapReduce job.
  */
 public class SqoopFileOutputFormat
-    extends FileOutputFormat<Data, NullWritable> {
+    extends FileOutputFormat<SqoopWritable, NullWritable> {
 
   public static final Logger LOG =
     Logger.getLogger(SqoopFileOutputFormat.class);
@@ -49,7 +49,7 @@ public class SqoopFileOutputFormat
       DefaultCodec.class;
 
   @Override
-  public RecordWriter<Data, NullWritable> getRecordWriter(
+  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
       TaskAttemptContext context) throws IOException {
     Configuration conf = context.getConfiguration();
 
@@ -69,6 +69,7 @@ public class SqoopFileOutputFormat
     return executor.getRecordWriter();
   }
 
+  @Override
   public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
     Path output = getOutputPath(context);
     return new DestroyerFileOutputCommitter(output, context);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 92de37e..645dbc6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -27,21 +27,22 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
  * A mapper to perform the map function.
  */
-public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
+public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {
 
   static {
     ConfigurationUtils.configureLogging();
@@ -52,6 +53,8 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
    * Service for reporting progress to mapreduce.
    */
   private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
+  private IntermediateDataFormat data = null;
+  private SqoopWritable dataOut = null;
 
   @Override
   public void run(Context context) throws IOException, InterruptedException {
@@ -60,6 +63,12 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
     String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
+    String intermediateDataFormatName = conf.get(JobConstants
+      .INTERMEDIATE_DATA_FORMAT);
+    data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
+    data.setSchema(ConfigurationUtils.getConnectorSchema(conf));
+    dataOut = new SqoopWritable();
+
     // Objects that should be passed to the Executor execution
     PrefixContext subContext = null;
     Object configConnection = null;
@@ -109,46 +118,38 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, Data, NullWrit
     }
   }
 
-  public class MapDataWriter extends DataWriter {
+  private class MapDataWriter extends DataWriter {
     private Context context;
-    private Data data;
 
     public MapDataWriter(Context context) {
       this.context = context;
     }
 
     @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      if (data == null) {
-        data = new Data();
-      }
-
-      data.setFieldDelimiter(fieldDelimiter);
-    }
-
-    @Override
     public void writeArrayRecord(Object[] array) {
-      writeContent(array, Data.ARRAY_RECORD);
+      data.setObjectData(array);
+      writeContent();
     }
 
     @Override
-    public void writeCsvRecord(String csv) {
-      writeContent(csv, Data.CSV_RECORD);
+    public void writeStringRecord(String text) {
+      data.setTextData(text);
+      writeContent();
     }
 
     @Override
-    public void writeContent(Object content, int type) {
-      if (data == null) {
-        data = new Data();
-      }
+    public void writeRecord(Object obj) {
+      data.setData(obj.toString());
+      writeContent();
+    }
 
-      data.setContent(content, type);
+    private void writeContent() {
       try {
-        context.write(data, NullWritable.get());
+        dataOut.setString(data.getTextData());
+        context.write(dataOut, NullWritable.get());
       } catch (Exception e) {
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
       }
     }
   }
-
 }
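
All three DataWriter entry points now converge on the same normalization step:
the record is loaded into the intermediate data format and leaves the mapper as
text inside a SqoopWritable. Illustrated with the CSV implementation (the exact
encoding of the output string is an assumption here):

    data.setObjectData(new Object[]{1, "foo"});   // connector hands over objects
    dataOut.setString(data.getTextData());        // framework ships text, e.g. "1,'foo'"
    context.write(dataOut, NullWritable.get());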

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 90de6ef..b3461bb 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -28,14 +28,14 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.job.JobConstants;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 
 import java.io.IOException;
 
 /**
  * An output format for a MapReduce job.
  */
-public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
+public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
 
   public static final Logger LOG =
     Logger.getLogger(SqoopNullOutputFormat.class);
@@ -46,7 +46,7 @@ public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
   }
 
   @Override
-  public RecordWriter<Data, NullWritable> getRecordWriter(
+  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
       TaskAttemptContext context) {
     SqoopOutputFormatLoadExecutor executor =
         new SqoopOutputFormatLoadExecutor(context);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 7dedee9..6efadf6 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -31,14 +31,16 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class SqoopOutputFormatLoadExecutor {
@@ -48,7 +50,7 @@ public class SqoopOutputFormatLoadExecutor {
 
   private volatile boolean readerFinished = false;
   private volatile boolean writerFinished = false;
-  private volatile Data data;
+  private volatile IntermediateDataFormat data;
   private JobContext context;
   private SqoopRecordWriter producer;
   private Future<?> consumerFuture;
@@ -60,17 +62,19 @@ public class SqoopOutputFormatLoadExecutor {
   SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
     this.isTest = isTest;
     this.loaderName = loaderName;
-    data = new Data();
+    data = new CSVIntermediateDataFormat();
     producer = new SqoopRecordWriter();
   }
 
   public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
-    data = new Data();
     context = jobctx;
     producer = new SqoopRecordWriter();
+    data = (IntermediateDataFormat) ClassUtils.instantiate(context
+      .getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
+    data.setSchema(ConfigurationUtils.getConnectorSchema(context.getConfiguration()));
   }
 
-  public RecordWriter<Data, NullWritable> getRecordWriter() {
+  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
     consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
         ("OutputFormatLoader-consumer").build()).submit(
             new ConsumerThread());
@@ -81,14 +85,13 @@ public class SqoopOutputFormatLoadExecutor {
    * This is a producer-consumer problem and can be solved
    * with two semaphores.
    */
-  private class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+  private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
 
     @Override
-    public void write(Data key, NullWritable value) throws InterruptedException {
+    public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
       free.acquire();
       checkIfConsumerThrew();
-      int type = key.getType();
-      data.setContent(key.getContent(type), type);
+      data.setTextData(key.getString());
       filled.release();
     }
 
@@ -135,48 +138,68 @@ public class SqoopOutputFormatLoadExecutor {
   }
 
   private class OutputFormatDataReader extends DataReader {
-    @Override
-    public void setFieldDelimiter(char fieldDelimiter) {
-      data.setFieldDelimiter(fieldDelimiter);
-    }
 
     @Override
     public Object[] readArrayRecord() throws InterruptedException {
-      return (Object[])readContent(Data.ARRAY_RECORD);
+      acquireSema();
+      // If the writer has finished, there is definitely no data remaining
+      if (writerFinished) {
+        return null;
+      }
+      try {
+        return data.getObjectData();
+      } finally {
+        releaseSema();
+      }
     }
 
     @Override
-    public String readCsvRecord() throws InterruptedException {
-      return (String)readContent(Data.CSV_RECORD);
+    public String readTextRecord() throws InterruptedException {
+      acquireSema();
+      // If the writer has finished, there is definitely no data remaining
+      if (writerFinished) {
+        return null;
+      }
+      try {
+        return data.getTextData();
+      } finally {
+        releaseSema();
+      }
     }
 
     @Override
-    public Object readContent(int type) throws InterruptedException {
-      // Has any more data been produced after I last consumed.
-      // If no, wait for the producer to produce.
-      try {
-        filled.acquire();
-      } catch (InterruptedException ex) {
-        //Really at this point, there is nothing to do. Just throw and get out
-        LOG.error("Interrupted while waiting for data to be available from " +
-            "mapper", ex);
-        throw ex;
-      }
-      // If the writer has finished, there is definitely no data remaining
+    public Object readContent() throws InterruptedException {
+      acquireSema();
       if (writerFinished) {
         return null;
       }
       try {
-        Object content = data.getContent(type);
-        return content;
+        return data.getData();
       } catch (Throwable t) {
         readerFinished = true;
         LOG.error("Caught exception e while getting content ", t);
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);
       } finally {
-        free.release();
+        releaseSema();
+      }
+    }
+
+    private void acquireSema() throws InterruptedException {
+      // Has any more data been produced since we last consumed?
+      // If not, wait for the producer to produce it.
+      try {
+        filled.acquire();
+      } catch (InterruptedException ex) {
+        // Really, at this point there is nothing to do. Just throw and get out.
+        LOG.error("Interrupted while waiting for data to be available from " +
+          "mapper", ex);
+        throw ex;
       }
     }
+
+    private void releaseSema(){
+      free.release();
+    }
   }
 
   private class ConsumerThread implements Runnable {
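
The acquireSema()/releaseSema() pair factored out above is the classic
single-slot producer-consumer handoff: free starts with one permit so the
writer may fill the slot, filled starts with zero so the reader blocks until it
does. Stripped to its essence (a sketch with java.util.concurrent.Semaphore,
not the commit's code):

    private final Semaphore free = new Semaphore(1);
    private final Semaphore filled = new Semaphore(0);
    private volatile String slot;

    void produce(String value) throws InterruptedException {
      free.acquire();     // wait for the reader to drain the slot
      slot = value;
      filled.release();   // signal: data available
    }

    String consume() throws InterruptedException {
      filled.acquire();   // wait for the writer to fill the slot
      try {
        return slot;
      } finally {
        free.release();   // hand the slot back
      }
    }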

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
index 98a2c51..a55534a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopReducer.java
@@ -20,7 +20,7 @@ package org.apache.sqoop.job.mr;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.log4j.Logger;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 
 import java.io.IOException;
 import java.util.concurrent.Executors;
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * A reducer to perform the reduce function.
  */
-public class SqoopReducer extends Reducer<Data, NullWritable, Data, NullWritable> {
+public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
 
   static {
     ConfigurationUtils.configureLogging();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
index 39d1b53..a849394 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngineTest.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.execution.mapreduce;
 
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.framework.SubmissionRequest;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.framework.configuration.OutputCompression;
@@ -71,6 +72,7 @@ public class MapreduceExecutionEngineTest {
     request.setConnectorCallbacks(new Importer(Initializer.class,
       Partitioner.class, Extractor.class, Destroyer.class) {
     });
+    request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
     executionEngine.prepareImportSubmission(request);
 
     MutableMapContext context = request.getFrameworkContext();
@@ -97,6 +99,7 @@ public class MapreduceExecutionEngineTest {
     request.setConnectorCallbacks(new Importer(Initializer.class,
       Partitioner.class, Extractor.class, Destroyer.class) {
     });
+    request.setIntermediateDataFormat(CSVIntermediateDataFormat.class);
     executionEngine.prepareImportSubmission(request);
 
     MutableMapContext context = request.getFrameworkContext();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
index e21f15b..09e5ec5 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
@@ -44,17 +44,17 @@ public class JobUtils {
   }
 
   public static void runJob(Configuration conf,
-      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
-      Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
-      Class<? extends OutputFormat<Data, NullWritable>> output)
-      throws IOException, InterruptedException, ClassNotFoundException {
+    Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+    Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
+    Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
+    throws IOException, InterruptedException, ClassNotFoundException {
     Job job = new Job(conf);
     job.setInputFormatClass(input);
     job.setMapperClass(mapper);
-    job.setMapOutputKeyClass(Data.class);
+    job.setMapOutputKeyClass(SqoopWritable.class);
     job.setMapOutputValueClass(NullWritable.class);
     job.setOutputFormatClass(output);
-    job.setOutputKeyClass(Data.class);
+    job.setOutputKeyClass(SqoopWritable.class);
     job.setOutputValueClass(NullWritable.class);
 
     boolean success = job.waitForCompletion(true);