Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/06 03:57:17 UTC

sqoop git commit: SQOOP-1901: Sqoop2: Support DRY code in IDF implementations and add JSONIDF

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 c7ef89dba -> 4ffa806ba


SQOOP-1901: Sqoop2: Support DRY code in IDF implementations and add JSONIDF

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4ffa806b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4ffa806b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4ffa806b

Branch: refs/heads/sqoop2
Commit: 4ffa806bace0fbe7cb922ef6d80ae0c5d891bfca
Parents: c7ef89d
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Jan 5 17:34:12 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Jan 5 18:56:45 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/sqoop/schema/Schema.java    |  29 +-
 .../org/apache/sqoop/schema/SchemaError.java    |   2 +
 connector/connector-sdk/pom.xml                 |   4 +
 .../sqoop/connector/common/SqoopIDFUtils.java   | 172 +++++---
 .../idf/CSVIntermediateDataFormat.java          | 177 ++------
 .../idf/CSVIntermediateDataFormatError.java     |  17 +-
 .../connector/idf/IntermediateDataFormat.java   | 112 ++++-
 .../idf/IntermediateDataFormatError.java        |  50 +++
 .../idf/JSONIntermediateDataFormat.java         | 405 +++++++++++++++++++
 .../idf/JSONIntermediateDataFormatError.java    |  49 +++
 .../idf/TestCSVIntermediateDataFormat.java      |  24 +-
 .../idf/TestJSONIntermediateDataFormat.java     | 309 ++++++++++++++
 pom.xml                                         |  16 +-
 13 files changed, 1135 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/common/src/main/java/org/apache/sqoop/schema/Schema.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index 189dbe9..bc14bcc 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -23,12 +23,15 @@ import org.apache.sqoop.schema.type.Column;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
- * Schema represents the data fields that are transferred between {@link #From} and {@link #To}
+ * Schema represents the data fields that are transferred between {@link #From}
+ * and {@link #To}
  */
 public class Schema {
 
@@ -57,10 +60,22 @@ public class Schema {
    */
   private Set<String> columNames;
 
+  /**
+   * Helper map to map column name to index
+   */
+  private Map<String, Integer> nameToIndexMap;
+
+  /**
+   * column global index
+   */
+
+  int columnIndex;
+
   private Schema() {
     creationDate = new Date();
     columns = new ArrayList<Column>();
     columNames = new HashSet<String>();
+    nameToIndexMap = new HashMap<String, Integer>();
   }
 
   public Schema(String name) {
@@ -85,11 +100,10 @@ public class Schema {
     if(columNames.contains(column.getName())) {
       throw new SqoopException(SchemaError.SCHEMA_0002, "Column: " + column);
     }
-
     columNames.add(column.getName());
-
     columns.add(column);
-
+    nameToIndexMap.put(column.getName(), columnIndex);
+    columnIndex ++;
     return this;
   }
 
@@ -127,6 +141,13 @@ public class Schema {
     return columns.size();
   }
 
+  public Integer getColumnNameIndex(String name) {
+    if (columNames.contains(name)) {
+      return nameToIndexMap.get(name);
+    }
+    throw new SqoopException(SchemaError.SCHEMA_0007, "Column: " + name);
+  }
+
   public boolean isEmpty() {
     return columns.size() == 0;
   }

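A minimal usage sketch of the new name-to-index lookup (hypothetical
column names; the single-argument column constructors match the ones used
in the tests further down). Unknown names now raise SCHEMA_0007 instead
of silently returning null:

    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    Schema schema = new Schema("example");          // hypothetical schema name
    schema.addColumn(new FixedPoint("id"))          // index 0
          .addColumn(new Text("name"));             // index 1
    Integer idx = schema.getColumnNameIndex("name");   // 1, O(1) via nameToIndexMap
    schema.getColumnNameIndex("missing");              // throws SqoopException(SCHEMA_0007)
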
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
index 33e4672..6b7fb48 100644
--- a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
+++ b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
@@ -38,6 +38,8 @@ public enum SchemaError implements ErrorCode {
 
   SCHEMA_0006("Schema without name"),
 
+  SCHEMA_0007("Unknown column name"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 38c217a..46cb9f8 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -37,6 +37,10 @@ limitations under the License.
       <artifactId>joda-time</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 48adae1..979aa4f 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.connector.common;
 
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormatError;
+import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -87,16 +89,25 @@ public class SqoopIDFUtils {
   // only date, no time
   public static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
   // time with fraction only, no timezone
-  public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
+  public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSS");
   public static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
 
-  static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
-  static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
-  static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
-  static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
+  public static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
+  public static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
+  public static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
+  public static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
 
   // ******** Number Column Type utils***********
 
+  public static String encodeToCSVFixedPoint(Object obj, Column column) {
+    Long byteSize = ((FixedPoint) column).getByteSize();
+    if (byteSize != null && byteSize <= Integer.SIZE) {
+      return ((Integer) obj).toString();
+    } else {
+      return ((Long) obj).toString();
+    }
+  }
+
   public static Object toFixedPoint(String csvString, Column column) {
     Object returnValue;
     Long byteSize = ((FixedPoint) column).getByteSize();
@@ -108,6 +119,15 @@ public class SqoopIDFUtils {
     return returnValue;
   }
 
+  public static String encodeToCSVFloatingPoint(Object obj, Column column) {
+    Long byteSize = ((FloatingPoint) column).getByteSize();
+    if (byteSize != null && byteSize <= Float.SIZE) {
+      return ((Float) obj).toString();
+    } else {
+      return ((Double) obj).toString();
+    }
+  }
+
   public static Object toFloatingPoint(String csvString, Column column) {
     Object returnValue;
     Long byteSize = ((FloatingPoint) column).getByteSize();
@@ -119,43 +139,44 @@ public class SqoopIDFUtils {
     return returnValue;
   }
 
+  public static String encodeToCSVDecimal(Object obj) {
+     return ((BigDecimal)obj).toString();
+  }
   public static Object toDecimal(String csvString, Column column) {
     return new BigDecimal(csvString);
   }
 
   // ********** BIT Column Type utils******************
-  public static void encodeToCSVBit(Object[] objectArray, int i) {
-    String bitStringValue = objectArray[i].toString();
+  public static String encodeToCSVBit(Object obj) {
+    String bitStringValue = obj.toString();
     if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
-      objectArray[i] = bitStringValue;
+      return bitStringValue;
     } else {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
-          + objectArray[i]);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue);
     }
   }
 
-  public static Object toBit(String csvString, Object returnValue) {
+  public static Object toBit(String csvString) {
     if ((TRUE_BIT_SET.contains(csvString)) || (FALSE_BIT_SET.contains(csvString))) {
-      returnValue = TRUE_BIT_SET.contains(csvString);
+      return TRUE_BIT_SET.contains(csvString);
     } else {
       // throw an exception for any unsupported value for BITs
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + csvString);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString);
     }
-    return returnValue;
   }
 
   // *********** DATE and TIME Column Type utils **********
 
-  public static void encodeToCSVDate(Object[] objectArray, int i) {
-    org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
-    objectArray[i] = encloseWithQuote(df.print(date));
+  public static String encodeToCSVDate(Object obj) {
+    org.joda.time.LocalDate date = (org.joda.time.LocalDate) obj;
+    return encloseWithQuote(df.print(date));
   }
 
-  public static void encodeToCSVTime(Object[] objectArray, int i, Column col) {
+  public static String encodeToCSVTime(Object obj, Column col) {
     if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
-      objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
+      return encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) obj));
     } else {
-      objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
+      return encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) obj));
     }
   }
 
@@ -169,31 +190,34 @@ public class SqoopIDFUtils {
 
   // *********** DATE TIME Column Type utils **********
 
-  public static void encodeToCSVLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
+  public static String encodeToCSVLocalDateTime(Object obj, Column col) {
+    org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
     org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
     if (column.hasFraction()) {
-      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
+      return encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
     } else {
-      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
+      return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
     }
   }
 
-  public static void encodeToCSVDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
+
+  public static String encodeToCSVDateTime(Object obj, Column col) {
+    org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
     org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
     if (column.hasFraction() && column.hasTimezone()) {
-      objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
+      return encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
     } else if (column.hasFraction() && !column.hasTimezone()) {
-      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
+      return encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
     } else if (column.hasTimezone()) {
-      objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
+      return encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
     } else {
-      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
+      return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
     }
   }
 
-  public static Object toDateTime(String fieldString, Column column) {
+  public static Object toDateTime(String csvString, Column column) {
     Object returnValue;
-    String dateTime = removeQuotes(fieldString);
+    String dateTime = removeQuotes(csvString);
     org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
     if (col.hasFraction() && col.hasTimezone()) {
       // After calling withOffsetParsed method, a string
@@ -227,7 +251,7 @@ public class SqoopIDFUtils {
     try {
       object = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
     } catch (org.json.simple.parser.ParseException e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, e);
     }
     if (object != null) {
       return toMap(object);
@@ -235,7 +259,7 @@ public class SqoopIDFUtils {
     return null;
   }
 
-  private static List<Object> toList(JSONArray array) {
+  public static List<Object> toList(JSONArray array) {
     List<Object> list = new ArrayList<Object>();
     for (int i = 0; i < array.size(); i++) {
       Object value = array.get(i);
@@ -252,7 +276,7 @@ public class SqoopIDFUtils {
   }
 
   @SuppressWarnings("unchecked")
-  private static Map<Object, Object> toMap(JSONObject object) {
+  public static Map<Object, Object> toMap(JSONObject object) {
     Map<Object, Object> elementMap = new HashMap<Object, Object>();
     Set<Map.Entry<Object, Object>> entries = object.entrySet();
     for (Map.Entry<Object, Object> entry : entries) {
@@ -273,13 +297,18 @@ public class SqoopIDFUtils {
   // ************ LIST Column Type utils*********
 
   @SuppressWarnings("unchecked")
-  public static String encodeToCSVList(Object[] list, AbstractComplexListType column) {
+  public static String encodeToCSVList(Object[] list, Column column) {
     List<Object> elementList = new ArrayList<Object>();
     for (int n = 0; n < list.length; n++) {
       Column listType = ((AbstractComplexListType) column).getListType();
+      // 2-level nesting supported
       if (isColumnListType(listType)) {
         Object[] listElements = (Object[]) list[n];
-        elementList.add((Arrays.deepToString(listElements)));
+        JSONArray subArray = new JSONArray();
+        for (int i = 0; i < listElements.length; i++) {
+          subArray.add(listElements[i]);
+        }
+        elementList.add(subArray);
       } else {
         elementList.add(list[n]);
       }
@@ -295,7 +324,7 @@ public class SqoopIDFUtils {
     try {
       array = (JSONArray) new JSONParser().parse(removeQuotes(csvString));
     } catch (org.json.simple.parser.ParseException e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, e);
     }
     if (array != null) {
       return array.toArray();
@@ -321,31 +350,32 @@ public class SqoopIDFUtils {
         replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
       }
     } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement
-          + "  " + String.valueOf(j) + "  " + e.getMessage());
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement + "  "
+          + String.valueOf(j) + "  " + e.getMessage());
     }
     return encloseWithQuote(replacement);
   }
 
-  public static String toText(String csvString) {
+  public static String toText(String string) {
     // Remove the trailing and starting quotes.
-    csvString = removeQuotes(csvString);
+    string = removeQuotes(string);
     int j = 0;
     try {
       for (j = 0; j < replacements.length; j++) {
-        csvString = csvString.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
+        string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
       }
     } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, csvString + "  "
-          + String.valueOf(j) + e.getMessage());
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + "  " + String.valueOf(j)
+          + e.getMessage());
     }
 
-    return csvString;
+    return string;
   }
 
   // ************ BINARY Column type utils*********
 
-  public static String encodeToCSVByteArray(byte[] bytes) {
+  public static String encodeToCSVByteArray(Object obj) {
+    byte[] bytes = (byte[]) obj;
     try {
       return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET));
     } catch (UnsupportedEncodingException e) {
@@ -393,4 +423,56 @@ public class SqoopIDFUtils {
     return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM);
   }
 
+  // ******* parse sqoop CSV ********
+  /**
+   * Custom CSV Text parser that honors quoting and escaped quotes.
+   *
+   * @return String[]
+   */
+  public static String[] parseCSVString(String csvText) {
+    if (csvText == null) {
+      return null;
+    }
+
+    boolean quoted = false;
+    boolean escaped = false;
+
+    List<String> parsedData = new LinkedList<String>();
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < csvText.length(); ++i) {
+      char c = csvText.charAt(i);
+      switch (c) {
+      case QUOTE_CHARACTER:
+        builder.append(c);
+        if (escaped) {
+          escaped = false;
+        } else {
+          quoted = !quoted;
+        }
+        break;
+      case ESCAPE_CHARACTER:
+        builder.append(ESCAPE_CHARACTER);
+        escaped = !escaped;
+        break;
+      case CSV_SEPARATOR_CHARACTER:
+        if (quoted) {
+          builder.append(c);
+        } else {
+          parsedData.add(builder.toString());
+          builder = new StringBuilder();
+        }
+        break;
+      default:
+        if (escaped) {
+          escaped = false;
+        }
+        builder.append(c);
+        break;
+      }
+    }
+    parsedData.add(builder.toString());
+
+    return parsedData.toArray(new String[parsedData.size()]);
+  }
+
 }

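A short sketch of the parser that now lives in SqoopIDFUtils (the input
row is made up; it assumes the single-quote QUOTE_CHARACTER, backslash
ESCAPE_CHARACTER and comma CSV_SEPARATOR_CHARACTER constants referenced
above):

    import org.apache.sqoop.connector.common.SqoopIDFUtils;

    // One row whose quoted field contains an escaped quote and a comma.
    String row = "10,'say \\'hi\\', please',1";
    String[] fields = SqoopIDFUtils.parseCSVString(row);
    // fields[0] = "10"
    // fields[1] = "'say \'hi\', please'"   (quotes kept, inner comma preserved)
    // fields[2] = "1"
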
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index ad1dc04..be1147d 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -26,7 +26,6 @@ import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.ColumnType;
 import org.apache.sqoop.utils.ClassUtils;
 import org.joda.time.DateTime;
 import org.joda.time.LocalDate;
@@ -38,8 +37,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -56,25 +53,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class);
 
-  private final Set<Integer> stringTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> bitTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> byteTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> listTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> mapTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> dateTimeTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> dateTypeColumnIndices = new HashSet<Integer>();
-  private final Set<Integer> timeColumnIndices = new HashSet<Integer>();
-
-
-  private Schema schema;
-
   public CSVIntermediateDataFormat() {
   }
 
   public CSVIntermediateDataFormat(Schema schema) {
     setSchema(schema);
   }
-
   /**
    * {@inheritDoc}
    */
@@ -87,129 +71,45 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    * {@inheritDoc}
    */
   @Override
-  public void setCSVTextData(String text) {
-    this.data = text;
+  public void setCSVTextData(String csvText) {
+    this.data = csvText;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void setSchema(Schema schema) {
-    if (schema == null) {
-      return;
-    }
-    this.schema = schema;
-    Column[] columns = schema.getColumnsArray();
-    int i = 0;
-    for (Column col : columns) {
-      if (isColumnStringType(col)) {
-        stringTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.BIT) {
-        bitTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.DATE) {
-        dateTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.TIME) {
-        timeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.DATE_TIME) {
-        dateTimeTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.BINARY) {
-        byteTypeColumnIndices.add(i);
-      } else if (isColumnListType(col)) {
-        listTypeColumnIndices.add(i);
-      } else if (col.getType() == ColumnType.MAP) {
-        mapTypeColumnIndices.add(i);
-      }
-      i++;
-    }
-  }
-
-  /**
-   * Custom CSV Text parser that honors quoting and escaped quotes.
-   *
-   * @return String[]
-   */
-  private String[] parseCSVString() {
-    if (data == null) {
-      return null;
-    }
-
-    boolean quoted = false;
-    boolean escaped = false;
-
-    List<String> parsedData = new LinkedList<String>();
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < data.length(); ++i) {
-      char c = data.charAt(i);
-      switch (c) {
-      case QUOTE_CHARACTER:
-        builder.append(c);
-        if (escaped) {
-          escaped = false;
-        } else {
-          quoted = !quoted;
-        }
-        break;
-      case ESCAPE_CHARACTER:
-        builder.append(ESCAPE_CHARACTER);
-        escaped = !escaped;
-        break;
-      case CSV_SEPARATOR_CHARACTER:
-        if (quoted) {
-          builder.append(c);
-        } else {
-          parsedData.add(builder.toString());
-          builder = new StringBuilder();
-        }
-        break;
-      default:
-        if (escaped) {
-          escaped = false;
-        }
-        builder.append(c);
-        break;
-      }
-    }
-    parsedData.add(builder.toString());
-
-    return parsedData.toArray(new String[parsedData.size()]);
-  }
-
-  /**
-   * Converts the CSV String array into actual object array based on its
-   * corresponding column type {@inheritDoc}
-   */
-  @Override
   public Object[] getObjectData() {
     if (schema == null || schema.isEmpty()) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006);
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
     }
 
     // fieldStringArray represents the csv fields parsed into string array
-    String[] fieldStringArray = parseCSVString();
+    String[] csvStringArray = parseCSVString(this.data);
 
-    if (fieldStringArray == null) {
+    if (csvStringArray == null) {
       return null;
     }
 
-    if (fieldStringArray.length != schema.getColumnsCount()) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005,
-          "The data " + getCSVTextData() + " has the wrong number of fields.");
+    if (csvStringArray.length != schema.getColumnsArray().length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + getCSVTextData()
+          + " has the wrong number of fields.");
     }
 
-    Object[] objectArray = new Object[fieldStringArray.length];
+    Object[] objectArray = new Object[csvStringArray.length];
     Column[] columnArray = schema.getColumnsArray();
-    for (int i = 0; i < fieldStringArray.length; i++) {
+    for (int i = 0; i < csvStringArray.length; i++) {
       // check for NULL field and bail out immediately
-      if (fieldStringArray[i].equals(NULL_VALUE)) {
+      if (csvStringArray[i].equals(NULL_VALUE)) {
         objectArray[i] = null;
         continue;
       }
-      objectArray[i] = toObject(fieldStringArray[i], columnArray[i]);
+      objectArray[i] = toObject(csvStringArray[i], columnArray[i]);
     }
     return objectArray;
   }
 
+
   private Object toObject(String csvString, Column column) {
     Object returnValue = null;
 
@@ -242,7 +142,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       returnValue = toDateTime(csvString, column);
       break;
     case BIT:
-      returnValue = toBit(csvString, returnValue);
+      returnValue = toBit(csvString);
       break;
     case ARRAY:
     case SET:
@@ -258,14 +158,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return returnValue;
   }
 
- 
-
   /**
-   * Appends the actual java objects into CSV string {@inheritDoc}
+   * {@inheritDoc}
    */
   @Override
   public void setObjectData(Object[] data) {
-   Set<Integer> nullValueIndices = new HashSet<Integer>();
+    Set<Integer> nullValueIndices = new HashSet<Integer>();
     Column[] columnArray = schema.getColumnsArray();
     // check for null
     for (int i = 0; i < data.length; i++) {
@@ -277,6 +175,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     // ignore the null values while encoding the object array into csv string
     encodeToCSVText(data, columnArray, nullValueIndices);
     this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER);
+
   }
 
   /**
@@ -288,6 +187,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   }
 
   /**
+   *
    * {@inheritDoc}
    */
   @Override
@@ -296,30 +196,18 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   }
 
   /**
-   * {@inheritDoc}
+   * Encode to the sqoop prescribed CSV String for every element in the object
+   * array
+   *
+   * @param objectArray
+   * @param columnArray
+   * @param nullValueIndices
    */
-  @Override
-  public boolean equals(Object other) {
-    if (this == other) {
-      return true;
-    }
-    if (other == null || !(other instanceof CSVIntermediateDataFormat)) {
-      return false;
-    }
-    return data.equals(((CSVIntermediateDataFormat) other).data);
-  }
-
- /**
-  * Encode to the sqoop prescribed CSV String for every element in the objet array
-  * @param objectArray
-  * @param columnArray
-  * @param nullValueIndices
-  */
   @SuppressWarnings("unchecked")
   private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
     for (int i : bitTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        encodeToCSVBit(objectArray, i);
+        objectArray[i] = encodeToCSVBit(objectArray[i]);
       }
     }
     for (int i : stringTypeColumnIndices) {
@@ -333,22 +221,22 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
         if (objectArray[i] instanceof org.joda.time.DateTime) {
           org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
           // check for fraction and time zone and then use the right formatter
-          encodeToCSVDateTime(objectArray, i, col, dateTime);
+          objectArray[i] = encodeToCSVDateTime(dateTime, col);
         } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
           org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
-          encodeToCSVLocalDateTime(objectArray, i, col, localDateTime);
+          objectArray[i] = encodeToCSVLocalDateTime(localDateTime, col);
         }
       }
     }
     for (int i : dateTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        encodeToCSVDate(objectArray, i);
+        objectArray[i] = encodeToCSVDate(objectArray[i]);
       }
     }
-    for (int i : timeColumnIndices) {
+    for (int i : timeTypeColumnIndices) {
       Column col = columnArray[i];
       if (!nullValueIndices.contains(i)) {
-        encodeToCSVTime(objectArray, i, col);
+        objectArray[i] = encodeToCSVTime(objectArray[i], col);
       }
     }
     for (int i : byteTypeColumnIndices) {
@@ -358,7 +246,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     }
     for (int i : listTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType)columnArray[i]);
+        objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType) columnArray[i]);
       }
     }
     for (int i : mapTypeColumnIndices) {
@@ -384,5 +272,4 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     jars.add(ClassUtils.jarForClass(JSONValue.class));
     return jars;
   }
-
-}
+}
\ No newline at end of file

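For reference, a minimal round trip through the slimmed-down CSV IDF
(hypothetical schema and values; note that setObjectData encodes in
place, mutating the passed array):

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.Bit;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    Schema schema = new Schema("demo");
    schema.addColumn(new FixedPoint("id"))
          .addColumn(new Text("name"))
          .addColumn(new Bit("active"));

    CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat(schema);
    idf.setObjectData(new Object[] { 10, "jane", true });
    String csv = idf.getCSVTextData();   // "10,'jane',true"
    Object[] back = idf.getObjectData(); // { 10L, "jane", Boolean.TRUE } for an unsized FixedPoint
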
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
index 884550d..a88db45 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
@@ -22,7 +22,7 @@ import org.apache.sqoop.common.ErrorCode;
 
 public enum CSVIntermediateDataFormatError implements ErrorCode {
   /** An unknown error has occurred. */
-  INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
 
   /** An encoding is missing in the Java native libraries. */
   CSV_INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),
@@ -36,17 +36,14 @@ public enum CSVIntermediateDataFormatError implements ErrorCode {
   /** Column type isn't known by Intermediate Data Format. */
   CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
 
-  /** Number of columns in schema does not match the data set. */
-  CSV_INTERMEDIATE_DATA_FORMAT_0005("Wrong number of columns."),
-
-  /** Schema is missing in the IDF. */
-  CSV_INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),
-
-  /** For arrays and maps we use JSON representation and incorrect representation results in parse exception*/
-  CSV_INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),
+  /**
+   * For arrays and maps we use JSON representation and incorrect representation
+   * results in parse exception
+   */
+  CSV_INTERMEDIATE_DATA_FORMAT_0005("JSON parse internal error."),
 
   /** Unsupported bit values */
-  CSV_INTERMEDIATE_DATA_FORMAT_0009("Unsupported bit value."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0006("Unsupported bit value."),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 7ac44dc..adeb2ec 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -18,12 +18,21 @@
  */
 package org.apache.sqoop.connector.idf;
 
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.ColumnType;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -44,9 +53,16 @@ public abstract class IntermediateDataFormat<T> {
 
   protected volatile T data;
 
-  public int hashCode() {
-    return data.hashCode();
-  }
+  protected Schema schema;
+
+  protected final Set<Integer> stringTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> bitTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> byteTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> listTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> mapTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> dateTimeTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> dateTypeColumnIndices = new HashSet<Integer>();
+  protected final Set<Integer> timeTypeColumnIndices = new HashSet<Integer>();
 
 
   /**
@@ -68,6 +84,7 @@ public abstract class IntermediateDataFormat<T> {
   public void setData(T obj) {
     this.data = obj;
   }
+
   /**
    * Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing
    * into the sqoop specified CSV text format for each {@link #ColumnType} field in the row
@@ -89,33 +106,66 @@ public abstract class IntermediateDataFormat<T> {
 
   /**
    * Set one row of data as CSV.
-   *
    */
   public abstract void setCSVTextData(String csvText);
 
   /**
-   * Get one row of data as an Object array. Sqoop uses defined object representation
-   * for each column type. For instance org.joda.time to represent date.Use {@link #SqoopIDFUtils}
-   * for reading and writing into the sqoop specified object format
-   * for each {@link #ColumnType} field in the row
+   * Get one row of data as an Object array.
+   * Sqoop uses defined object representation
+   * for each column type. For instance org.joda.time to represent date.
+   * Use {@link #SqoopIDFUtils} for reading and writing into the sqoop
+   * specified object format for each {@link #ColumnType} field in the row
    * </p>
    * @return - String representing the data as an Object array
    * If FROM and TO schema exist, we will use SchemaMatcher to get the data according to "TO" schema
    */
   public abstract Object[] getObjectData();
 
-  /**
-   * Set one row of data as an Object array.
-   *
-   */
+ /**
+  * Set one row of data as an Object array.
+  * It should also construct the data representation
+  * that the IDF represents so that the object is ready to
+  * consume when getData is invoked. Custom implementations
+  * will override this method to convert from object array
+  * to the data format
+  */
   public abstract void setObjectData(Object[] data);
 
   /**
-   * Set the schema for serializing/de-serializing  data.
+   * Set the schema for serializing/de-serializing data.
    *
-   * @param schema - the schema used for serializing/de-serializing  data
+   * @param schema
+   *          - the schema used for serializing/de-serializing data
    */
-  public abstract void setSchema(Schema schema);
+  public void setSchema(Schema schema) {
+    if (schema == null) {
+      // TODO(SQOOP-1956): throw an exception since working without a schema is dangerous
+      return;
+    }
+    this.schema = schema;
+    Column[] columns = schema.getColumnsArray();
+    int i = 0;
+    for (Column col : columns) {
+      if (isColumnStringType(col)) {
+        stringTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.BIT) {
+        bitTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.DATE) {
+        dateTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.TIME) {
+        timeTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.DATE_TIME) {
+        dateTimeTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.BINARY) {
+        byteTypeColumnIndices.add(i);
+      } else if (isColumnListType(col)) {
+        listTypeColumnIndices.add(i);
+      } else if (col.getType() == ColumnType.MAP) {
+        mapTypeColumnIndices.add(i);
+      }
+      i++;
+    }
+  }
 
   /**
    * Serialize the fields of this object to <code>out</code>.
@@ -135,7 +185,6 @@ public abstract class IntermediateDataFormat<T> {
    * @throws IOException
    */
   public abstract void read(DataInput in) throws IOException;
-
   /**
    * Provide the external jars that the IDF depends on
    * @return set of jars
@@ -144,4 +193,35 @@ public abstract class IntermediateDataFormat<T> {
     return new HashSet<String>();
   }
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((data == null) ? 0 : data.hashCode());
+    result = prime * result + ((schema == null) ? 0 : schema.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    IntermediateDataFormat<?> other = (IntermediateDataFormat<?>) obj;
+    if (data == null) {
+      if (other.data != null)
+        return false;
+    } else if (!data.equals(other.data))
+      return false;
+    if (schema == null) {
+      if (other.schema != null)
+        return false;
+    } else if (!schema.equals(other.schema))
+      return false;
+    return true;
+  }
+
 }
\ No newline at end of file

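With the shared state pulled up into the base class, a new IDF only has
to define its wire format; the per-type column index sets, schema
handling and equals/hashCode come for free. A skeletal sketch
(hypothetical class, not part of this commit):

    import static org.apache.sqoop.connector.common.SqoopIDFUtils.parseCSVString;

    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class EchoIntermediateDataFormat extends IntermediateDataFormat<String> {

      public EchoIntermediateDataFormat(Schema schema) {
        setSchema(schema); // inherited: populates the per-type column index sets
      }

      @Override
      public String getCSVTextData() { return data; }

      @Override
      public void setCSVTextData(String csvText) { this.data = csvText; }

      @Override
      public Object[] getObjectData() {
        // split only; a real IDF would decode each field via SqoopIDFUtils
        return parseCSVString(data);
      }

      @Override
      public void setObjectData(Object[] objects) {
        // naive join; a real IDF would encode each field via SqoopIDFUtils
        this.data = org.apache.commons.lang.StringUtils.join(objects, ',');
      }

      @Override
      public void write(DataOutput out) throws IOException { out.writeUTF(data); }

      @Override
      public void read(DataInput in) throws IOException { data = in.readUTF(); }
    }
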
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
new file mode 100644
index 0000000..bda75fc
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum IntermediateDataFormatError implements ErrorCode {
+  /** An unknown error has occurred. */
+  INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+  /** Number of columns in schema does not match the data set. */
+  INTERMEDIATE_DATA_FORMAT_0001("Wrong number of columns."),
+
+  /** Schema is missing in the IDF. */
+  INTERMEDIATE_DATA_FORMAT_0002("Schema missing."),
+
+  INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"),
+
+  ;
+
+  private final String message;
+
+  private IntermediateDataFormatError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
new file mode 100644
index 0000000..9329cf8
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * IDF representing the intermediate format in JSON
+ */
+public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObject> {
+
+  // We need schema at all times
+  public JSONIntermediateDataFormat(Schema schema) {
+    setSchema(schema);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setCSVTextData(String text) {
+    // convert the CSV text to JSON
+    this.data = toJSON(text);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getCSVTextData() {
+    // convert JSON to sqoop CSV
+    return toCSV(data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setObjectData(Object[] data) {
+    // convert the object Array to JSON
+    this.data = toJSON(data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Object[] getObjectData() {
+    // convert JSON to object array
+    return toObject(data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(this.data.toJSONString());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void read(DataInput in) throws IOException {
+    try {
+      data = (JSONObject) new JSONParser().parse(in.readUTF());
+    } catch (ParseException e) {
+      throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Set<String> getJars() {
+
+    Set<String> jars = super.getJars();
+    jars.add(ClassUtils.jarForClass(JSONObject.class));
+    return jars;
+  }
+
+  @SuppressWarnings("unchecked")
+  private JSONObject toJSON(String csv) {
+
+    String[] csvStringArray = parseCSVString(csv);
+
+    if (csvStringArray == null) {
+      return null;
+    }
+
+    if (csvStringArray.length != schema.getColumnsArray().length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
+          + " has the wrong number of fields.");
+    }
+    JSONObject object = new JSONObject();
+    Column[] columnArray = schema.getColumnsArray();
+    for (int i = 0; i < csvStringArray.length; i++) {
+      // check for NULL field and bail out immediately
+      if (csvStringArray[i].equals(NULL_VALUE)) {
+        object.put(columnArray[i].getName(), null);
+        continue;
+      }
+      object.put(columnArray[i].getName(), toJSON(csvStringArray[i], columnArray[i]));
+
+    }
+    return object;
+  }
+
+  private Object toJSON(String csvString, Column column) {
+    Object returnValue = null;
+
+    switch (column.getType()) {
+    case ARRAY:
+    case SET:
+      try {
+        returnValue = (JSONArray) new JSONParser().parse(removeQuotes(csvString));
+      } catch (ParseException e) {
+        throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e);
+      }
+      break;
+    case MAP:
+      try {
+        returnValue = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
+      } catch (ParseException e) {
+        throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, e);
+      }
+      break;
+    case ENUM:
+    case TEXT:
+      returnValue = toText(csvString);
+      break;
+    case BINARY:
+    case UNKNOWN:
+      returnValue = Base64.encodeBase64String(toByteArray(csvString));
+      break;
+    case FIXED_POINT:
+      returnValue = toFixedPoint(csvString, column);
+      break;
+    case FLOATING_POINT:
+      returnValue = toFloatingPoint(csvString, column);
+      break;
+    case DECIMAL:
+      returnValue = toDecimal(csvString, column);
+      break;
+    case DATE:
+    case TIME:
+    case DATE_TIME:
+      // stored as a string, expected to be in the Joda time format in CSV;
+      // stored in JSON in the same Joda time format
+      returnValue = removeQuotes(csvString);
+      break;
+    // true/false and TRUE/FALSE are the only accepted values for JSON Bit
+    // will be stored as true/false in JSON
+    case BIT:
+      returnValue = Boolean.valueOf(removeQuotes(csvString));
+      break;
+    default:
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
+ "Column type from schema was not recognized for " + column.getType());
+    }
+    return returnValue;
+  }
+
+  @SuppressWarnings("unchecked")
+  private JSONObject toJSON(Object[] data) {
+
+    if (data == null) {
+      return null;
+    }
+
+    if (data.length != schema.getColumnsArray().length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString()
+          + " has the wrong number of fields.");
+    }
+    JSONObject object = new JSONObject();
+    Column[] cols = schema.getColumnsArray();
+    for (int i = 0; i < data.length; i++) {
+      switch (cols[i].getType()) {
+      case ARRAY:
+      case SET:
+        // store as JSON array
+        Object[] objArray = (Object[]) data[i];
+        JSONArray jsonArray = toJSONArray(objArray);
+        object.put(cols[i].getName(), jsonArray);
+        break;
+      case MAP:
+        // store as JSON object
+        Map<Object, Object> map = (Map<Object, Object>) data[i];
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.putAll(map);
+        object.put(cols[i].getName(), jsonObject);
+        break;
+      case ENUM:
+      case TEXT:
+        object.put(cols[i].getName(), data[i]);
+        break;
+      case BINARY:
+      case UNKNOWN:
+        object.put(cols[i].getName(), Base64.encodeBase64String((byte[]) data[i]));
+        break;
+      case FIXED_POINT:
+      case FLOATING_POINT:
+      case DECIMAL:
+        // store as a plain object
+        object.put(cols[i].getName(), data[i]);
+        break;
+      // stored in JSON in the same Joda time format as the CSV strings
+      case DATE_TIME:
+        object.put(cols[i].getName(), removeQuotes(encodeToCSVDateTime(data[i], cols[i])));
+        break;
+      case TIME:
+        object.put(cols[i].getName(), removeQuotes(encodeToCSVTime(data[i], cols[i])));
+        break;
+      case DATE:
+        object.put(cols[i].getName(), removeQuotes(encodeToCSVDate(data[i])));
+        break;
+      case BIT:
+        object.put(cols[i].getName(), Boolean.valueOf(encodeToCSVBit(data[i])));
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+            "Column type from schema was not recognized for " + cols[i].getType());
+      }
+    }
+
+    return object;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static JSONArray toJSONArray(Object[] objectArray) {
+    JSONArray jsonArray = new JSONArray();
+    for (int i = 0; i < objectArray.length; i++) {
+      Object value = objectArray[i];
+      if (value instanceof Object[]) {
+        value = toJSONArray((Object[]) value);
+      }
+      jsonArray.add(value);
+    }
+    return jsonArray;
+  }
+
+  private String toCSV(JSONObject json) {
+    Column[] cols = this.schema.getColumnsArray();
+
+    StringBuilder csvString = new StringBuilder();
+    for (int i = 0; i < cols.length; i++) {
+
+      // alternatively, we could iterate json.entrySet() here
+      Object obj = json.get(cols[i].getName());
+      if (obj == null) {
+        throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, " for " + cols[i].getName());
+      }
+
+      switch (cols[i].getType()) {
+      case ARRAY:
+      case SET:
+        // stored as JSON array
+        JSONArray array = (JSONArray) obj;
+        csvString.append(encloseWithQuote(array.toJSONString()));
+        break;
+      case MAP:
+        // stored as JSON object
+        csvString.append(encloseWithQuote((((JSONObject) obj).toJSONString())));
+        break;
+      case ENUM:
+      case TEXT:
+        csvString.append(encodeToCSVString(obj.toString()));
+        break;
+      case BINARY:
+      case UNKNOWN:
+        csvString.append(encodeToCSVByteArray(Base64.decodeBase64(obj.toString())));
+        break;
+      case FIXED_POINT:
+        csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
+        break;
+      case FLOATING_POINT:
+        csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
+        break;
+      case DECIMAL:
+        csvString.append(encodeToCSVDecimal(obj));
+        break;
+      // stored in JSON as strings in the joda time format
+      case DATE:
+      case TIME:
+      case DATE_TIME:
+        csvString.append(encloseWithQuote(obj.toString()));
+        break;
+      // 0/1 will be stored as they are in JSON, even though the valid
+      // values in JSON are true/false
+      case BIT:
+        csvString.append(encodeToCSVBit(obj));
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+            "Column type from schema was not recognized for " + cols[i].getType());
+      }
+      if (i < cols.length - 1) {
+        csvString.append(CSV_SEPARATOR_CHARACTER);
+      }
+
+    }
+
+    return csvString.toString();
+  }
+
+  @SuppressWarnings("unchecked")
+  private Object[] toObject(JSONObject data) {
+
+    if (data == null) {
+      return null;
+    }
+    Column[] cols = schema.getColumnsArray();
+    Object[] object = new Object[cols.length];
+
+    Set<String> keys = data.keySet();
+    int i = 0;
+    for (String key : keys) {
+
+      Integer nameIndex = schema.getColumnNameIndex(key);
+      Object obj = data.get(key);
+      Column column = cols[nameIndex];
+      switch (column.getType()) {
+      case ARRAY:
+      case SET:
+        object[nameIndex] = toList((JSONArray) obj).toArray();
+        break;
+      case MAP:
+        object[nameIndex] = toMap((JSONObject) obj);
+        break;
+      case ENUM:
+      case TEXT:
+        object[nameIndex] = toText(obj.toString());
+        break;
+      case BINARY:
+      case UNKNOWN:
+        // JSON spec is to store byte array as base64 encoded
+        object[nameIndex] = Base64.decodeBase64(obj.toString());
+        break;
+      case FIXED_POINT:
+        object[nameIndex] = toFixedPoint(obj.toString(), column);
+        break;
+      case FLOATING_POINT:
+        object[nameIndex] = toFloatingPoint(obj.toString(), column);
+        break;
+      case DECIMAL:
+        object[nameIndex] = toDecimal(obj.toString(), column);
+        break;
+      case DATE:
+        object[nameIndex] = toDate(obj.toString(), column);
+        break;
+      case TIME:
+        object[nameIndex] = toTime(obj.toString(), column);
+        break;
+      case DATE_TIME:
+        object[nameIndex] = toDateTime(obj.toString(), column);
+        break;
+      case BIT:
+        object[nameIndex] = toBit(obj.toString());
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+            "Column type from schema was not recognized for " + cols[i].getType());
+      }
+
+    }
+    return object;
+  }
+
+}

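Usage mirrors the CSV IDF; a short sketch (hypothetical schema) of the
JSON-backed round trip between CSV text, the internal JSONObject and an
ordered object array:

    import org.apache.sqoop.connector.idf.JSONIntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;
    import org.json.simple.JSONObject;

    Schema schema = new Schema("demo");
    schema.addColumn(new FixedPoint("id")).addColumn(new Text("name"));

    JSONIntermediateDataFormat idf = new JSONIntermediateDataFormat(schema);
    idf.setCSVTextData("10,'jane'");
    JSONObject json = idf.getData();        // {"id":10,"name":"jane"}
    Object[] fields = idf.getObjectData();  // ordered by schema via getColumnNameIndex
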
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
new file mode 100644
index 0000000..86c71b6
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum JSONIntermediateDataFormatError implements ErrorCode {
+  /** An unknown error has occurred. */
+  JSON_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+  JSON_INTERMEDIATE_DATA_FORMAT_0001("JSON array parse error."),
+
+  JSON_INTERMEDIATE_DATA_FORMAT_0002("JSON object parse error."),
+
+  JSON_INTERMEDIATE_DATA_FORMAT_0003("Missing key in the JSON object."),
+
+  ;
+
+  private final String message;
+
+  private JSONIntermediateDataFormatError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

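These codes are surfaced through SqoopException. A hypothetical sketch of the
parse-and-raise pattern (the helper below is an illustration, not code from
this patch, and assumes SqoopException's (ErrorCode, Throwable) constructor):

    import org.apache.sqoop.common.SqoopException;
    import org.apache.sqoop.connector.idf.JSONIntermediateDataFormatError;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    public class JsonParseHelper {
      // Hypothetical helper: map json-simple parse failures onto the
      // JSON object parse error code defined above.
      static JSONObject parseRow(String jsonString) {
        try {
          return (JSONObject) new JSONParser().parse(jsonString);
        } catch (ParseException e) {
          throw new SqoopException(
              JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e);
        }
      }
    }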
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index f6852a0..2602c61 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -188,6 +188,16 @@ public class TestCSVIntermediateDataFormat {
     assertEquals(testData, dataFormat.getCSVTextData());
   }
 
+  @Test
+  public void testInputAsCSVTextInAndDataOut() {
+    String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+      + ",'" + String.valueOf(0x0A) + "'";
+    dataFormat.setCSVTextData(testData);
+    assertEquals(testData, dataFormat.getData());
+  }
+
   @Test
   public void testInputAsCSVTextInObjectOut() {
 
@@ -219,7 +229,7 @@ public class TestCSVIntermediateDataFormat {
   }
 
   @Test
-  public void testInputAsObjectArayInCSVTextOut() {
+  public void testInputAsObjectArrayInCSVTextOrDataOut() {
     Schema schema = new Schema("test");
     schema.addColumn(new FixedPoint("1"))
         .addColumn(new FixedPoint("2"))
@@ -246,6 +256,7 @@ public class TestCSVIntermediateDataFormat {
     String testData = "10,34,'54','random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r")
         + ",'\\n','TEST_ENUM'";
     assertEquals(testData, dataFormat.getCSVTextData());
+    assertEquals(testData, dataFormat.getData());
   }
 
   @Test
@@ -384,7 +395,7 @@ public class TestCSVIntermediateDataFormat {
     org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0);
     Object[] in = { time, "test" };
     dataFormat.setObjectData(in);
-    assertEquals("'15:00:00.000000','test'", dataFormat.getCSVTextData());
+    assertEquals("'15:00:00.000','test'", dataFormat.getCSVTextData());
   }
 
   @Test
@@ -854,7 +865,8 @@ public class TestCSVIntermediateDataFormat {
     dataFormat.setObjectData(data);
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
     assertEquals(2, expectedArray.length);
-    assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString(expectedArray));
+    String arrayOfArraysString = "[[11,12], [14,15]]";
+    assertEquals(arrayOfArraysString, Arrays.toString(expectedArray));
     assertEquals("text", dataFormat.getObjectData()[1]);
   }
 
@@ -879,7 +891,7 @@ public class TestCSVIntermediateDataFormat {
     dataFormat.setCSVTextData("'[\"[11, 12]\",\"[14, 15]\"]','text'");
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
     assertEquals(2, expectedArray.length);
-    assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString(expectedArray));
+    assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.toString(expectedArray));
     assertEquals("text", dataFormat.getObjectData()[1]);
   }
 
@@ -890,7 +902,7 @@ public class TestCSVIntermediateDataFormat {
         new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
     schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
     dataFormat.setSchema(schema);
-    String input = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
+    String input = "'[[11, 12],[14, 15]]','text'";
     dataFormat.setCSVTextData(input);
     Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
     assertEquals(2, expectedArray.length);
@@ -916,7 +928,7 @@ public class TestCSVIntermediateDataFormat {
     data[0] = arrayOfArrays;
     data[1] = "text";
     dataFormat.setObjectData(data);
-    String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
+    String expected = "'[[11,12],[14,15]]','text'";
     assertEquals(expected, dataFormat.getCSVTextData());
   }
   //**************test cases for map**********************

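The hunks above track a change in how nested arrays are encoded in CSV: quoted
per-element strings ('["[11, 12]","[14, 15]"]') give way to a single
JSON-style nested array ('[[11,12],[14,15]]'). A sketch of the round trip the
updated tests assert (assuming the no-arg constructor and the
setSchema(Schema) call used in this test class):

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.Array;
    import org.apache.sqoop.schema.type.FixedPoint;

    public class NestedArrayCsvExample {
      public static void main(String[] args) {
        Schema schema = new Schema("test");
        schema.addColumn(new Array("outer", new Array("inner", new FixedPoint("ft"))));

        CSVIntermediateDataFormat idf = new CSVIntermediateDataFormat();
        idf.setSchema(schema);
        idf.setObjectData(new Object[] { new Object[] {
            new Object[] { 11, 12 }, new Object[] { 14, 15 } } });

        // Nested arrays now serialize as one quoted JSON array,
        // expected per the test above: '[[11,12],[14,15]]'
        System.out.println(idf.getCSVTextData());
      }
    }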
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
new file mode 100644
index 0000000..8672712
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Array;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDateTime;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestJSONIntermediateDataFormat {
+
+  private JSONIntermediateDataFormat dataFormat;
+  private static final String csvArray = "'[[11,11],[14,15]]'";
+  private static final String map = "'{\"testKey\":\"testValue\"}'";
+  private static final String csvSet = "'[[11,12],[14,15]]'";
+  private static final String date = "'2014-10-01'";
+  private static final String dateTime = "'2014-10-01 12:00:00.000'";
+  private static final String time = "'12:59:59'";
+
+  @Before
+  public void setUp() {
+    createJSONIDF();
+  }
+
+  private void createJSONIDF() {
+    Schema schema = new Schema("test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2", 2L, false)).addColumn(new Text("3"))
+        .addColumn(new Text("4")).addColumn(new Binary("5")).addColumn(new Text("6"))
+        .addColumn(new org.apache.sqoop.schema.type.Enum("7"))
+        .addColumn(new Array("8", new Array("array", new FixedPoint("ft"))))
+        .addColumn(new org.apache.sqoop.schema.type.Map("9", new Text("t1"), new Text("t2"))).addColumn(new Bit("10"))
+        .addColumn(new org.apache.sqoop.schema.type.DateTime("11", true, false))
+        .addColumn(new org.apache.sqoop.schema.type.Time("12", false)).addColumn(new org.apache.sqoop.schema.type.Date("13"))
+        .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("14"))
+        .addColumn(new org.apache.sqoop.schema.type.Set("15", new Array("set", new FixedPoint("ftw"))));
+    dataFormat = new JSONIntermediateDataFormat(schema);
+  }
+
+  /**
+   * Covers setCSVTextData -> getData, getObjectData and getCSVTextData.
+   */
+  @Test
+  public void testInputAsCSVTextInAndDataOut() {
+
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+        + csvSet;
+    dataFormat.setCSVTextData(csvText);
+    String jsonExpected = "{\"15\":[[11,12],[14,15]],\"13\":\"2014-10-01\",\"14\":13.44,\"11\":\"2014-10-01 12:00:00.000\","
+        + "\"12\":\"12:59:59\",\"3\":\"54\",\"2\":34,\"1\":10,\"10\":true,\"7\":\"ENUM\",\"6\":\"10\",\"5\":\"kDY=\",\"4\":\"random data\","
+        + "\"9\":{\"testKey\":\"testValue\"},\"8\":[[11,11],[14,15]]}";
+    assertEquals(jsonExpected, dataFormat.getData().toJSONString());
+  }
+
+  @Test
+  public void testInputAsCSVTextInAndObjectArrayOut() {
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+        + csvSet;
+    dataFormat.setCSVTextData(csvText);
+    assertEquals(15, dataFormat.getObjectData().length);
+    assertObjectArray();
+  }
+
+  private void assertObjectArray() {
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(10L, out[0]);
+    assertEquals(34, out[1]);
+    assertEquals("54", out[2]);
+    assertEquals("random data", out[3]);
+    assertEquals(-112, ((byte[]) out[4])[0]);
+    assertEquals(54, ((byte[]) out[4])[1]);
+    assertEquals("10", out[5]);
+    assertEquals("ENUM", out[6]);
+
+    Object[] givenArrayOne = new Object[2];
+    givenArrayOne[0] = 11;
+    givenArrayOne[1] = 11;
+    Object[] givenArrayTwo = new Object[2];
+    givenArrayTwo[0] = 14;
+    givenArrayTwo[1] = 15;
+    Object[] arrayOfArrays = new Object[2];
+    arrayOfArrays[0] = givenArrayOne;
+    arrayOfArrays[1] = givenArrayTwo;
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("testKey", "testValue");
+    // no time zone
+    LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 1, 12, 0, 0);
+    org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
+    org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 1);
+    Object[] set0 = new Object[2];
+    set0[0] = 11;
+    set0[1] = 12;
+    Object[] set1 = new Object[2];
+    set1[0] = 14;
+    set1[1] = 15;
+    Object[] set = new Object[2];
+    set[0] = set0;
+    set[1] = set1;
+    assertEquals(2, ((Object[]) out[7]).length);
+    assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString((Object[]) out[7]));
+    assertEquals(map, out[8]);
+    assertEquals(true, out[9]);
+    assertEquals(dateTime, out[10]);
+    assertEquals(time, out[11]);
+    assertEquals(date, out[12]);
+    assertEquals(13.44, out[13]);
+    assertEquals(2, ((Object[]) out[14]).length);
+    assertEquals(Arrays.deepToString(set), Arrays.deepToString((Object[]) out[14]));
+  }
+
+  @Test
+  public void testInputAsCSVTextInCSVTextOut() {
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+        + csvSet;
+    dataFormat.setCSVTextData(csvText);
+    assertEquals(csvText, dataFormat.getCSVTextData());
+  }
+
+  @SuppressWarnings("unchecked")
+  private JSONObject createJSONObject() {
+    JSONObject json = new JSONObject();
+    json.put("1", 10L);
+    json.put("2", 34);
+    json.put("3", "54");
+    json.put("4", "random data");
+    json.put("5", org.apache.commons.codec.binary.Base64.encodeBase64String(new byte[] { (byte) -112, (byte) 54 }));
+    json.put("6", String.valueOf(0x0A));
+    json.put("7", "ENUM");
+    JSONArray givenArrayOne = new JSONArray();
+    givenArrayOne.add(11);
+    givenArrayOne.add(11);
+    JSONArray givenArrayTwo = new JSONArray();
+    givenArrayTwo.add(14);
+    givenArrayTwo.add(15);
+
+    JSONArray arrayOfArrays = new JSONArray();
+    arrayOfArrays.add(givenArrayOne);
+    arrayOfArrays.add(givenArrayTwo);
+
+    JSONObject map = new JSONObject();
+    map.put("testKey", "testValue");
+
+    json.put("8", arrayOfArrays);
+    json.put("9", map);
+    json.put("10", true);
+
+    // expect dates as strings
+    json.put("11", SqoopIDFUtils.removeQuotes(dateTime));
+    json.put("12", SqoopIDFUtils.removeQuotes(time));
+    json.put("13", SqoopIDFUtils.removeQuotes(date));
+    json.put("14", 13.44);
+
+    JSONArray givenSetOne = new JSONArray();
+    givenSetOne.add(11);
+    givenSetOne.add(12);
+    JSONArray givenSetTwo = new JSONArray();
+    givenSetTwo.add(14);
+    givenSetTwo.add(15);
+    JSONArray arrayOfSet = new JSONArray();
+    arrayOfSet.add(givenSetOne);
+    arrayOfSet.add(givenSetTwo);
+
+    json.put("15", arrayOfSet);
+    return json;
+  }
+
+  /**
+   * Covers setData -> getCSVTextData, getObjectData and getData.
+   */
+  @Test
+  public void testInputAsDataInAndCSVOut() {
+
+    String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+        + csvSet;
+    dataFormat.setData(createJSONObject());
+    assertEquals(csvExpected, dataFormat.getCSVTextData());
+  }
+
+  @Test
+  public void testInputAsDataInAndObjectArrayOut() {
+    JSONObject json = createJSONObject();
+    dataFormat.setData(json);
+    assertObjectArray();
+  }
+
+  @Test
+  public void testInputAsDataInAndDataOut() {
+    JSONObject json = createJSONObject();
+    dataFormat.setData(json);
+    assertEquals(json, dataFormat.getData());
+  }
+
+  private Object[] createObjectArray() {
+    Object[] out = new Object[15];
+    out[0] = 10L;
+    out[1] = 34;
+    out[2] = "54";
+    out[3] = "random data";
+    out[4] = new byte[] { (byte) -112, (byte) 54 };
+    out[5] = String.valueOf(0x0A);
+    out[6] = "ENUM";
+
+    Object[] givenArrayOne = new Object[2];
+    givenArrayOne[0] = 11;
+    givenArrayOne[1] = 11;
+    Object[] givenArrayTwo = new Object[2];
+    givenArrayTwo[0] = 14;
+    givenArrayTwo[1] = 15;
+
+    Object[] arrayOfArrays = new Object[2];
+    arrayOfArrays[0] = givenArrayOne;
+    arrayOfArrays[1] = givenArrayTwo;
+
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("testKey", "testValue");
+
+    out[7] = arrayOfArrays;
+    out[8] = map;
+    out[9] = true;
+    org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 1, 12, 0, 0, 0);
+    out[10] = dateTime;
+    org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
+    out[11] = time;
+    org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 1);
+    out[12] = date;
+    out[13] = 13.44;
+    Object[] set0 = new Object[2];
+    set0[0] = 11;
+    set0[1] = 12;
+    Object[] set1 = new Object[2];
+    set1[0] = 14;
+    set1[1] = 15;
+
+    Object[] set = new Object[2];
+    set[0] = set0;
+    set[1] = set1;
+    out[14] = set;
+    return out;
+  }
+
+  /**
+   * Covers setObjectData -> getData, getCSVTextData and getObjectData.
+   */
+  @Test
+  public void testInputAsObjectArrayInAndDataOut() {
+
+    Object[] out = createObjectArray();
+    dataFormat.setObjectData(out);
+    JSONObject json = createJSONObject();
+    assertEquals(json, dataFormat.getData());
+  }
+
+  @Test
+  public void testInputAsObjectArrayInAndCSVOut() {
+    Object[] out = createObjectArray();
+    dataFormat.setObjectData(out);
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+        + csvSet;
+    assertEquals(csvText, dataFormat.getCSVTextData());
+  }
+
+  @Test
+  public void testInputAsObjectArrayInAndObjectArrayOut() {
+    Object[] out = createObjectArray();
+    dataFormat.setObjectData(out);
+    assertObjectArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2381566..4dbc48f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@ limitations under the License.
     <commons-lang.version>2.5</commons-lang.version>
     <commons-io.version>2.4</commons-io.version>
     <commons-compress.version>1.9</commons-compress.version>
+    <commons-codec.version>1.9</commons-codec.version>
     <derby.version>10.8.2.2</derby.version>
     <hadoop.1.version>1.0.3</hadoop.1.version>
     <hadoop.2.version>2.6.0</hadoop.2.version>
@@ -444,11 +445,16 @@ limitations under the License.
         <artifactId>commons-cli</artifactId>
         <version>${commons-cli.version}</version>
       </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <version>${commons-io.version}</version>
-        </dependency>
+      <dependency>
+        <groupId>commons-io</groupId>
+        <artifactId>commons-io</artifactId>
+        <version>${commons-io.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>${commons-codec.version}</version>
+      </dependency>
       <dependency>
         <groupId>javax.servlet</groupId>
         <artifactId>servlet-api</artifactId>
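commons-codec enters dependencyManagement here because the JSON IDF
Base64-encodes BINARY columns (the "kDY=" expectation in the test above). A
sketch of that round trip:

    import org.apache.commons.codec.binary.Base64;

    public class Base64RoundTrip {
      public static void main(String[] args) {
        byte[] raw = new byte[] { (byte) -112, (byte) 54 };
        String encoded = Base64.encodeBase64String(raw); // "kDY="
        byte[] decoded = Base64.decodeBase64(encoded);   // back to { -112, 54 }
        System.out.println(encoded + " round-trips: "
            + (decoded[0] == raw[0] && decoded[1] == raw[1]));
      }
    }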