You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2014/12/13 03:30:47 UTC

sqoop git commit: SQOOP-1813: Sqoop2: Add SqoopIDFUtils class and unit tests

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 dda5e1edd -> 293e9ef63


SQOOP-1813: Sqoop2: Add SqoopIDFUtils class and unit tests

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/293e9ef6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/293e9ef6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/293e9ef6

Branch: refs/heads/sqoop2
Commit: 293e9ef63625538a7020ebc16f6bacbd27a2f6ea
Parents: dda5e1e
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Fri Dec 12 20:26:26 2014 -0600
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Fri Dec 12 20:27:09 2014 -0600

----------------------------------------------------------------------
 .../sqoop/connector/common/SqoopIDFUtils.java   | 396 +++++++++++++++++++
 .../idf/CSVIntermediateDataFormat.java          | 354 ++---------------
 .../connector/idf/IntermediateDataFormat.java   |   4 +-
 .../connector/common/TestSqoopIDFUtils.java     | 162 ++++++++
 .../idf/TestCSVIntermediateDataFormat.java      |  27 +-
 5 files changed, 603 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/293e9ef6/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
new file mode 100644
index 0000000..48adae1
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormatError;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.ColumnType;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+
+/**
+ * Utility methods for connectors to encode data into the Sqoop expected formats
+ * documented in
+ * https://cwiki.apache.org/confluence/display/SQOOP/Intermediate+Data+Format+API
+ */
+
+public class SqoopIDFUtils {
+
+  public static final String NULL_VALUE = "NULL";
+
+  // ISO-8859-1 is an 8-bit codec that is supported in every java
+  // implementation.
+  public static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
+
+  public static final char[] originals = { 0x5C, 0x00, 0x0A, 0x0D, 0x1A, 0x22, 0x27 };
+
+  public static final char CSV_SEPARATOR_CHARACTER = ',';
+  public static final char ESCAPE_CHARACTER = '\\';
+  public static final char QUOTE_CHARACTER = '\'';
+
+  // string related replacements
+  private static final String[] replacements = {
+      new String(new char[] { ESCAPE_CHARACTER, '\\' }),
+      new String(new char[] { ESCAPE_CHARACTER, '0' }),
+      new String(new char[] { ESCAPE_CHARACTER, 'n' }),
+      new String(new char[] { ESCAPE_CHARACTER, 'r' }),
+      new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
+      new String(new char[] { ESCAPE_CHARACTER, '\"' }),
+      new String(new char[] { ESCAPE_CHARACTER, '\'' })
+      };
+
+  // http://www.joda.org/joda-time/key_format.html provides details on the
+  // formatter token
+  // can have fraction and or timezone
+  public static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ");
+  public static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+  public static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
+  public static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ");
+
+  // only date, no time
+  public static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
+  // time with fraction only, no timezone
+  public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
+  public static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
+
+  static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
+  static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
+  static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
+  static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
+
+  // ******** Number Column Type utils***********
+
+  public static Object toFixedPoint(String csvString, Column column) {
+    Object returnValue;
+    Long byteSize = ((FixedPoint) column).getByteSize();
+    if (byteSize != null && byteSize <= Integer.SIZE) {
+      returnValue = Integer.valueOf(csvString);
+    } else {
+      returnValue = Long.valueOf(csvString);
+    }
+    return returnValue;
+  }
+
+  public static Object toFloatingPoint(String csvString, Column column) {
+    Object returnValue;
+    Long byteSize = ((FloatingPoint) column).getByteSize();
+    if (byteSize != null && byteSize <= Float.SIZE) {
+      returnValue = Float.valueOf(csvString);
+    } else {
+      returnValue = Double.valueOf(csvString);
+    }
+    return returnValue;
+  }
+
+  public static Object toDecimal(String csvString, Column column) {
+    return new BigDecimal(csvString);
+  }
+
+  // ********** BIT Column Type utils******************
+  public static void encodeToCSVBit(Object[] objectArray, int i) {
+    String bitStringValue = objectArray[i].toString();
+    if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
+      objectArray[i] = bitStringValue;
+    } else {
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
+          + objectArray[i]);
+    }
+  }
+
+  public static Object toBit(String csvString, Object returnValue) {
+    if ((TRUE_BIT_SET.contains(csvString)) || (FALSE_BIT_SET.contains(csvString))) {
+      returnValue = TRUE_BIT_SET.contains(csvString);
+    } else {
+      // throw an exception for any unsupported value for BITs
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + csvString);
+    }
+    return returnValue;
+  }
+
+  // *********** DATE and TIME Column Type utils **********
+
+  public static void encodeToCSVDate(Object[] objectArray, int i) {
+    org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
+    objectArray[i] = encloseWithQuote(df.print(date));
+  }
+
+  public static void encodeToCSVTime(Object[] objectArray, int i, Column col) {
+    if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
+      objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
+    } else {
+      objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
+    }
+  }
+
+  public static Object toDate(String csvString, Column column) {
+    return LocalDate.parse(removeQuotes(csvString));
+  }
+
+  public static Object toTime(String csvString, Column column) {
+    return LocalTime.parse(removeQuotes(csvString));
+  }
+
+  // *********** DATE TIME Column Type utils **********
+
+  public static void encodeToCSVLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
+    org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
+    if (column.hasFraction()) {
+      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
+    } else {
+      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
+    }
+  }
+
+  public static void encodeToCSVDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
+    org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
+    if (column.hasFraction() && column.hasTimezone()) {
+      objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
+    } else if (column.hasFraction() && !column.hasTimezone()) {
+      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
+    } else if (column.hasTimezone()) {
+      objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
+    } else {
+      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
+    }
+  }
+
+  public static Object toDateTime(String fieldString, Column column) {
+    Object returnValue;
+    String dateTime = removeQuotes(fieldString);
+    org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
+    if (col.hasFraction() && col.hasTimezone()) {
+      // After calling withOffsetParsed method, a string
+      // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
+      // -08:00 (a fixed zone, with no daylight savings rules)
+      returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime);
+    } else if (col.hasFraction() && !col.hasTimezone()) {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime);
+    } else if (col.hasTimezone()) {
+      returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime);
+    } else {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime);
+    }
+    return returnValue;
+  }
+
+  // ************ MAP Column Type utils*********
+
+  @SuppressWarnings("unchecked")
+  public static String encodeToCSVMap(Map<Object, Object> map, Column column) {
+    JSONObject object = new JSONObject();
+    object.putAll(map);
+    return encloseWithQuote(object.toJSONString());
+  }
+
+  public static Map<Object, Object> toMap(String csvString) {
+
+    JSONObject object = null;
+    try {
+      object = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
+    } catch (org.json.simple.parser.ParseException e) {
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
+    }
+    if (object != null) {
+      return toMap(object);
+    }
+    return null;
+  }
+
+  private static List<Object> toList(JSONArray array) {
+    List<Object> list = new ArrayList<Object>();
+    for (int i = 0; i < array.size(); i++) {
+      Object value = array.get(i);
+      if (value instanceof JSONArray) {
+        value = toList((JSONArray) value);
+      }
+
+      else if (value instanceof JSONObject) {
+        value = toMap((JSONObject) value);
+      }
+      list.add(value);
+    }
+    return list;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Map<Object, Object> toMap(JSONObject object) {
+    Map<Object, Object> elementMap = new HashMap<Object, Object>();
+    Set<Map.Entry<Object, Object>> entries = object.entrySet();
+    for (Map.Entry<Object, Object> entry : entries) {
+      Object value = entry.getValue();
+
+      if (value instanceof JSONArray) {
+        value = toList((JSONArray) value);
+      }
+
+      else if (value instanceof JSONObject) {
+        value = toMap((JSONObject) value);
+      }
+      elementMap.put(entry.getKey(), value);
+    }
+    return elementMap;
+  }
+
+  // ************ LIST Column Type utils*********
+
+  @SuppressWarnings("unchecked")
+  public static String encodeToCSVList(Object[] list, AbstractComplexListType column) {
+    List<Object> elementList = new ArrayList<Object>();
+    for (int n = 0; n < list.length; n++) {
+      Column listType = ((AbstractComplexListType) column).getListType();
+      if (isColumnListType(listType)) {
+        Object[] listElements = (Object[]) list[n];
+        elementList.add((Arrays.deepToString(listElements)));
+      } else {
+        elementList.add(list[n]);
+      }
+    }
+    JSONArray array = new JSONArray();
+    array.addAll(elementList);
+    return encloseWithQuote(array.toJSONString());
+  }
+
+  public static Object[] toList(String csvString) {
+
+    JSONArray array = null;
+    try {
+      array = (JSONArray) new JSONParser().parse(removeQuotes(csvString));
+    } catch (org.json.simple.parser.ParseException e) {
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
+    }
+    if (array != null) {
+      return array.toArray();
+    }
+    return null;
+  }
+
+  // ************ TEXT Column Type utils*********
+
+  private static String getRegExp(char character) {
+    return getRegExp(String.valueOf(character));
+  }
+
+  private static String getRegExp(String string) {
+    return string.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
+  }
+
+  public static String encodeToCSVString(String string) {
+    int j = 0;
+    String replacement = string;
+    try {
+      for (j = 0; j < replacements.length; j++) {
+        replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
+      }
+    } catch (Exception e) {
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement
+          + "  " + String.valueOf(j) + "  " + e.getMessage());
+    }
+    return encloseWithQuote(replacement);
+  }
+
+  public static String toText(String csvString) {
+    // Remove the trailing and starting quotes.
+    csvString = removeQuotes(csvString);
+    int j = 0;
+    try {
+      for (j = 0; j < replacements.length; j++) {
+        csvString = csvString.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
+      }
+    } catch (Exception e) {
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, csvString + "  "
+          + String.valueOf(j) + e.getMessage());
+    }
+
+    return csvString;
+  }
+
+  // ************ BINARY Column type utils*********
+
+  public static String encodeToCSVByteArray(byte[] bytes) {
+    try {
+      return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET));
+    } catch (UnsupportedEncodingException e) {
+      // We should never hit this case.
+      // This character set should be distributed with Java.
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, "The character set "
+          + BYTE_FIELD_CHARSET + " is not available.");
+    }
+  }
+
+  public static byte[] toByteArray(String csvString) {
+    // Always encoded in BYTE_FIELD_CHARSET.
+    try {
+      return toText(csvString).getBytes(BYTE_FIELD_CHARSET);
+    } catch (UnsupportedEncodingException e) {
+      // Should never hit this case.
+      // This character set should be distributed with Java.
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001, "The character set "
+          + BYTE_FIELD_CHARSET + " is not available.");
+    }
+  }
+
+  // *********** SQOOP CSV standard encoding utils********************
+
+  public static String encloseWithQuote(String string) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER);
+    return builder.toString();
+  }
+
+  public static String removeQuotes(String string) {
+    // validate that the string has quotes
+    if (string.startsWith(String.valueOf(QUOTE_CHARACTER)) && string.endsWith(String.valueOf(QUOTE_CHARACTER))) {
+      return string.substring(1, string.length() - 1);
+    }
+    return string;
+  }
+
+  // ********* utility methods for column type classification ***********
+  public static boolean isColumnListType(Column listType) {
+    return listType.getType().equals(ColumnType.ARRAY) || listType.getType().equals(ColumnType.SET);
+  }
+
+  public static boolean isColumnStringType(Column stringType) {
+    return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/293e9ef6/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index e4a83b1..b377e2d 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sqoop.connector.idf;
 
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
@@ -25,30 +27,16 @@ import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.FloatingPoint;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
 
 /**
  * A concrete implementation for the {@link #IntermediateDataFormat} that
@@ -63,42 +51,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class);
 
-  public static final char SEPARATOR_CHARACTER = ',';
-  public static final char ESCAPE_CHARACTER = '\\';
-  public static final char QUOTE_CHARACTER = '\'';
-
-  public static final String NULL_VALUE = "NULL";
-
-  private static final char[] originals = {
-    0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
-  };
-
-
-  private static final String[] replacements = {
-    new String(new char[] { ESCAPE_CHARACTER, '\\'}),
-    new String(new char[] { ESCAPE_CHARACTER, '0'}),
-    new String(new char[] { ESCAPE_CHARACTER, 'n'}),
-    new String(new char[] { ESCAPE_CHARACTER, 'r'}),
-    new String(new char[] { ESCAPE_CHARACTER, 'Z'}),
-    new String(new char[] { ESCAPE_CHARACTER, '\"'}),
-    new String(new char[] { ESCAPE_CHARACTER, '\''})
-  };
-
-  // ISO-8859-1 is an 8-bit codec that is supported in every java implementation.
-  static final String BYTE_FIELD_CHARSET = "ISO-8859-1";
-  // http://www.joda.org/joda-time/key_format.html provides details on the formatter token
-  // can have fraction and or timezone
-  static final DateTimeFormatter dtfWithFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ");
-  static final DateTimeFormatter dtfWithNoFractionAndTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
-  static final DateTimeFormatter dtfWithFractionNoTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");
-  static final DateTimeFormatter dtfWithNoFractionWithTimeZone = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssZ");
-
-  // only date, no time
-  static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
-  // time with fraction only, no timezone
-  static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
-  static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
-
   private final List<Integer> stringTypeColumnIndices = new ArrayList<Integer>();
   private final List<Integer> bitTypeColumnIndices = new ArrayList<Integer>();
   private final List<Integer> byteTypeColumnIndices = new ArrayList<Integer>();
@@ -108,10 +60,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   private final List<Integer> dateTypeColumnIndices = new ArrayList<Integer>();
   private final List<Integer> timeColumnIndices = new ArrayList<Integer>();
 
-  static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
-  static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
-  static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
-  static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
 
   private Schema schema;
 
@@ -202,7 +150,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
         builder.append(ESCAPE_CHARACTER);
         escaped = !escaped;
         break;
-      case SEPARATOR_CHARACTER:
+      case CSV_SEPARATOR_CHARACTER:
         if (quoted) {
           builder.append(c);
         } else {
@@ -258,61 +206,46 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return objectArray;
   }
 
-  private Object parseCSVStringArrayElement(String fieldString, Column column) {
+  private Object parseCSVStringArrayElement(String csvString, Column column) {
     Object returnValue = null;
 
     switch (column.getType()) {
     case ENUM:
     case TEXT:
-      returnValue = unescapeString(fieldString);
+      returnValue = toText(csvString);
       break;
     case BINARY:
       // Unknown is treated as a binary type
     case UNKNOWN:
-      returnValue = unescapeByteArray(fieldString);
+      returnValue = toByteArray(csvString);
       break;
     case FIXED_POINT:
-      Long byteSize = ((FixedPoint) column).getByteSize();
-      if (byteSize != null && byteSize <= Integer.SIZE) {
-        returnValue = Integer.valueOf(fieldString);
-      } else {
-        returnValue = Long.valueOf(fieldString);
-      }
+      returnValue = toFixedPoint(csvString, column);
       break;
     case FLOATING_POINT:
-      byteSize = ((FloatingPoint) column).getByteSize();
-      if (byteSize != null && byteSize <= Float.SIZE) {
-        returnValue = Float.valueOf(fieldString);
-      } else {
-        returnValue = Double.valueOf(fieldString);
-      }
+      returnValue = toFloatingPoint(csvString, column);
       break;
     case DECIMAL:
-      returnValue = new BigDecimal(fieldString);
+      returnValue = toDecimal(csvString, column);
       break;
     case DATE:
-      returnValue = LocalDate.parse(removeQuotes(fieldString));
+      returnValue = toDate(csvString, column);
       break;
     case TIME:
-      returnValue = LocalTime.parse(removeQuotes(fieldString));
+      returnValue = toTime(csvString, column);
       break;
     case DATE_TIME:
-      returnValue = parseDateTime(fieldString, column);
+      returnValue = toDateTime(csvString, column);
       break;
     case BIT:
-      if ((TRUE_BIT_SET.contains(fieldString)) || (FALSE_BIT_SET.contains(fieldString))) {
-        returnValue = TRUE_BIT_SET.contains(fieldString);
-      } else {
-        // throw an exception for any unsupported value for BITs
-        throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + fieldString);
-      }
+      returnValue = toBit(csvString, returnValue);
       break;
     case ARRAY:
     case SET:
-      returnValue = parseListElementFromJSON(fieldString);
+      returnValue = toList(csvString);
       break;
     case MAP:
-      returnValue = parseMapElementFromJSON(fieldString);
+      returnValue = toMap(csvString);
       break;
     default:
       throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
@@ -321,89 +254,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return returnValue;
   }
 
-  private Object parseDateTime(String fieldString, Column column) {
-    Object returnValue;
-    String dateTime = removeQuotes(fieldString);
-    org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
-    if (col.hasFraction() && col.hasTimezone()) {
-      // After calling withOffsetParsed method, a string
-      // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
-      // -08:00 (a fixed zone, with no daylight savings rules)
-      returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime);
-    } else if (col.hasFraction() && !col.hasTimezone()) {
-      // we use local date time explicitly to not include the timezone
-      returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime);
-    } else if (col.hasTimezone()) {
-      returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime);
-    } else {
-      // we use local date time explicitly to not include the timezone
-      returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime);
-    }
-    return returnValue;
-  }
-
-  private Object[] parseListElementFromJSON(String fieldString) {
-
-    JSONArray array = null;
-    try {
-      array = (JSONArray) new JSONParser().parse(removeQuotes(fieldString));
-    } catch (org.json.simple.parser.ParseException e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
-    }
-    if (array != null) {
-      return array.toArray();
-    }
-    return null;
-  }
-
-  private Map<Object, Object> parseMapElementFromJSON(String fieldString) {
-
-    JSONObject object = null;
-    try {
-      object = (JSONObject) new JSONParser().parse(removeQuotes(fieldString));
-    } catch (org.json.simple.parser.ParseException e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
-    }
-    if (object != null) {
-      return toMap(object);
-    }
-    return null;
-  }
-
-  private List<Object> toList(JSONArray array) {
-    List<Object> list = new ArrayList<Object>();
-    for (int i = 0; i < array.size(); i++) {
-      Object value = array.get(i);
-      if (value instanceof JSONArray) {
-        value = toList((JSONArray) value);
-      }
-
-      else if (value instanceof JSONObject) {
-        value = toMap((JSONObject) value);
-      }
-      list.add(value);
-    }
-    return list;
-  }
-
-  @SuppressWarnings("unchecked")
-  private Map<Object, Object> toMap(JSONObject object) {
-    Map<Object, Object> elementMap = new HashMap<Object, Object>();
-    Set<Map.Entry<Object, Object>> entries = object.entrySet();
-    for (Map.Entry<Object, Object> entry : entries) {
-      Object value = entry.getValue();
-
-      if (value instanceof JSONArray) {
-        value = toList((JSONArray) value);
-      }
-
-      else if (value instanceof JSONObject) {
-        value = toMap((JSONObject) value);
-      }
-      elementMap.put(entry.getKey(), value);
-    }
-    return elementMap;
-  }
+ 
 
   /**
    * Appends the actual java objects into CSV string {@inheritDoc}
@@ -420,8 +271,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       }
     }
     // ignore the null values while encoding the object array into csv string
-    encodeCSVStringElements(data, columnArray, nullValueIndices);
-    this.data = StringUtils.join(data, SEPARATOR_CHARACTER);
+    encodeToCSVText(data, columnArray, nullValueIndices);
+    this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER);
   }
 
   /**
@@ -468,27 +319,22 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     return data.compareTo(o.getCSVTextData());
   }
 
-  /**
-   * Sanitize every element of the CSV string based on the column type
-   *
-   * @param objectArray
-   */
+ /**
+  * Encode to the Sqoop prescribed CSV String for every element in the object array
+  * @param objectArray
+  * @param columnArray
+  * @param nullValueIndices
+  */
   @SuppressWarnings("unchecked")
-  private void encodeCSVStringElements(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
+  private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
     for (int i : bitTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        String bitStringValue = objectArray[i].toString();
-        if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
-          objectArray[i] = bitStringValue;
-        } else {
-          throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
-              + objectArray[i]);
-        }
+        encodeToCSVBit(objectArray, i);
       }
     }
     for (int i : stringTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        objectArray[i] = escapeString((String) objectArray[i]);
+        objectArray[i] = encodeToCSVString((String) objectArray[i]);
       }
     }
     for (int i : dateTimeTypeColumnIndices) {
@@ -497,170 +343,38 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
         if (objectArray[i] instanceof org.joda.time.DateTime) {
           org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
           // check for fraction and time zone and then use the right formatter
-          formatDateTime(objectArray, i, col, dateTime);
+          encodeToCSVDateTime(objectArray, i, col, dateTime);
         } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
           org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
-          formatLocalDateTime(objectArray, i, col, localDateTime);
+          encodeToCSVLocalDateTime(objectArray, i, col, localDateTime);
         }
       }
     }
     for (int i : dateTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
-        objectArray[i] = encloseWithQuote(df.print(date));
+        encodeToCSVDate(objectArray, i);
       }
     }
     for (int i : timeColumnIndices) {
       Column col = columnArray[i];
       if (!nullValueIndices.contains(i)) {
-        if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
-          objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
-        } else {
-          objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
-        }
+        encodeToCSVTime(objectArray, i, col);
       }
     }
     for (int i : byteTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        objectArray[i] = escapeByteArrays((byte[]) objectArray[i]);
+        objectArray[i] = encodeToCSVByteArray((byte[]) objectArray[i]);
       }
     }
     for (int i : listTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeList((Object[]) objectArray[i], columnArray[i]);
+        objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType)columnArray[i]);
       }
     }
     for (int i : mapTypeColumnIndices) {
       if (!nullValueIndices.contains(i)) {
-        objectArray[i] = encodeMap((Map<Object, Object>) objectArray[i], columnArray[i]);
-      }
-    }
-  }
-
-  private void formatLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
-    org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
-    if (column.hasFraction()) {
-      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
-    } else {
-      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
-    }
-  }
-
-  private void formatDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
-    org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
-    if (column.hasFraction() && column.hasTimezone()) {
-      objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
-    } else if (column.hasFraction() && !column.hasTimezone()) {
-      objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
-    } else if (column.hasTimezone()) {
-      objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
-    } else {
-      objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private String encodeMap(Map<Object, Object> map, Column column) {
-    JSONObject object = new JSONObject();
-    object.putAll(map);
-    return encloseWithQuote(object.toJSONString());
-  }
-
-  @SuppressWarnings("unchecked")
-  private String encodeList(Object[] list, Column column) {
-    List<Object> elementList = new ArrayList<Object>();
-    for (int n = 0; n < list.length; n++) {
-      Column listType = ((AbstractComplexListType) column).getListType();
-      if (isColumnListType(listType)) {
-        Object[] listElements = (Object[]) list[n];
-        elementList.add((Arrays.deepToString(listElements)));
-      } else {
-        elementList.add(list[n]);
-      }
-    }
-    JSONArray array = new JSONArray();
-    array.addAll(elementList);
-    return encloseWithQuote(array.toJSONString());
-  }
-
-  private boolean isColumnListType(Column listType) {
-    return listType.getType().equals(ColumnType.ARRAY) || listType.getType().equals(ColumnType.SET);
-  }
-
-  private boolean isColumnStringType(Column stringType) {
-    return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM);
-  }
-
-  private String escapeByteArrays(byte[] bytes) {
-    try {
-      return escapeString(new String(bytes, BYTE_FIELD_CHARSET));
-    } catch (UnsupportedEncodingException e) {
-      // We should never hit this case.
-      // This character set should be distributed with Java.
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001,
-          "The character set " + BYTE_FIELD_CHARSET + " is not available.");
-    }
-  }
-
-  private String getRegExp(char orig) {
-    return getRegExp(String.valueOf(orig));
-  }
-
-  private String getRegExp(String orig) {
-    return orig.replaceAll("\\\\", Matcher.quoteReplacement("\\\\"));
-  }
-
-  private String escapeString(String orig) {
-    int j = 0;
-    String replacement = orig;
-    try {
-      for (j = 0; j < replacements.length; j++) {
-        replacement = replacement.replaceAll(getRegExp(originals[j]),
-            Matcher.quoteReplacement(replacements[j]));
-      }
-    } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, orig
-          + "  " + replacement + "  " + String.valueOf(j) + "  " + e.getMessage());
-    }
-    return encloseWithQuote(replacement);
-  }
-
-  private String encloseWithQuote(String string) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(QUOTE_CHARACTER).append(string).append(QUOTE_CHARACTER);
-    return builder.toString();
-  }
-
-  private String unescapeString(String orig) {
-    // Remove the trailing and starting quotes.
-    orig = removeQuotes(orig);
-    int j = 0;
-    try {
-      for (j = 0; j < replacements.length; j++) {
-        orig = orig.replaceAll(getRegExp(replacements[j]),
-            Matcher.quoteReplacement(String.valueOf(originals[j])));
+        objectArray[i] = encodeToCSVMap((Map<Object, Object>) objectArray[i], columnArray[i]);
       }
-    } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, orig
-          + "  " + String.valueOf(j) + e.getMessage());
-    }
-
-    return orig;
-  }
-
-  private String removeQuotes(String string) {
-    return string.substring(1, string.length() - 1);
-  }
-
-  private byte[] unescapeByteArray(String orig) {
-    // Always encoded in BYTE_FIELD_CHARSET.
-    try {
-      return unescapeString(orig).getBytes(BYTE_FIELD_CHARSET);
-    } catch (UnsupportedEncodingException e) {
-      // Should never hit this case.
-      // This character set should be distributed with Java.
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0001,
-          "The character set " + BYTE_FIELD_CHARSET + " is not available.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/293e9ef6/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index e219948..eca7c58 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -67,7 +67,7 @@ public abstract class IntermediateDataFormat<T> {
     this.data = data;
   }
   /**
-   * Get one row of data as CSV text. Use SqoopDataUtils for reading and writing
+   * Get one row of data as CSV text. Use {@link SqoopIDFUtils} for reading and writing
    * into the sqoop specified CSV text format for each {@link #ColumnType} field in the row
    * Why a "native" internal format and then return CSV text too?
    * Imagine a connector that moves data from a system that stores data as a
@@ -93,7 +93,7 @@ public abstract class IntermediateDataFormat<T> {
 
   /**
    * Get one row of data as an Object array. Sqoop uses defined object representation
-   * for each column type. For instance org.joda.time to represent date.Use SqoopDataUtils
+   * for each column type. For instance org.joda.time to represent date. Use {@link SqoopIDFUtils}
    * for reading and writing into the sqoop specified object format
    * for each {@link #ColumnType} field in the row
    * </p>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/293e9ef6/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
new file mode 100644
index 0000000..0dde2e7
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/common/TestSqoopIDFUtils.java
@@ -0,0 +1,162 @@
+package org.apache.sqoop.connector.common;
+
+import static org.junit.Assert.*;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Array;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestSqoopIDFUtils {
+
+  public static String getByteFieldString(byte[] byteFieldData) {
+    try {
+      return new StringBuilder("'").append(new String(byteFieldData, BYTE_FIELD_CHARSET)).append("'").toString();
+    } catch (UnsupportedEncodingException e) {
+      // Should never get to this point because ISO-8859-1 is a standard codec.
+      return null;
+    }
+  }
+
+  @Test
+  public void testEncloseStringWithQuotes() {
+    String test = "test";
+    String quotedText = encloseWithQuote(test);
+    assertEquals(quotedText, "'test'");
+
+  }
+
+  @Test
+  public void testStringWithQuotesToEncloseStringWithQuotes() {
+    String test = "'test'";
+    String quotedText = encloseWithQuote(test);
+    assertEquals(quotedText, "''test''");
+
+  }
+
+  @Test
+  public void testRemoveQuotes() {
+    String test = "'test'";
+    String quotedText = removeQuotes(test);
+    assertEquals(quotedText, "test");
+  }
+
+  @Test
+  public void testStringWithNoQuotesRemoveQuotes() {
+    String test = "test";
+    String quotedText = removeQuotes(test);
+    assertEquals(quotedText, "test");
+  }
+
+  @Test
+  public void testStingWithNoQuotesRemoveQuotes() {
+    String test = "test";
+    String quotedText = removeQuotes(test);
+    assertEquals(quotedText, "test");
+  }
+
+  @Test
+  public void testExample1EncodeToCSVString() {
+    String test = "test";
+    String encodedText = encodeToCSVString(test);
+    assertEquals(encodedText, "'test'");
+  }
+
+  @Test
+  public void testExample2EncodeToCSVString() {
+    String test = "test,test1";
+    String encodedText = encodeToCSVString(test);
+    assertEquals(encodedText, "'test,test1'");
+  }
+
+  @Test
+  public void testExample3EncodeToCSVString() {
+    String test = "test,'test1";
+    String encodedText = encodeToCSVString(test);
+    assertEquals(encodedText, "'test,\\'test1'");
+  }
+
+  @Test
+  public void testExample4EncodeToCSVString() {
+    String test = "test,\"test1";
+    String encodedText = encodeToCSVString(test);
+    assertEquals(encodedText, "'test,\\\"test1'");
+  }
+
+  @Test
+  public void testExample4ToString() {
+    String test = "'test,\\\"test1'";
+    String expectedString = "test,\"test1";
+    String toString = toText(test);
+    assertEquals(toString, expectedString);
+  }
+
+  public void testExample5EncodeToCSVString() {
+    String test = new String(new char[] { 0x0A });
+    String encodedText = encodeToCSVString(test);
+    assertEquals(encodedText, "'\\n'");
+  }
+
+  public void testExample5ToString() {
+    String test = "'\\n'";
+    String expectedString = new String(new char[] { 0x0A });
+    String toString = toText(test);
+    assertEquals(toString, expectedString);
+  }
+
+  public void testExample6EncodeToCSVString() {
+    String test = new String(new char[] { 0x0D });
+    String encodedText = encodeToCSVString(test);
+    assertEquals(encodedText, "'\\r'");
+  }
+
+  @Test
+  public void testEncodeByteToCSVString() {
+  // byte[0] = \r, byte[1] = -112, byte[2] = 54 - 2's complements
+    byte[] bytes = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54 };
+    String encodedText = encodeToCSVByteArray(bytes);
+    String expectedText = getByteFieldString(bytes).replaceAll("\r", "\\\\r");
+    assertEquals(encodedText, expectedText);
+  }
+
+  @Test
+  public void testEncodeArrayIntegersToCSVString() {
+    List<Integer> list = new ArrayList<Integer>();
+    list.add(1);
+    list.add(2);
+    AbstractComplexListType array = new Array("a", new Text("t"));
+    String encodedText = encodeToCSVList(list.toArray(), array);
+    assertEquals(encodedText, "'[1,2]'");
+  }
+
+  @Test
+  public void testEncodeArrayStringsToCSVString() {
+    List<String> list = new ArrayList<String>();
+    list.add("A");
+    list.add("B");
+    AbstractComplexListType array = new Array("a", new Text("t"));
+    String encodedText = encodeToCSVList(list.toArray(), array);
+    assertEquals(encodedText, "'[\"A\",\"B\"]'");
+  }
+
+  @Test
+  public void testEncodeMapToCSVString() {
+    List<String> list = new ArrayList<String>();
+    list.add("A");
+    list.add("B");
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("A", list);
+    org.apache.sqoop.schema.type.Map mapCol = new org.apache.sqoop.schema.type.Map("a", new Text("t"), new Array("r", new Text(
+        "tr")));
+    String encodedText = encodeToCSVMap(map, mapCol);
+    assertEquals(encodedText, "'{\"A\":[\"A\",\"B\"]}'");
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/293e9ef6/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 1a2a96f..83a95ec 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -22,8 +22,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
 
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -57,16 +58,6 @@ public class TestCSVIntermediateDataFormat {
     dataFormat = new CSVIntermediateDataFormat();
   }
 
-  private String getByteFieldString(byte[] byteFieldData) {
-    try {
-      return new StringBuilder("'")
-          .append(new String(byteFieldData, CSVIntermediateDataFormat.BYTE_FIELD_CHARSET))
-          .append("'").toString();
-    } catch (UnsupportedEncodingException e) {
-      // Should never get to this point because ISO-8859-1 is a standard codec.
-      return null;
-    }
-  }
 
   //**************test cases for null and empty input*******************
 
@@ -114,7 +105,7 @@ public class TestCSVIntermediateDataFormat {
     String csvText = dataFormat.getCSVTextData();
     String[] textValues = csvText.split(",");
     for (String text : textValues) {
-      assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE);
+      assertEquals(text, NULL_VALUE);
     }
   }
 
@@ -183,7 +174,7 @@ public class TestCSVIntermediateDataFormat {
     String csvText = dataFormat.getCSVTextData();
     String[] textValues = csvText.split(",");
     for (String text : textValues) {
-      assertEquals(text, CSVIntermediateDataFormat.NULL_VALUE);
+      assertEquals(text, NULL_VALUE);
     }
   }
 
@@ -252,8 +243,8 @@ public class TestCSVIntermediateDataFormat {
     dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
-    String testData = "10,34,'54','random data'," +
-        getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'";
+    String testData = "10,34,'54','random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r")
+        + ",'\\n','TEST_ENUM'";
     assertEquals(testData, dataFormat.getCSVTextData());
   }
 
@@ -315,8 +306,8 @@ public class TestCSVIntermediateDataFormat {
     dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
-    String testData = "10,34,NULL,'random data'," +
-        getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n','TEST_ENUM'";
+    String testData = "10,34,NULL,'random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r")
+        + ",'\\n','TEST_ENUM'";
     assertEquals(testData, dataFormat.getCSVTextData());
   }
 
@@ -521,7 +512,7 @@ public class TestCSVIntermediateDataFormat {
     dataFormat.setSchema(schema);
     // current date time
     org.joda.time.DateTime dateTime = new org.joda.time.DateTime();
-    String dateTimeString = CSVIntermediateDataFormat.dtfWithFractionNoTimeZone.print(dateTime);
+    String dateTimeString = dtfWithFractionNoTimeZone.print(dateTime);
     dataFormat.setCSVTextData("'" + dateTimeString + "'");
     assertEquals(dateTimeString.replace(" ", "T"), dataFormat.getObjectData()[0].toString());
   }