Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/09 20:17:32 UTC

sqoop git commit: SQOOP-1956: Sqoop2: Cleanup IDF implementations

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 aeab9150b -> 89dcbe879


SQOOP-1956: Sqoop2: Cleanup IDF implementations

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/89dcbe87
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/89dcbe87
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/89dcbe87

Branch: refs/heads/sqoop2
Commit: 89dcbe879e81c75a27603d8cd2b2458cc0ce62a0
Parents: aeab915
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Fri Jan 9 11:16:05 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Fri Jan 9 11:16:05 2015 -0800

----------------------------------------------------------------------
 .../connector/jdbc/util/SqlTypesUtils.java      |   2 +-
 .../connector/jdbc/TestFromInitializer.java     |   2 +-
 .../sqoop/connector/common/SqoopIDFUtils.java   |  52 +++---
 .../idf/AVROIntermediateDataFormat.java         |  22 +--
 .../idf/CSVIntermediateDataFormat.java          | 171 +++++++++----------
 .../connector/idf/IntermediateDataFormat.java   |  63 +++----
 .../idf/JSONIntermediateDataFormat.java         |  31 ++--
 .../connector/common/TestSqoopIDFUtils.java     |  24 +--
 .../idf/TestCSVIntermediateDataFormat.java      | 155 +++++++++--------
 .../java/org/apache/sqoop/job/TestMatching.java |  12 +-
 .../apache/sqoop/job/io/TestSqoopWritable.java  |   7 +-
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |  10 +-
 .../apache/sqoop/job/util/MRJobTestUtil.java    |   2 +-
 13 files changed, 272 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
index 9cfee46..c0ca7f2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
@@ -49,7 +49,7 @@ public class SqlTypesUtils {
       case Types.SMALLINT:
       case Types.TINYINT:
       case Types.INTEGER:
-        return new FixedPoint(columnName);
+        return new FixedPoint(columnName).setByteSize(2L);
 
       case Types.CLOB:
       case Types.VARCHAR:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
index e8c0f0b..5bdcd99 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
@@ -102,7 +102,7 @@ public class TestFromInitializer {
    */
   public Schema getSchema(String name) {
     return new Schema(name)
-      .addColumn(new FixedPoint("ICOL"))
+      .addColumn(new FixedPoint("ICOL").setByteSize(2L))
       .addColumn(new FloatingPoint("DCOL"))
       .addColumn(new Text("VCOL"))
     ;
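
Note on the two generic-jdbc hunks above: FixedPoint columns built from
SMALLINT/TINYINT/INTEGER now carry an explicit byte size. The IDF encoders
(see toCSVFixedPoint below) consult FixedPoint.getByteSize() to decide
whether a value is handled as an Integer or a Long, so leaving the size
unset would make that choice ambiguous. A minimal sketch mirroring the
updated TestFromInitializer#getSchema, using only APIs visible in this patch:

  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.FixedPoint;
  import org.apache.sqoop.schema.type.FloatingPoint;
  import org.apache.sqoop.schema.type.Text;

  public class SchemaSketch {
    // Every FixedPoint carries a byte size so the IDF encoders can pick the
    // matching Java type (a 2-byte value fits comfortably in an Integer).
    public static Schema buildSchema(String name) {
      return new Schema(name)
          .addColumn(new FixedPoint("ICOL").setByteSize(2L))
          .addColumn(new FloatingPoint("DCOL"))
          .addColumn(new Text("VCOL"));
    }
  }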

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 26ff629..800630f 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -94,7 +94,7 @@ public class SqoopIDFUtils {
 
   // ******** Number Column Type utils***********
 
-  public static String encodeToCSVFixedPoint(Object obj, Column column) {
+  public static String toCSVFixedPoint(Object obj, Column column) {
     Long byteSize = ((FixedPoint) column).getByteSize();
     if (byteSize != null && byteSize <= Integer.SIZE) {
       return ((Integer) obj).toString();
@@ -114,7 +114,7 @@ public class SqoopIDFUtils {
     return returnValue;
   }
 
-  public static String encodeToCSVFloatingPoint(Object obj, Column column) {
+  public static String toCSVFloatingPoint(Object obj, Column column) {
     Long byteSize = ((FloatingPoint) column).getByteSize();
     if (byteSize != null && byteSize <= Float.SIZE) {
       return ((Float) obj).toString();
@@ -134,7 +134,7 @@ public class SqoopIDFUtils {
     return returnValue;
   }
 
-  public static String encodeToCSVDecimal(Object obj) {
+  public static String toCSVDecimal(Object obj) {
     return ((BigDecimal) obj).toString();
   }
 
@@ -143,7 +143,7 @@ public class SqoopIDFUtils {
   }
 
   // ********** BIT Column Type utils******************
-  public static String encodeToCSVBit(Object obj) {
+  public static String toCSVBit(Object obj) {
     String bitStringValue = obj.toString();
     if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
       return bitStringValue;
@@ -164,16 +164,16 @@ public class SqoopIDFUtils {
 
   // *********** DATE and TIME Column Type utils **********
 
-  public static String encodeToCSVDate(Object obj) {
+  public static String toCSVDate(Object obj) {
     org.joda.time.LocalDate date = (org.joda.time.LocalDate) obj;
-    return encloseWithQuote(df.print(date));
+    return encloseWithQuotes(df.print(date));
   }
 
-  public static String encodeToCSVTime(Object obj, Column col) {
+  public static String toCSVTime(Object obj, Column col) {
     if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
-      return encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) obj));
+      return encloseWithQuotes(tfWithFraction.print((org.joda.time.LocalTime) obj));
     } else {
-      return encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) obj));
+      return encloseWithQuotes(tfWithNoFraction.print((org.joda.time.LocalTime) obj));
     }
   }
 
@@ -187,27 +187,27 @@ public class SqoopIDFUtils {
 
   // *********** DATE TIME Column Type utils **********
 
-  public static String encodeToCSVLocalDateTime(Object obj, Column col) {
+  public static String toCSVLocalDateTime(Object obj, Column col) {
     org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
     org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
     if (column.hasFraction()) {
-      return encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
+      return encloseWithQuotes(dtfWithFractionNoTimeZone.print(localDateTime));
     } else {
-      return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
+      return encloseWithQuotes(dtfWithNoFractionAndTimeZone.print(localDateTime));
     }
   }
 
-  public static String encodeToCSVDateTime(Object obj, Column col) {
+  public static String toCSVDateTime(Object obj, Column col) {
     org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
     org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
     if (column.hasFraction() && column.hasTimezone()) {
-      return encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
+      return encloseWithQuotes(dtfWithFractionAndTimeZone.print(dateTime));
     } else if (column.hasFraction() && !column.hasTimezone()) {
-      return encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
+      return encloseWithQuotes(dtfWithFractionNoTimeZone.print(dateTime));
     } else if (column.hasTimezone()) {
-      return encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
+      return encloseWithQuotes(dtfWithNoFractionWithTimeZone.print(dateTime));
     } else {
-      return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
+      return encloseWithQuotes(dtfWithNoFractionAndTimeZone.print(dateTime));
     }
   }
 
@@ -256,10 +256,10 @@ public class SqoopIDFUtils {
   // ************ MAP Column Type utils*********
 
   @SuppressWarnings("unchecked")
-  public static String encodeToCSVMap(Map<Object, Object> map, Column column) {
+  public static String toCSVMap(Map<Object, Object> map, Column column) {
     JSONObject object = new JSONObject();
     object.putAll(map);
-    return encloseWithQuote(object.toJSONString());
+    return encloseWithQuotes(object.toJSONString());
   }
 
   public static Map<Object, Object> toMap(String csvString) {
@@ -314,7 +314,7 @@ public class SqoopIDFUtils {
   // ************ LIST Column Type utils*********
 
   @SuppressWarnings("unchecked")
-  public static String encodeToCSVList(Object[] list, Column column) {
+  public static String toCSVList(Object[] list, Column column) {
     List<Object> elementList = new ArrayList<Object>();
     for (int n = 0; n < list.length; n++) {
       Column listType = ((AbstractComplexListType) column).getListType();
@@ -332,7 +332,7 @@ public class SqoopIDFUtils {
     }
     JSONArray array = new JSONArray();
     array.addAll(elementList);
-    return encloseWithQuote(array.toJSONString());
+    return encloseWithQuotes(array.toJSONString());
   }
 
   public static Object[] toList(String csvString) {
@@ -397,7 +397,7 @@ public class SqoopIDFUtils {
     return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
   }
 
-  public static String encodeToCSVString(String string) {
+  public static String toCSVString(String string) {
     int j = 0;
     String replacement = string;
     try {
@@ -408,7 +408,7 @@ public class SqoopIDFUtils {
       throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement
           + "  " + String.valueOf(j) + "  " + e.getMessage());
     }
-    return encloseWithQuote(replacement);
+    return encloseWithQuotes(replacement);
   }
 
   public static String toText(String string) {
@@ -429,10 +429,10 @@ public class SqoopIDFUtils {
 
   // ************ BINARY Column type utils*********
 
-  public static String encodeToCSVByteArray(Object obj) {
+  public static String toCSVByteArray(Object obj) {
     byte[] bytes = (byte[]) obj;
     try {
-      return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET));
+      return toCSVString(new String(bytes, BYTE_FIELD_CHARSET));
     } catch (UnsupportedEncodingException e) {
       // We should never hit this case.
       // This character set should be distributed with Java.
@@ -455,7 +455,7 @@ public class SqoopIDFUtils {
 
   // *********** SQOOP CSV standard encoding utils********************
 
-  public static String encloseWithQuote(String string) {
+  public static String encloseWithQuotes(String string) {
     StringBuilder builder = new StringBuilder();
     builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER);
     return builder.toString();
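
Aside from these behaviour-preserving renames (encodeToCSV* becomes toCSV*,
encloseWithQuote becomes encloseWithQuotes), the public surface of
SqoopIDFUtils is unchanged, and the new names line up with the existing
toText/toMap/toList decoders. A minimal sketch of the renamed call sites,
assuming the SDK classes shown in this patch (the column name "id" is made
up for illustration):

  import org.apache.sqoop.connector.common.SqoopIDFUtils;
  import org.apache.sqoop.schema.type.FixedPoint;

  public class IdfUtilsSketch {
    public static void main(String[] args) {
      // A 2-byte FixedPoint is encoded via Integer.toString().
      FixedPoint col = new FixedPoint("id");
      col.setByteSize(2L);
      System.out.println(SqoopIDFUtils.toCSVFixedPoint(42, col));    // 42

      // Strings are escaped and wrapped in single quotes.
      System.out.println(SqoopIDFUtils.toCSVString("random data"));  // 'random data'
    }
  }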

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
index b12b59a..f0dd914 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -302,47 +302,47 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
       case ARRAY:
       case SET:
         List<Object> objList = (List<Object>) obj;
-        csvString.append(encodeToCSVList(toObjectArray(objList), cols[i]));
+        csvString.append(toCSVList(toObjectArray(objList), cols[i]));
         break;
       case MAP:
         Map<Object, Object> objMap = (Map<Object, Object>) obj;
-        csvString.append(encodeToCSVMap(objMap, cols[i]));
+        csvString.append(toCSVMap(objMap, cols[i]));
         break;
       case ENUM:
       case TEXT:
-        csvString.append(encodeToCSVString(obj.toString()));
+        csvString.append(toCSVString(obj.toString()));
         break;
       case BINARY:
       case UNKNOWN:
-        csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj)));
+        csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj)));
         break;
       case FIXED_POINT:
-        csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
+        csvString.append(toCSVFixedPoint(obj, cols[i]));
         break;
       case FLOATING_POINT:
-        csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
+        csvString.append(toCSVFloatingPoint(obj, cols[i]));
         break;
       case DECIMAL:
         // stored as string
-        csvString.append(encodeToCSVDecimal(obj));
+        csvString.append(toCSVDecimal(obj));
         break;
       case DATE:
         // stored as long
         Long dateInMillis = (Long) obj;
-        csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis)));
+        csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis)));
         break;
       case TIME:
         // stored as long
         Long timeInMillis = (Long) obj;
-        csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
+        csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
         break;
       case DATE_TIME:
         // stored as long
         Long dateTimeInMillis = (Long) obj;
-        csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
+        csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
         break;
       case BIT:
-        csvString.append(encodeToCSVBit(obj));
+        csvString.append(toCSVBit(obj));
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 33b5d0a..856a4bb 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -20,45 +20,39 @@ package org.apache.sqoop.connector.idf;
 
 import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.utils.ClassUtils;
-import org.joda.time.DateTime;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalDateTime;
-import org.joda.time.LocalTime;
-import org.json.simple.JSONValue;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 /**
  * A concrete implementation for the {@link #IntermediateDataFormat} that
  * represents each row of the data source as a comma separates list. Each
- * element in the CSV represents a specific column value encoded as string using the sqoop specified rules.
- * The methods allow serializing to this string and deserializing the string to its
- * corresponding java object based on the {@link #Schema} and its
- * {@link #Column} types.
+ * element in the CSV represents a specific column value encoded as string using
+ * the sqoop specified rules. The methods allow serializing to this string and
+ * deserializing the string to its corresponding java object based on the
+ * {@link #Schema} and its {@link #Column} types.
  *
  */
 public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class);
 
+  // need this default constructor for reflection magic used in execution engine
   public CSVIntermediateDataFormat() {
   }
 
   public CSVIntermediateDataFormat(Schema schema) {
-    setSchema(schema);
+    super.setSchema(schema);
   }
+
   /**
    * {@inheritDoc}
    */
@@ -80,11 +74,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @Override
   public Object[] getObjectData() {
-    if (schema == null || schema.isEmpty()) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
-    }
-
-    // fieldStringArray represents the csv fields parsed into string array
+    super.validateSchema(schema);
     String[] csvStringArray = parseCSVString(this.data);
 
     if (csvStringArray == null) {
@@ -92,14 +82,13 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     }
 
     if (csvStringArray.length != schema.getColumnsArray().length) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + getCSVTextData()
-          + " has the wrong number of fields.");
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+          "The data " + getCSVTextData() + " has the wrong number of fields.");
     }
 
     Object[] objectArray = new Object[csvStringArray.length];
     Column[] columnArray = schema.getColumnsArray();
     for (int i = 0; i < csvStringArray.length; i++) {
-      // check for NULL field and bail out immediately
       if (csvStringArray[i].equals(NULL_VALUE)) {
         objectArray[i] = null;
         continue;
@@ -109,7 +98,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return objectArray;
   }
 
-
   private Object toObject(String csvString, Column column) {
     Object returnValue = null;
 
@@ -163,18 +151,9 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @Override
   public void setObjectData(Object[] data) {
-    Set<Integer> nullValueIndices = new HashSet<Integer>();
-    Column[] columnArray = schema.getColumnsArray();
-    // check for null
-    for (int i = 0; i < data.length; i++) {
-      if (data[i] == null) {
-        nullValueIndices.add(i);
-        data[i] = NULL_VALUE;
-      }
-    }
-    // ignore the null values while encoding the object array into csv string
-    encodeToCSVText(data, columnArray, nullValueIndices);
-    this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER);
+    super.validateSchema(schema);
+    // convert object array to csv text
+    this.data = toCSV(data);
 
   }
 
@@ -200,60 +179,75 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    * array
    *
    * @param objectArray
-   * @param columnArray
-   * @param nullValueIndices
    */
   @SuppressWarnings("unchecked")
-  private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
-    for (int i : bitTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVBit(objectArray[i]);
-      }
-    }
-    for (int i : stringTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVString((String) objectArray[i]);
-      }
-    }
-    for (int i : dateTimeTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        Column col = columnArray[i];
-        if (objectArray[i] instanceof org.joda.time.DateTime) {
-          org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
-          // check for fraction and time zone and then use the right formatter
-          objectArray[i] = encodeToCSVDateTime(dateTime, col);
-        } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
-          org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
-          objectArray[i] = encodeToCSVLocalDateTime(localDateTime, col);
+  private String toCSV(Object[] objectArray) {
+
+    Column[] columnArray = schema.getColumnsArray();
+
+    StringBuilder csvString = new StringBuilder();
+    for (int i = 0; i < columnArray.length; i++) {
+      Object obj = objectArray[i];
+      if (obj == null) {
+        csvString.append(NULL_VALUE);
+      } else {
+        switch (columnArray[i].getType()) {
+        case ARRAY:
+        case SET:
+          csvString.append(toCSVList((Object[]) obj, (AbstractComplexListType) columnArray[i]));
+          break;
+        case MAP:
+          csvString.append(toCSVMap((Map<Object, Object>) obj, columnArray[i]));
+          break;
+        case ENUM:
+        case TEXT:
+          csvString.append(toCSVString(obj.toString()));
+          break;
+        case BINARY:
+        case UNKNOWN:
+          csvString.append(toCSVByteArray((byte[]) obj));
+          break;
+        case FIXED_POINT:
+          csvString.append(toCSVFixedPoint(obj, columnArray[i]));
+          break;
+        case FLOATING_POINT:
+          csvString.append(toCSVFloatingPoint(obj, columnArray[i]));
+          break;
+        case DECIMAL:
+          csvString.append(toCSVDecimal(obj));
+          break;
+        // stored in JSON as strings in the joda time format
+        case DATE:
+          csvString.append(toCSVDate(obj));
+          break;
+        case TIME:
+          csvString.append(toCSVTime(obj, columnArray[i]));
+          break;
+        case DATE_TIME:
+          if (objectArray[i] instanceof org.joda.time.DateTime) {
+            org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
+            // check for fraction and time zone and then use the right formatter
+            csvString.append(toCSVDateTime(dateTime, columnArray[i]));
+          } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
+            org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
+            csvString.append(toCSVLocalDateTime(localDateTime, columnArray[i]));
+          }
+          break;
+        case BIT:
+          csvString.append(toCSVBit(obj));
+          break;
+        default:
+          throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+              "Column type from schema was not recognized for " + columnArray[i].getType());
         }
       }
-    }
-    for (int i : dateTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVDate(objectArray[i]);
-      }
-    }
-    for (int i : timeTypeColumnIndices) {
-      Column col = columnArray[i];
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVTime(objectArray[i], col);
-      }
-    }
-    for (int i : byteTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVByteArray((byte[]) objectArray[i]);
-      }
-    }
-    for (int i : listTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType) columnArray[i]);
-      }
-    }
-    for (int i : mapTypeColumnIndices) {
-      if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVMap((Map<Object, Object>) objectArray[i], columnArray[i]);
+      if (i < columnArray.length - 1) {
+        csvString.append(CSV_SEPARATOR_CHARACTER);
       }
+
     }
+
+    return csvString.toString();
   }
 
   /**
@@ -261,15 +255,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @Override
   public Set<String> getJars() {
-
-    Set<String> jars = super.getJars();
-    // Add JODA classes for IDF date/time handling
-    jars.add(ClassUtils.jarForClass(LocalDate.class));
-    jars.add(ClassUtils.jarForClass(LocalDateTime.class));
-    jars.add(ClassUtils.jarForClass(DateTime.class));
-    jars.add(ClassUtils.jarForClass(LocalTime.class));
-    // Add JSON parsing jar
-    jars.add(ClassUtils.jarForClass(JSONValue.class));
-    return jars;
+    return super.getJars();
   }
-}
\ No newline at end of file
+}
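
The net effect in CSVIntermediateDataFormat: the schema is validated up
front, setObjectData delegates to a single toCSV(Object[]) switch over the
column types (replacing the per-type index sets), and the JODA/json-simple
jars now come from the base class getJars(). A minimal round-trip sketch
under those assumptions, using only APIs visible in this patch:

  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.FixedPoint;
  import org.apache.sqoop.schema.type.Text;

  public class CsvIdfSketch {
    public static void main(String[] args) {
      Schema schema = new Schema("test")
          .addColumn(new FixedPoint("1").setByteSize(2L))
          .addColumn(new Text("2"));

      // The schema is now supplied at construction time; the no-arg
      // constructor exists only for reflective instantiation.
      CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat(schema);
      idf.setObjectData(new Object[] { 10, "random data" });
      System.out.println(idf.getCSVTextData());  // likely output: 10,'random data'
    }
  }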

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 055b41c..261a462 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -18,12 +18,14 @@
  */
 package org.apache.sqoop.connector.idf;
 
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType;
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType;
-
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.ColumnType;
+import org.apache.sqoop.utils.ClassUtils;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalDateTime;
+import org.joda.time.LocalTime;
+import org.json.simple.JSONValue;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -51,16 +53,6 @@ public abstract class IntermediateDataFormat<T> {
 
   protected Schema schema;
 
-  protected final Set<Integer> stringTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> bitTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> byteTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> listTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> mapTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> dateTimeTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> dateTypeColumnIndices = new HashSet<Integer>();
-  protected final Set<Integer> timeTypeColumnIndices = new HashSet<Integer>();
-
-
   /**
    * Get one row of data.
    *
@@ -134,32 +126,13 @@ public abstract class IntermediateDataFormat<T> {
    *          - the schema used for serializing/de-serializing data
    */
   public void setSchema(Schema schema) {
-    if (schema == null) {
-      // TODO(SQOOP-1956): throw an exception since working without a schema is dangerous
-      return;
-    }
+    validateSchema(schema);
     this.schema = schema;
-    Column[] columns = schema.getColumnsArray();
-    int i = 0;
-    for (Column col : columns) {
-      if (isColumnStringType(col)) {
-        stringTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.BIT) {
-        bitTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.DATE) {
-        dateTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.TIME) {
-        timeTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.DATE_TIME) {
-        dateTimeTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.BINARY) {
-        byteTypeColumnIndices.add(i);
-      } else if (isColumnListType(col)) {
-        listTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.MAP) {
-        mapTypeColumnIndices.add(i);
-      }
-      i++;
+  }
+
+  protected void validateSchema(Schema schema) {
+    if (schema == null) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
     }
   }
 
@@ -186,7 +159,15 @@ public abstract class IntermediateDataFormat<T> {
    * @return set of jars
    */
   public Set<String> getJars() {
-    return new HashSet<String>();
+    Set<String> jars = new  HashSet<String>();
+    // Add JODA classes for IDF date/time handling
+    jars.add(ClassUtils.jarForClass(LocalDate.class));
+    jars.add(ClassUtils.jarForClass(LocalDateTime.class));
+    jars.add(ClassUtils.jarForClass(DateTime.class));
+    jars.add(ClassUtils.jarForClass(LocalTime.class));
+    // Add JSON parsing jar
+    jars.add(ClassUtils.jarForClass(JSONValue.class));
+    return jars;
   }
 
   @Override
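
Two behavioural changes in the base class are worth noting: setSchema now
rejects a null schema with INTERMEDIATE_DATA_FORMAT_0002 instead of silently
returning, and getJars() registers the JODA and json-simple jars once so
that subclasses only add format-specific ones. A minimal sketch of the new
failure mode, assuming SqoopException remains an unchecked exception as
elsewhere in the code base:

  import org.apache.sqoop.common.SqoopException;
  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.schema.Schema;

  public class NullSchemaSketch {
    public static void main(String[] args) {
      try {
        // Constructing an IDF without a schema now fails fast.
        new CSVIntermediateDataFormat((Schema) null);
      } catch (SqoopException e) {
        System.out.println("rejected null schema: " + e.getMessage());
      }
    }
  }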

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
index 90294f0..b937d87 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -41,6 +41,10 @@ import java.util.Set;
  */
 public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObject> {
 
+  // need this default constructor for reflection magic used in execution engine
+  public JSONIntermediateDataFormat() {
+  }
+
   // We need schema at all times
   public JSONIntermediateDataFormat(Schema schema) {
     setSchema(schema);
@@ -110,6 +114,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
 
     Set<String> jars = super.getJars();
     jars.add(ClassUtils.jarForClass(JSONObject.class));
+    jars.add(ClassUtils.jarForClass(JSONArray.class));
     return jars;
   }
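
Same pattern here: JSONIntermediateDataFormat gains the no-arg constructor
the execution engine needs for reflective instantiation, and registers the
json-simple JSONArray jar alongside JSONObject. A hypothetical sketch of the
kind of reflective construction the constructor comment above refers to (the
helper and its name are illustrative, not part of this patch):

  import org.apache.sqoop.connector.idf.IntermediateDataFormat;

  public class IdfReflectionSketch {
    // Instantiate an IDF by class name; this only works when the class
    // provides a public no-arg constructor.
    public static IntermediateDataFormat<?> newIdf(String className) throws Exception {
      return (IntermediateDataFormat<?>) Class.forName(className).newInstance();
    }
  }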
 
@@ -241,16 +246,16 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
       // stored in JSON as the same format as csv strings in the joda time
       // format
       case DATE_TIME:
-        object.put(cols[i].getName(), removeQuotes(encodeToCSVDateTime(data[i], cols[i])));
+        object.put(cols[i].getName(), removeQuotes(toCSVDateTime(data[i], cols[i])));
         break;
       case TIME:
-        object.put(cols[i].getName(), removeQuotes(encodeToCSVTime(data[i], cols[i])));
+        object.put(cols[i].getName(), removeQuotes(toCSVTime(data[i], cols[i])));
         break;
       case DATE:
-        object.put(cols[i].getName(), removeQuotes(encodeToCSVDate(data[i])));
+        object.put(cols[i].getName(), removeQuotes(toCSVDate(data[i])));
         break;
       case BIT:
-        object.put(cols[i].getName(), Boolean.valueOf(encodeToCSVBit(data[i])));
+        object.put(cols[i].getName(), Boolean.valueOf(toCSVBit(data[i])));
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
@@ -278,40 +283,40 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
       case SET:
         // stored as JSON array
         JSONArray array = (JSONArray) obj;
-        csvString.append(encloseWithQuote(array.toJSONString()));
+        csvString.append(encloseWithQuotes(array.toJSONString()));
         break;
       case MAP:
         // stored as JSON object
-        csvString.append(encloseWithQuote((((JSONObject) obj).toJSONString())));
+        csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString())));
         break;
       case ENUM:
       case TEXT:
-        csvString.append(encodeToCSVString(obj.toString()));
+        csvString.append(toCSVString(obj.toString()));
         break;
       case BINARY:
       case UNKNOWN:
-        csvString.append(encodeToCSVByteArray(Base64.decodeBase64(obj.toString())));
+        csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString())));
         break;
       case FIXED_POINT:
-        csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
+        csvString.append(toCSVFixedPoint(obj, cols[i]));
         break;
       case FLOATING_POINT:
-        csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
+        csvString.append(toCSVFloatingPoint(obj, cols[i]));
         break;
       case DECIMAL:
-        csvString.append(encodeToCSVDecimal(obj));
+        csvString.append(toCSVDecimal(obj));
         break;
       // stored in JSON as strings in the joda time format
       case DATE:
       case TIME:
       case DATE_TIME:
-        csvString.append(encloseWithQuote(obj.toString()));
+        csvString.append(encloseWithQuotes(obj.toString()));
         break;
       // 0/1 will be stored as they are in JSON, even though valid values in
       // JSON
       // are true/false
       case BIT:
-        csvString.append(encodeToCSVBit(obj));
+        csvString.append(toCSVBit(obj));
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
index 71db8da..1cef714 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
@@ -45,7 +45,7 @@ public class TestSqoopIDFUtils {
   @Test
   public void testEncloseStringWithQuotes() {
     String test = "test";
-    String quotedText = encloseWithQuote(test);
+    String quotedText = encloseWithQuotes(test);
     assertEquals(quotedText, "'test'");
 
   }
@@ -53,7 +53,7 @@ public class TestSqoopIDFUtils {
   @Test
   public void testStringWithQuotesToEncloseStringWithQuotes() {
     String test = "'test'";
-    String quotedText = encloseWithQuote(test);
+    String quotedText = encloseWithQuotes(test);
     assertEquals(quotedText, "''test''");
 
   }
@@ -82,28 +82,28 @@ public class TestSqoopIDFUtils {
   @Test
   public void testExample1EncodeToCSVString() {
     String test = "test";
-    String encodedText = encodeToCSVString(test);
+    String encodedText = toCSVString(test);
     assertEquals(encodedText, "'test'");
   }
 
   @Test
   public void testExample2EncodeToCSVString() {
     String test = "test,test1";
-    String encodedText = encodeToCSVString(test);
+    String encodedText = toCSVString(test);
     assertEquals(encodedText, "'test,test1'");
   }
 
   @Test
   public void testExample3EncodeToCSVString() {
     String test = "test,'test1";
-    String encodedText = encodeToCSVString(test);
+    String encodedText = toCSVString(test);
     assertEquals(encodedText, "'test,\\'test1'");
   }
 
   @Test
   public void testExample4EncodeToCSVString() {
     String test = "test,\"test1";
-    String encodedText = encodeToCSVString(test);
+    String encodedText = toCSVString(test);
     assertEquals(encodedText, "'test,\\\"test1'");
   }
 
@@ -117,7 +117,7 @@ public class TestSqoopIDFUtils {
 
   public void testExample5EncodeToCSVString() {
     String test = new String(new char[] { 0x0A });
-    String encodedText = encodeToCSVString(test);
+    String encodedText = toCSVString(test);
     assertEquals(encodedText, "'\\n'");
   }
 
@@ -130,7 +130,7 @@ public class TestSqoopIDFUtils {
 
   public void testExample6EncodeToCSVString() {
     String test = new String(new char[] { 0x0D });
-    String encodedText = encodeToCSVString(test);
+    String encodedText = toCSVString(test);
     assertEquals(encodedText, "'\\r'");
   }
 
@@ -138,7 +138,7 @@ public class TestSqoopIDFUtils {
   public void testEncodeByteToCSVString() {
     // byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     byte[] bytes = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54 };
-    String encodedText = encodeToCSVByteArray(bytes);
+    String encodedText = toCSVByteArray(bytes);
     String expectedText = getByteFieldString(bytes).replaceAll("\r", "\\\\r");
     assertEquals(encodedText, expectedText);
   }
@@ -149,7 +149,7 @@ public class TestSqoopIDFUtils {
     list.add(1);
     list.add(2);
     AbstractComplexListType array = new Array("a", new Text("t"));
-    String encodedText = encodeToCSVList(list.toArray(), array);
+    String encodedText = toCSVList(list.toArray(), array);
     assertEquals(encodedText, "'[1,2]'");
   }
 
@@ -159,7 +159,7 @@ public class TestSqoopIDFUtils {
     list.add("A");
     list.add("B");
     AbstractComplexListType array = new Array("a", new Text("t"));
-    String encodedText = encodeToCSVList(list.toArray(), array);
+    String encodedText = toCSVList(list.toArray(), array);
     assertEquals(encodedText, "'[\"A\",\"B\"]'");
   }
 
@@ -172,7 +172,7 @@ public class TestSqoopIDFUtils {
     map.put("A", list);
     org.apache.sqoop.schema.type.Map mapCol = new org.apache.sqoop.schema.type.Map("a", new Text("t"), new Array("r", new Text(
         "tr")));
-    String encodedText = encodeToCSVMap(map, mapCol);
+    String encodedText = toCSVMap(map, mapCol);
     assertEquals(encodedText, "'{\"A\":[\"A\",\"B\"]}'");
   }
   

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index e116f3c..fca410f 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -55,7 +55,6 @@ public class TestCSVIntermediateDataFormat {
 
   @BeforeMethod
   public void setUp() {
-    dataFormat = new CSVIntermediateDataFormat();
   }
 
 
@@ -70,7 +69,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData(null);
     Object[] out = dataFormat.getObjectData();
     assertNull(out);
@@ -81,7 +80,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2")).addColumn(new Text("3")).addColumn(new Text("4"))
         .addColumn(new Binary("5")).addColumn(new Text("6"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("");
     dataFormat.getObjectData();
   }
@@ -98,12 +97,13 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
         .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null };
     dataFormat.setObjectData(in);
 
     String csvText = dataFormat.getCSVTextData();
     String[] textValues = csvText.split(",");
+    assertEquals(14, textValues.length);
     for (String text : textValues) {
       assertEquals(text, NULL_VALUE);
     }
@@ -121,11 +121,12 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
         .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null, null };
     dataFormat.setObjectData(in);
 
     Object[] out = dataFormat.getObjectData();
+    assertEquals(14, out.length);
     for (Object obj : out) {
       assertEquals(obj, null);
     }
@@ -143,7 +144,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
         .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
         "NULL" };
     dataFormat.setCSVTextData(StringUtils.join(test, ","));
@@ -166,7 +167,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new org.apache.sqoop.schema.type.Set("12", new Text("t4")))
         .addColumn(new org.apache.sqoop.schema.type.Enum("13")).addColumn(new org.apache.sqoop.schema.type.Unknown("14"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
         "NULL" };
     dataFormat.setCSVTextData(StringUtils.join(test, ","));
@@ -182,17 +183,30 @@ public class TestCSVIntermediateDataFormat {
 
   @Test
   public void testInputAsCSVTextInCSVTextOut() {
-    String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
-      + ",'" + String.valueOf(0x0A) + "'";
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Enum("1")).addColumn(new FixedPoint("2"))
+        .addColumn(new FixedPoint("3")).addColumn(new Text("4")).addColumn(new Text("5"))
+        .addColumn(new Binary("6")).addColumn(new Text("7"));
+
+    String testData = "'ENUM',10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "'";
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData(testData);
     assertEquals(testData, dataFormat.getCSVTextData());
   }
 
-
   @Test
   public void testInputAsCSVTextInAndDataOut() {
-    String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
-      + ",'" + String.valueOf(0x0A) + "'";
+    Schema schema = new Schema("test");
+    schema.addColumn(new org.apache.sqoop.schema.type.Enum("1")).addColumn(new FixedPoint("2"))
+        .addColumn(new FixedPoint("3")).addColumn(new Text("4")).addColumn(new Text("5"))
+        .addColumn(new Binary("6")).addColumn(new Text("7"));
+
+    String testData = "'ENUM',10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "'";
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData(testData);
     assertEquals(testData, dataFormat.getData());
   }
@@ -204,7 +218,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("text"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData(testData);
 
     Object[] out = dataFormat.getObjectData();
@@ -226,7 +240,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("6"))
         .addColumn(new org.apache.sqoop.schema.type.Enum("7"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData(testData);
 
     Object[] out = dataFormat.getObjectData();
@@ -251,7 +265,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"))
         .addColumn(new org.apache.sqoop.schema.type.Enum("7"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[7];
@@ -285,7 +299,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("6"))
         .addColumn(new org.apache.sqoop.schema.type.Enum("7"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     Object[] in = new Object[7];
     in[0] = new Long(10);
@@ -315,7 +329,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("6"))
         .addColumn(new org.apache.sqoop.schema.type.Enum("7"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[7];
@@ -340,7 +354,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("1"));
 
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     char[] allCharArr = new char[256];
     for(int i = 0; i < allCharArr.length; ++i) {
@@ -363,7 +377,7 @@ public class TestCSVIntermediateDataFormat {
   public void testByteArrayFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Binary("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     byte[] allCharByteArr = new byte[256];
     for (int i = 0; i < allCharByteArr.length; ++i) {
@@ -385,7 +399,7 @@ public class TestCSVIntermediateDataFormat {
   public void testTimeWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Time("1", false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'12:00:00'");
     assertEquals("'12:00:00'", dataFormat.getCSVTextData());
   }
@@ -394,7 +408,7 @@ public class TestCSVIntermediateDataFormat {
   public void testTimeWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Time("1", false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'12:59:59'");
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
     assertEquals(time.toString(), dataFormat.getObjectData()[0].toString());
@@ -404,7 +418,7 @@ public class TestCSVIntermediateDataFormat {
   public void testTimeWithObjectArrayInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Time("1", true)).addColumn(new Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0);
     Object[] in = { time, "test" };
     dataFormat.setObjectData(in);
@@ -415,7 +429,7 @@ public class TestCSVIntermediateDataFormat {
   public void testTimeWithObjectArrayInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Time("1", true));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(2, 23, 33);
     Object[] in = { time };
     dataFormat.setObjectData(in);
@@ -428,7 +442,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01'");
     assertEquals("'2014-10-01'", dataFormat.getCSVTextData());
   }
@@ -437,7 +451,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01'");
     org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
     assertEquals(date.toString(), dataFormat.getObjectData()[0].toString());
@@ -447,7 +461,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateWithObjectArrayInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1")).addColumn(new Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
     Object[] in = { date, "test" };
     dataFormat.setObjectData(in);
@@ -458,7 +472,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateWithObjectArrayInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
     Object[] in = { date };
     dataFormat.setObjectData(in);
@@ -471,7 +485,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", false, false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     dataFormat.setCSVTextData("'2014-10-01 12:00:00'");
     assertEquals("'2014-10-01 12:00:00'", dataFormat.getCSVTextData());
@@ -481,7 +495,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeWithFractionNoTimezoneWithCSVTextInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'");
     assertEquals("'2014-10-01 12:00:00.000'", dataFormat.getCSVTextData());
   }
@@ -489,7 +503,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeNoFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", false, false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01 12:00:00'");
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
@@ -500,7 +514,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeWithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01 12:00:00.000'");
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
@@ -512,7 +526,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeNoQuotesWithFractionTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     DateTimeZone zone = DateTimeZone.forID("America/New_York");
     org.joda.time.DateTime dateTime = new org.joda.time.DateTime(zone);
     dataFormat.setCSVTextData(dateTime.toString());
@@ -524,7 +538,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeIncorrectFormatWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-3310-01 12:00:00.000'");
     dataFormat.getObjectData()[0].toString();
   }
@@ -533,7 +547,7 @@ public class TestCSVIntermediateDataFormat {
   public void testCurrentDateTime2WithFractionNoTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     // current date time
     org.joda.time.DateTime dateTime = new org.joda.time.DateTime();
     String dateTimeString = dtfWithFractionNoTimeZone.print(dateTime);
@@ -545,7 +559,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeWithFractionAndTimeZoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-0400'");
     // NOTE: string representation will have the T added, it is an
     // implementation quirk of using JODA
@@ -556,7 +570,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeWithFractionAndTimeZoneObjectInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     DateTimeZone zone = DateTimeZone.forID("America/New_York");
     org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 1, zone);
     Object[] in = { dateTime };
@@ -569,7 +583,7 @@ public class TestCSVIntermediateDataFormat {
   public void testLocalDateTimeWithObjectInCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, false));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     org.joda.time.LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0, 2);
     Object[] in = { dateTime };
     dataFormat.setObjectData(in);
@@ -581,7 +595,7 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeFractionAndTimezoneWithCSVTextInObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1", true, true));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01 12:00:00.000-04:00'");
     DateTimeZone zone = DateTimeZone.forID("America/New_York");
     org.joda.time.DateTime edateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0, zone);
@@ -594,13 +608,11 @@ public class TestCSVIntermediateDataFormat {
 
   // **************test cases for BIT*******************
 
-  // **************test cases for BIT*******************
-
   @Test
   public void testBitTrueFalseWithCSVTextInAndCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     for (String trueBit : new String[] { "true", "TRUE" }) {
       dataFormat.setCSVTextData(trueBit);
@@ -617,7 +629,7 @@ public class TestCSVIntermediateDataFormat {
   public void testBitWithCSVTextInAndCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("1");
     assertEquals("1", dataFormat.getCSVTextData());
     dataFormat.setCSVTextData("0");
@@ -628,7 +640,7 @@ public class TestCSVIntermediateDataFormat {
   public void testBitWithObjectArrayInAndCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] data = new Object[2];
     data[0] = Boolean.TRUE;
     data[1] = Boolean.FALSE;
@@ -640,7 +652,7 @@ public class TestCSVIntermediateDataFormat {
   public void testUnsupportedBitWithObjectArrayInAndCSVTextOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] data = new Object[2];
     data[0] = "1";
     data[1] = "2";
@@ -652,7 +664,7 @@ public class TestCSVIntermediateDataFormat {
   public void testBitWithObjectArrayInAndObjectOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] data = new Object[2];
     data[0] = Boolean.TRUE;
     data[1] = Boolean.FALSE;
@@ -669,7 +681,7 @@ public class TestCSVIntermediateDataFormat {
   public void testBitWithCSVTextInAndObjectArrayOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
 
     for (String trueBit : new String[] { "true", "TRUE", "1" }) {
       dataFormat.setCSVTextData(trueBit);
@@ -686,7 +698,7 @@ public class TestCSVIntermediateDataFormat {
   public void testUnsupportedBitWithObjectArrayInAndObjectOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] data = new Object[2];
     data[0] = "1";
     data[1] = "2";
@@ -699,7 +711,7 @@ public class TestCSVIntermediateDataFormat {
   public void testUnsupportedBitWithCSVTextInAndObjectOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1")).addColumn(new Bit("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("1,3");
     assertEquals(true, dataFormat.getObjectData()[0]);
     assertEquals(false, dataFormat.getObjectData()[1]);
@@ -711,7 +723,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArray = { "A", "B" };
     // create an array inside the object array
     Object[] data = new Object[2];
@@ -728,7 +740,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArray = { "A", "B" };
     // create an array inside the object array
     Object[] data = new Object[2];
@@ -746,7 +758,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArray = { "A", "B" };
     // create an array inside the object array
     Object[] data = new Object[2];
@@ -762,7 +774,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     String testData = "'[\"A\",\"B\"]','text'";
     dataFormat.setCSVTextData(testData);
     assertEquals(testData, dataFormat.getCSVTextData());
@@ -773,7 +785,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new Text("text")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArray = { "A''\"ssss", "Bss###''" };
     // create an array inside the object array
     Object[] data = new Object[2];
@@ -790,7 +802,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArray = { 1, 2 };
     // create an array inside the object array
     Object[] data = new Object[2];
@@ -807,7 +819,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1", new FixedPoint("fn")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     List<Integer> givenList = new ArrayList<Integer>();
     givenList.add(1);
     givenList.add(1);
@@ -825,7 +837,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Set("1", new FixedPoint("fn")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Set<Integer> givenSet = new HashSet<Integer>();
     givenSet.add(1);
     givenSet.add(3);
@@ -845,7 +857,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
         new org.apache.sqoop.schema.type.Decimal("deci")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArray = { 1.22, 2.444 };
     // create an array inside the object array
     Object[] data = new Object[2];
@@ -863,7 +875,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
         new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArrayOne = { 11, 12 };
     Object[] givenArrayTwo = { 14, 15 };
 
@@ -889,7 +901,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
         new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArrayOne = { 11, 12 };
     Object[] givenArrayTwo = { 14, 15 };
 
@@ -914,7 +926,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
         new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     String input = "'[[11, 12],[14, 15]]','text'";
     dataFormat.setCSVTextData(input);
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
@@ -928,7 +940,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Array("1",
         new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Object[] givenArrayOne = { 11, 12 };
     Object[] givenArrayTwo = { 14, 15 };
 
@@ -951,7 +963,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> map = new HashMap<Object, Object>();
     map.put("testKey", "testValue");
     // create an array inside the object array
@@ -971,7 +983,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
         new FixedPoint("number"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> givenMap = new HashMap<Object, Object>();
     List<Integer> intList = new ArrayList<Integer>();
     intList.add(11);
@@ -994,7 +1006,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
         new Text("text"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> givenMap = new HashMap<Object, Object>();
     List<String> stringList = new ArrayList<String>();
     stringList.add("A");
@@ -1017,7 +1029,7 @@ public class TestCSVIntermediateDataFormat {
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Array("value",
         new Text("text"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> givenMap = new HashMap<Object, Object>();
     List<String> stringList = new ArrayList<String>();
     stringList.add("A");
@@ -1041,7 +1053,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> givenMap = new HashMap<Object, Object>();
     givenMap.put("testKey", "testValue");
     Object[] data = new Object[2];
@@ -1060,7 +1072,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> givenMap = new HashMap<Object, Object>();
     givenMap.put("testKey", "testValue");
     Object[] data = new Object[2];
@@ -1079,7 +1091,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     Map<Object, Object> givenMap = new HashMap<Object, Object>();
     givenMap.put("testKey", "testValue");
     Object[] data = new Object[2];
@@ -1095,7 +1107,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new org.apache.sqoop.schema.type.Map("1", new Text("key"), new Text("value")));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     String testData = "'{\"testKey\":\"testValue\"}','text'";
     dataFormat.setCSVTextData(testData);
     assertEquals(testData, dataFormat.getCSVTextData());
@@ -1106,7 +1118,7 @@ public class TestCSVIntermediateDataFormat {
     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
         + ",'\\n'";
     Schema schema = new Schema("Test");
-    dataFormat.setSchema(schema);
+    dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData(testData);
 
     @SuppressWarnings("unused")
@@ -1115,13 +1127,14 @@ public class TestCSVIntermediateDataFormat {
 
   @Test(expectedExceptions = SqoopException.class)
   public void testNullSchema() {
-    dataFormat.setSchema(null);
+    dataFormat = new CSVIntermediateDataFormat(null);
     @SuppressWarnings("unused")
     Object[] out = dataFormat.getObjectData();
   }
 
   @Test(expectedExceptions = SqoopException.class)
   public void testNotSettingSchema() {
+    dataFormat = new CSVIntermediateDataFormat();
     @SuppressWarnings("unused")
     Object[] out = dataFormat.getObjectData();
   }
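
Every hunk in the test class above makes the same change: the schema is handed to CSVIntermediateDataFormat through its constructor instead of being injected afterwards via setSchema(). A minimal sketch of the resulting usage, built only from the constructor and accessors exercised by the tests above (the wrapper class name is illustrative):

  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.Text;

  public class IdfConstructorSketch {
    public static void main(String[] args) {
      // The schema is now supplied up front, at construction time.
      Schema schema = new Schema("test").addColumn(new Text("t"));
      CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat(schema);

      // Data is still set and read exactly as in the tests above.
      dataFormat.setCSVTextData("'hello'");
      Object[] fields = dataFormat.getObjectData();
      System.out.println(fields[0]);
    }
  }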

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index ab2178e..4945584 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -88,12 +88,12 @@ public class TestMatching {
     Schema from2 = new Schema("FROM-2");
     Schema to2 = new Schema("TO-2");
 
-    from1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+    from1.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"))
         .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-    to1.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+    to1.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"))
       .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-    from2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
-    to2.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"));
+    from2.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"));
+    to2.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"));
 
     parameters.add(new Object[]{
         emptyFrom,
@@ -160,7 +160,6 @@ public class TestMatching {
 
   @Test
   public void testSchemalessFromAndTo() throws UnsupportedEncodingException {
-    CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat();
     String testData = "\"This is the data you are looking for. It has no structure.\"";
     Object[] testObject = new Object[] {testData.getBytes(BYTE_FIELD_CHARSET)};
     Object[] testObjectCopy = new Object[1];
@@ -169,8 +168,7 @@ public class TestMatching {
     Matcher matcher = MatcherFactory.getMatcher(NullSchema.getInstance(),
             NullSchema.getInstance());
     // Checking FROM side only because currently that is the only IDF that is used
-    dataFormat.setSchema(matcher.getFromSchema());
-
+    CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat(matcher.getFromSchema());
     // Setting data as CSV and validating getting CSV and object
     dataFormat.setCSVTextData(testData);
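
The schemaless path follows the same pattern: rather than creating a bare IDF and later calling setSchema(matcher.getFromSchema()), the matcher's FROM schema goes straight into the constructor. A minimal sketch, under the assumption that the NullSchema-to-NullSchema matcher above simply hands back NullSchema (the class name is illustrative):

  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.schema.NullSchema;

  public class SchemalessIdfSketch {
    public static void main(String[] args) {
      // Assumption: the schemaless case boils down to constructing the IDF with NullSchema.
      CSVIntermediateDataFormat dataFormat =
          new CSVIntermediateDataFormat(NullSchema.getInstance());
      // With no schema to interpret against, CSV text is expected to round-trip unchanged.
      dataFormat.setCSVTextData("\"This is the data you are looking for. It has no structure.\"");
      System.out.println(dataFormat.getCSVTextData());
    }
  }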
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
index 087d7d3..6a14201 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
@@ -34,6 +34,8 @@ import java.io.InputStream;
 
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
 import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -85,17 +87,18 @@ public class TestSqoopWritable {
   // it existed before.
   @Test
   public void testWriteAndReadFields() throws IOException {
+    Schema schema = new Schema("test").addColumn(new Text("t"));
     String testData = "You shall not pass";
     ByteArrayOutputStream ostream = new ByteArrayOutputStream();
     DataOutput out = new DataOutputStream(ostream);
-    SqoopWritable writableOne = new SqoopWritable(new CSVIntermediateDataFormat());
+    SqoopWritable writableOne = new SqoopWritable(new CSVIntermediateDataFormat(schema));
     writableOne.setString(testData);
     writableOne.write(out);
     byte[] written = ostream.toByteArray();
 
     // Don't test what the data is, test that SqoopWritable can read it.
     InputStream instream = new ByteArrayInputStream(written);
-    SqoopWritable writableTwo = new SqoopWritable(new CSVIntermediateDataFormat());
+    SqoopWritable writableTwo = new SqoopWritable(new CSVIntermediateDataFormat(schema));
     DataInput in = new DataInputStream(instream);
     writableTwo.readFields(in);
     assertEquals(writableOne.toString(), writableTwo.toString());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 7ddaa10..41ea24a 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -39,6 +39,8 @@ import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.util.MRJobTestUtil;
 import org.apache.sqoop.schema.NullSchema;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -145,8 +147,12 @@ public class TestSqoopOutputFormatLoadExecutor {
 
   }
   // TODO:SQOOP-1873: Mock objects instead
-  private IntermediateDataFormat<?> getIDF(){
-    return new CSVIntermediateDataFormat();
+  private IntermediateDataFormat<?> getIDF() {
+    return new CSVIntermediateDataFormat(getSchema());
+  }
+
+  private Schema getSchema() {
+    return new Schema("test").addColumn(new Text("t"));
   }
 
   @BeforeMethod

http://git-wip-us.apache.org/repos/asf/sqoop/blob/89dcbe87/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
index d498850..ce39a78 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
@@ -70,7 +70,7 @@ public class MRJobTestUtil {
 
   public static Schema getTestSchema() {
     Schema schema = new Schema("Test");
-    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+    schema.addColumn(new FixedPoint("1").setByteSize(2L)).addColumn(new FloatingPoint("2"))
         .addColumn(new Text("3"));
     return schema;
   }
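
One more pattern that repeats across the hunks above: FixedPoint columns in the test schemas now declare an explicit 2-byte size. A minimal sketch of that schema construction, using only column types that already appear in this diff (the helper name is illustrative):

  import org.apache.sqoop.schema.Schema;
  import org.apache.sqoop.schema.type.FixedPoint;
  import org.apache.sqoop.schema.type.FloatingPoint;
  import org.apache.sqoop.schema.type.Text;

  public class FixedPointSchemaSketch {
    // Builds a three-column schema whose FixedPoint column carries an explicit byte size.
    public static Schema buildTestSchema() {
      Schema schema = new Schema("Test");
      schema.addColumn(new FixedPoint("1").setByteSize(2L))
          .addColumn(new FloatingPoint("2"))
          .addColumn(new Text("3"));
      return schema;
    }
  }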