Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/13 04:05:09 UTC

sqoop git commit: SQOOP-1995: Sqoop2: Allow nulls only if the column for that field has IsNullable to be true

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 ec30437f5 -> 2d54e26a0


SQOOP-1995: Sqoop2: Allow nulls only if the column for that field has IsNullable to be true

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2d54e26a
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2d54e26a
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2d54e26a

Branch: refs/heads/sqoop2
Commit: 2d54e26a086b0407afd9f22ae26e1af32b631bfe
Parents: ec30437
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Jan 12 19:03:28 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Jan 12 19:03:28 2015 -0800

----------------------------------------------------------------------
 .../idf/AVROIntermediateDataFormat.java         | 231 +++++++++-------
 .../idf/AVROIntermediateDataFormatError.java    |   2 -
 .../idf/CSVIntermediateDataFormat.java          |  58 ++--
 .../connector/idf/IntermediateDataFormat.java   |   2 +
 .../idf/IntermediateDataFormatError.java        |   5 +-
 .../idf/JSONIntermediateDataFormat.java         | 209 +++++++-------
 .../idf/JSONIntermediateDataFormatError.java    |   2 -
 .../idf/TestAVROIntermediateDataFormat.java     | 269 +++++++++++++++++--
 .../idf/TestCSVIntermediateDataFormat.java      |  52 +++-
 .../idf/TestJSONIntermediateDataFormat.java     | 254 +++++++++++++++--
 10 files changed, 812 insertions(+), 272 deletions(-)
----------------------------------------------------------------------
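
As context for the hunks that follow: every intermediate data format touched by this commit applies the same per-column rule. A null field value is accepted only when the corresponding Sqoop schema column is nullable; otherwise the new INTERMEDIATE_DATA_FORMAT_0005 error is raised. A minimal sketch of that check, built only from types and constants that appear in the diff (the helper name assertNullAllowed is hypothetical and not part of the commit):

    // Sketch only: mirrors the per-column check added to toAVRO/toJSON/toCSV/toObject below.
    private static void assertNullAllowed(Column column, Object value) {
      if (value == null && !column.isNullable()) {
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
            column.getName() + " does not support null values");
      }
      // A null in a nullable column is passed through; the CSV form renders it as NULL_VALUE.
    }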


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
index 67b47e7..4d68ea0 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -73,6 +73,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
    */
   @Override
   public void setCSVTextData(String text) {
+    super.validateSchema(schema);
     // convert the CSV text to avro
     this.data = toAVRO(text);
   }
@@ -82,6 +83,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
    */
   @Override
   public String getCSVTextData() {
+    super.validateSchema(schema);
     // convert avro to sqoop CSV
     return toCSV(data);
   }
@@ -91,6 +93,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
    */
   @Override
   public void setObjectData(Object[] data) {
+    super.validateSchema(schema);
     // convert the object array to avro
     this.data = toAVRO(data);
   }
@@ -100,6 +103,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
    */
   @Override
   public Object[] getObjectData() {
+    super.validateSchema(schema);
     // convert avro to object array
     return toObject(data);
   }
@@ -143,21 +147,22 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     if (csvStringArray == null) {
       return null;
     }
-
-    if (csvStringArray.length != schema.getColumnsArray().length) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
-          + " has the wrong number of fields.");
+    Column[] columns = schema.getColumnsArray();
+    if (csvStringArray.length != columns.length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+          "The data " + csv + " has the wrong number of fields.");
     }
     GenericRecord avroObject = new GenericData.Record(avroSchema);
-    Column[] columnArray = schema.getColumnsArray();
     for (int i = 0; i < csvStringArray.length; i++) {
-      // check for NULL field and assume this field is nullable as per the sqoop
-      // schema
-      if (csvStringArray[i].equals(NULL_VALUE) && columnArray[i].isNullable()) {
-        avroObject.put(columnArray[i].getName(), null);
+      if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
+      if (csvStringArray[i].equals(NULL_VALUE)) {
+        avroObject.put(columns[i].getName(), null);
         continue;
       }
-      avroObject.put(columnArray[i].getName(), toAVRO(csvStringArray[i], columnArray[i]));
+      avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i]));
     }
     return avroObject;
   }
@@ -219,66 +224,80 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     return returnValue;
   }
 
-  private GenericRecord toAVRO(Object[] data) {
+  private GenericRecord toAVRO(Object[] objectArray) {
 
-    if (data == null) {
+    if (objectArray == null) {
       return null;
     }
+    Column[] columns = schema.getColumnsArray();
 
-    if (data.length != schema.getColumnsArray().length) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString()
-          + " has the wrong number of fields.");
+    if (objectArray.length != columns.length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+          "The data " + objectArray.toString() + " has the wrong number of fields.");
     }
     // get avro schema from sqoop schema
     GenericRecord avroObject = new GenericData.Record(avroSchema);
-    Column[] cols = schema.getColumnsArray();
-    for (int i = 0; i < data.length; i++) {
-      switch (cols[i].getType()) {
+    for (int i = 0; i < objectArray.length; i++) {
+      if (objectArray[i] == null && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
+      if (objectArray[i] == null) {
+        avroObject.put(columns[i].getName(), null);
+        continue;
+      }
+
+      switch (columns[i].getType()) {
       case ARRAY:
       case SET:
-        avroObject.put(cols[i].getName(), toList((Object[]) data[i]));
+        avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i]));
         break;
       case MAP:
-        avroObject.put(cols[i].getName(), data[i]);
+        avroObject.put(columns[i].getName(), objectArray[i]);
         break;
       case ENUM:
-        GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(cols[i]), (String) data[i]);
-        avroObject.put(cols[i].getName(), enumValue);
+        GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]),
+            (String) objectArray[i]);
+        avroObject.put(columns[i].getName(), enumValue);
         break;
       case TEXT:
-        avroObject.put(cols[i].getName(), new Utf8((String) data[i]));
+        avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i]));
         break;
       case BINARY:
       case UNKNOWN:
-        avroObject.put(cols[i].getName(), ByteBuffer.wrap((byte[]) data[i]));
+        avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i]));
         break;
       case FIXED_POINT:
       case FLOATING_POINT:
-        avroObject.put(cols[i].getName(), data[i]);
+        avroObject.put(columns[i].getName(), objectArray[i]);
         break;
       case DECIMAL:
         // TODO: store as FIXED in SQOOP-16161
-        avroObject.put(cols[i].getName(), ((BigDecimal) data[i]).toPlainString());
+        avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString());
         break;
       case DATE_TIME:
-        if (data[i] instanceof org.joda.time.DateTime) {
-          avroObject.put(cols[i].getName(), ((org.joda.time.DateTime) data[i]).toDate().getTime());
-        } else if (data[i] instanceof org.joda.time.LocalDateTime) {
-          avroObject.put(cols[i].getName(), ((org.joda.time.LocalDateTime) data[i]).toDate().getTime());
+        if (objectArray[i] instanceof org.joda.time.DateTime) {
+          avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate()
+              .getTime());
+        } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
+          avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i])
+              .toDate().getTime());
         }
         break;
       case TIME:
-        avroObject.put(cols[i].getName(), ((org.joda.time.LocalTime) data[i]).toDateTimeToday().getMillis());
+        avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i])
+            .toDateTimeToday().getMillis());
         break;
       case DATE:
-        avroObject.put(cols[i].getName(), ((org.joda.time.LocalDate) data[i]).toDate().getTime());
+        avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate()
+            .getTime());
         break;
       case BIT:
-        avroObject.put(cols[i].getName(), Boolean.valueOf((Boolean) data[i]));
+        avroObject.put(columns[i].getName(), Boolean.valueOf((Boolean) objectArray[i]));
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-            "Column type from schema was not recognized for " + cols[i].getType());
+            "Column type from schema was not recognized for " + columns[i].getType());
       }
     }
 
@@ -287,68 +306,72 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
 
   @SuppressWarnings("unchecked")
   private String toCSV(GenericRecord record) {
-    Column[] cols = this.schema.getColumnsArray();
+    Column[] columns = this.schema.getColumnsArray();
 
     StringBuilder csvString = new StringBuilder();
-    for (int i = 0; i < cols.length; i++) {
+    for (int i = 0; i < columns.length; i++) {
 
-      Object obj = record.get(cols[i].getName());
-
-      if (obj == null) {
-        throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
+      Object obj = record.get(columns[i].getName());
+      if (obj == null && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
       }
-
-      switch (cols[i].getType()) {
-      case ARRAY:
-      case SET:
-        List<Object> objList = (List<Object>) obj;
-        csvString.append(toCSVList(toObjectArray(objList), cols[i]));
-        break;
-      case MAP:
-        Map<Object, Object> objMap = (Map<Object, Object>) obj;
-        csvString.append(toCSVMap(objMap, cols[i]));
-        break;
-      case ENUM:
-      case TEXT:
-        csvString.append(toCSVString(obj.toString()));
-        break;
-      case BINARY:
-      case UNKNOWN:
-        csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj)));
-        break;
-      case FIXED_POINT:
-        csvString.append(toCSVFixedPoint(obj, cols[i]));
-        break;
-      case FLOATING_POINT:
-        csvString.append(toCSVFloatingPoint(obj, cols[i]));
-        break;
-      case DECIMAL:
-        // stored as string
-        csvString.append(toCSVDecimal(obj));
-        break;
-      case DATE:
-        // stored as long
-        Long dateInMillis = (Long) obj;
-        csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis)));
-        break;
-      case TIME:
-        // stored as long
-        Long timeInMillis = (Long) obj;
-        csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
-        break;
-      case DATE_TIME:
-        // stored as long
-        Long dateTimeInMillis = (Long) obj;
-        csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
-        break;
-      case BIT:
-        csvString.append(toCSVBit(obj));
-        break;
-      default:
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-            "Column type from schema was not recognized for " + cols[i].getType());
+      if (obj == null) {
+        csvString.append(NULL_VALUE);
+      } else {
+
+        switch (columns[i].getType()) {
+        case ARRAY:
+        case SET:
+          List<Object> objList = (List<Object>) obj;
+          csvString.append(toCSVList(toObjectArray(objList), columns[i]));
+          break;
+        case MAP:
+          Map<Object, Object> objMap = (Map<Object, Object>) obj;
+          csvString.append(toCSVMap(objMap, columns[i]));
+          break;
+        case ENUM:
+        case TEXT:
+          csvString.append(toCSVString(obj.toString()));
+          break;
+        case BINARY:
+        case UNKNOWN:
+          csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj)));
+          break;
+        case FIXED_POINT:
+          csvString.append(toCSVFixedPoint(obj, columns[i]));
+          break;
+        case FLOATING_POINT:
+          csvString.append(toCSVFloatingPoint(obj, columns[i]));
+          break;
+        case DECIMAL:
+          // stored as string
+          csvString.append(toCSVDecimal(obj));
+          break;
+        case DATE:
+          // stored as long
+          Long dateInMillis = (Long) obj;
+          csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis)));
+          break;
+        case TIME:
+          // stored as long
+          Long timeInMillis = (Long) obj;
+          csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), columns[i]));
+          break;
+        case DATE_TIME:
+          // stored as long
+          Long dateTimeInMillis = (Long) obj;
+          csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), columns[i]));
+          break;
+        case BIT:
+          csvString.append(toCSVBit(obj));
+          break;
+        default:
+          throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+              "Column type from schema was not recognized for " + columns[i].getType());
+        }
       }
-      if (i < cols.length - 1) {
+      if (i < columns.length - 1) {
         csvString.append(CSV_SEPARATOR_CHARACTER);
       }
 
@@ -360,19 +383,25 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
   @SuppressWarnings("unchecked")
   private Object[] toObject(GenericRecord record) {
 
-    if (data == null) {
+    if (record == null) {
       return null;
     }
-    Column[] cols = schema.getColumnsArray();
-    Object[] object = new Object[cols.length];
-
-    for (int i = 0; i < cols.length; i++) {
-      Object obj = record.get(cols[i].getName());
+    Column[] columns = schema.getColumnsArray();
+    Object[] object = new Object[columns.length];
+
+    for (int i = 0; i < columns.length; i++) {
+      Object obj = record.get(columns[i].getName());
+      Integer nameIndex = schema.getColumnNameIndex(columns[i].getName());
+      Column column = columns[nameIndex];
+      // null is a possible value
+      if (obj == null && !column.isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            column.getName() + " does not support null values");
+      }
       if (obj == null) {
-        throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
+        object[nameIndex] = null;
+        continue;
       }
-      Integer nameIndex = schema.getColumnNameIndex(cols[i].getName());
-      Column column = cols[nameIndex];
       switch (column.getType()) {
       case ARRAY:
       case SET:
@@ -422,7 +451,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-            "Column type from schema was not recognized for " + cols[i].getType());
+            "Column type from schema was not recognized for " + column.getType());
       }
 
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
index 6af21a3..3dcbf4a 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
@@ -25,8 +25,6 @@ public enum AVROIntermediateDataFormatError implements ErrorCode {
   /** An unknown error has occurred. */
   AVRO_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
 
-  AVRO_INTERMEDIATE_DATA_FORMAT_0001("Missing key in the AVRO object.")
-
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index 856a4bb..2af6acd 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -58,7 +58,8 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @Override
   public String getCSVTextData() {
-    return data;
+    // TODO:SQOOP-1936 to enable schema validation after we use compareTo
+    return this.data;
   }
 
   /**
@@ -66,7 +67,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @Override
   public void setCSVTextData(String csvText) {
-    this.data = csvText;
+    super.setData(csvText);
   }
 
   /**
@@ -87,13 +88,17 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
     }
 
     Object[] objectArray = new Object[csvStringArray.length];
-    Column[] columnArray = schema.getColumnsArray();
+    Column[] columns = schema.getColumnsArray();
     for (int i = 0; i < csvStringArray.length; i++) {
+      if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
       if (csvStringArray[i].equals(NULL_VALUE)) {
         objectArray[i] = null;
         continue;
       }
-      objectArray[i] = toObject(csvStringArray[i], columnArray[i]);
+      objectArray[i] = toObject(csvStringArray[i], columns[i]);
     }
     return objectArray;
   }
@@ -183,65 +188,68 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
   @SuppressWarnings("unchecked")
   private String toCSV(Object[] objectArray) {
 
-    Column[] columnArray = schema.getColumnsArray();
+    Column[] columns = schema.getColumnsArray();
 
     StringBuilder csvString = new StringBuilder();
-    for (int i = 0; i < columnArray.length; i++) {
-      Object obj = objectArray[i];
-      if (obj == null) {
+    for (int i = 0; i < columns.length; i++) {
+      if (objectArray[i] == null && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
+      if (objectArray[i] == null) {
         csvString.append(NULL_VALUE);
       } else {
-        switch (columnArray[i].getType()) {
+        switch (columns[i].getType()) {
         case ARRAY:
         case SET:
-          csvString.append(toCSVList((Object[]) obj, (AbstractComplexListType) columnArray[i]));
+          csvString.append(toCSVList((Object[]) objectArray[i], (AbstractComplexListType) columns[i]));
           break;
         case MAP:
-          csvString.append(toCSVMap((Map<Object, Object>) obj, columnArray[i]));
+          csvString.append(toCSVMap((Map<Object, Object>) objectArray[i], columns[i]));
           break;
         case ENUM:
         case TEXT:
-          csvString.append(toCSVString(obj.toString()));
+          csvString.append(toCSVString(objectArray[i].toString()));
           break;
         case BINARY:
         case UNKNOWN:
-          csvString.append(toCSVByteArray((byte[]) obj));
+          csvString.append(toCSVByteArray((byte[]) objectArray[i]));
           break;
         case FIXED_POINT:
-          csvString.append(toCSVFixedPoint(obj, columnArray[i]));
+          csvString.append(toCSVFixedPoint(objectArray[i], columns[i]));
           break;
         case FLOATING_POINT:
-          csvString.append(toCSVFloatingPoint(obj, columnArray[i]));
+          csvString.append(toCSVFloatingPoint(objectArray[i], columns[i]));
           break;
         case DECIMAL:
-          csvString.append(toCSVDecimal(obj));
+          csvString.append(toCSVDecimal(objectArray[i]));
           break;
         // stored in JSON as strings in the joda time format
         case DATE:
-          csvString.append(toCSVDate(obj));
+          csvString.append(toCSVDate(objectArray[i]));
           break;
         case TIME:
-          csvString.append(toCSVTime(obj, columnArray[i]));
+          csvString.append(toCSVTime(objectArray[i], columns[i]));
           break;
         case DATE_TIME:
           if (objectArray[i] instanceof org.joda.time.DateTime) {
-            org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
+            org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
             // check for fraction and time zone and then use the right formatter
-            csvString.append(toCSVDateTime(dateTime, columnArray[i]));
+            csvString.append(toCSVDateTime(dateTime, columns[i]));
           } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
-            org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
-            csvString.append(toCSVLocalDateTime(localDateTime, columnArray[i]));
+            org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
+            csvString.append(toCSVLocalDateTime(localDateTime, columns[i]));
           }
           break;
         case BIT:
-          csvString.append(toCSVBit(obj));
+          csvString.append(toCSVBit(objectArray[i]));
           break;
         default:
           throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-              "Column type from schema was not recognized for " + columnArray[i].getType());
+              "Column type from schema was not recognized for " + columns[i].getType());
         }
       }
-      if (i < columnArray.length - 1) {
+      if (i < columns.length - 1) {
         csvString.append(CSV_SEPARATOR_CHARACTER);
       }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 261a462..6063320 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -60,6 +60,7 @@ public abstract class IntermediateDataFormat<T> {
    *         the intermediate data format implementation.
    */
   public T getData() {
+    validateSchema(schema);
     return data;
   }
 
@@ -70,6 +71,7 @@ public abstract class IntermediateDataFormat<T> {
    * @param obj - A single row of data to be moved.
    */
   public void setData(T obj) {
+    validateSchema(schema);
     this.data = obj;
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index 1f583b2..f4e1fb7 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -28,13 +28,16 @@ public enum IntermediateDataFormatError implements ErrorCode {
   INTERMEDIATE_DATA_FORMAT_0001("Wrong number of columns."),
 
   /** Schema is missing in the IDF. */
-  INTERMEDIATE_DATA_FORMAT_0002("Schema missing."),
+  INTERMEDIATE_DATA_FORMAT_0002("Schema is null."),
 
   INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"),
 
   /** Column type isn't known by Intermediate Data Format. */
   INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
 
+  /** Column value cannot be null. */
+  INTERMEDIATE_DATA_FORMAT_0005("Column value cannot be null"),
+
   ;
 
   private final String message;
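
The base class getData/setData, as well as the CSV and object accessors of the AVRO and JSON formats, now call validateSchema(schema) before converting anything. The body of validateSchema is not part of this diff; judging from the INTERMEDIATE_DATA_FORMAT_0002 ("Schema is null.") message above and the testNullSchema/testNotSettingSchema* tests below, it presumably rejects at least a missing schema, along these lines:

    // Sketch only: the actual validateSchema implementation is outside this diff.
    protected void validateSchema(Schema schema) {
      if (schema == null) {
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
      }
    }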

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
index b937d87..3cfd356 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -55,6 +55,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
    */
   @Override
   public void setCSVTextData(String text) {
+    super.validateSchema(schema);
     // convert the CSV text to JSON
     this.data = toJSON(text);
   }
@@ -64,6 +65,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
    */
   @Override
   public String getCSVTextData() {
+    super.validateSchema(schema);
     // convert JSON to sqoop CSV
     return toCSV(data);
   }
@@ -73,6 +75,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
    */
   @Override
   public void setObjectData(Object[] data) {
+    super.validateSchema(schema);
     // convert the object Array to JSON
     this.data = toJSON(data);
   }
@@ -82,6 +85,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
    */
   @Override
   public Object[] getObjectData() {
+    super.validateSchema(schema);
     // convert JSON to object array
     return toObject(data);
   }
@@ -126,20 +130,24 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
     if (csvStringArray == null) {
       return null;
     }
+    Column[] columns = schema.getColumnsArray();
 
-    if (csvStringArray.length != schema.getColumnsArray().length) {
+    if (csvStringArray.length != columns.length) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
           + " has the wrong number of fields.");
     }
     JSONObject object = new JSONObject();
-    Column[] columnArray = schema.getColumnsArray();
     for (int i = 0; i < csvStringArray.length; i++) {
+      if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
       // check for NULL field and bail out immediately
       if (csvStringArray[i].equals(NULL_VALUE)) {
-        object.put(columnArray[i].getName(), null);
+        object.put(columns[i].getName(), null);
         continue;
       }
-      object.put(columnArray[i].getName(), toJSON(csvStringArray[i], columnArray[i]));
+      object.put(columns[i].getName(), toJSON(csvStringArray[i], columns[i]));
 
     }
     return object;
@@ -161,7 +169,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
       try {
         returnValue = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
       } catch (ParseException e) {
-        throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, e);
+        throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e);
       }
       break;
     case ENUM:
@@ -201,128 +209,138 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
   }
 
   @SuppressWarnings("unchecked")
-  private JSONObject toJSON(Object[] data) {
+  private JSONObject toJSON(Object[] objectArray) {
 
-    if (data == null) {
+    if (objectArray == null) {
       return null;
     }
+    Column[] columns = schema.getColumnsArray();
 
-    if (data.length != schema.getColumnsArray().length) {
-      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString()
+    if (objectArray.length != columns.length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + objectArray.toString()
           + " has the wrong number of fields.");
     }
-    JSONObject object = new JSONObject();
-    Column[] cols = schema.getColumnsArray();
-    for (int i = 0; i < data.length; i++) {
-      switch (cols[i].getType()) {
+    JSONObject json = new JSONObject();
+    for (int i = 0; i < objectArray.length; i++) {
+      if (objectArray[i] == null && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
+      }
+      if (objectArray[i] == null) {
+        json.put(columns[i].getName(), null);
+        continue;
+      }
+      switch (columns[i].getType()) {
       case ARRAY:
       case SET:
         // store as JSON array
-        Object[] objArray = (Object[]) data[i];
+        Object[] objArray = (Object[]) objectArray[i];
         JSONArray jsonArray = toJSONArray(objArray);
-        object.put(cols[i].getName(), jsonArray);
+        json.put(columns[i].getName(), jsonArray);
         break;
       case MAP:
         // store as JSON object
-        Map<Object, Object> map = (Map<Object, Object>) data[i];
+        Map<Object, Object> map = (Map<Object, Object>) objectArray[i];
         JSONObject jsonObject = new JSONObject();
         jsonObject.putAll(map);
-        object.put(cols[i].getName(), jsonObject);
+        json.put(columns[i].getName(), jsonObject);
         break;
       case ENUM:
       case TEXT:
-        object.put(cols[i].getName(), data[i]);
+        json.put(columns[i].getName(), objectArray[i]);
         break;
       case BINARY:
       case UNKNOWN:
-        object.put(cols[i].getName(), Base64.encodeBase64String((byte[]) data[i]));
+        json.put(columns[i].getName(), Base64.encodeBase64String((byte[]) objectArray[i]));
         break;
       case FIXED_POINT:
       case FLOATING_POINT:
       case DECIMAL:
         // store a object
-        object.put(cols[i].getName(), data[i]);
+        json.put(columns[i].getName(), objectArray[i]);
         break;
       // stored in JSON as the same format as csv strings in the joda time
       // format
       case DATE_TIME:
-        object.put(cols[i].getName(), removeQuotes(toCSVDateTime(data[i], cols[i])));
+        json.put(columns[i].getName(), removeQuotes(toCSVDateTime(objectArray[i], columns[i])));
         break;
       case TIME:
-        object.put(cols[i].getName(), removeQuotes(toCSVTime(data[i], cols[i])));
+        json.put(columns[i].getName(), removeQuotes(toCSVTime(objectArray[i], columns[i])));
         break;
       case DATE:
-        object.put(cols[i].getName(), removeQuotes(toCSVDate(data[i])));
+        json.put(columns[i].getName(), removeQuotes(toCSVDate(objectArray[i])));
         break;
       case BIT:
-        object.put(cols[i].getName(), Boolean.valueOf(toCSVBit(data[i])));
+        json.put(columns[i].getName(), Boolean.valueOf(toCSVBit(objectArray[i])));
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-            "Column type from schema was not recognized for " + cols[i].getType());
+            "Column type from schema was not recognized for " + columns[i].getType());
       }
     }
 
-    return object;
+    return json;
   }
 
   private String toCSV(JSONObject json) {
-    Column[] cols = this.schema.getColumnsArray();
+    Column[] columns = this.schema.getColumnsArray();
 
     StringBuilder csvString = new StringBuilder();
-    for (int i = 0; i < cols.length; i++) {
-
-      // or we can to json.entrySet();
-      Object obj = json.get(cols[i].getName());
-      if (obj == null) {
-        throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, " for " + cols[i].getName());
+    for (int i = 0; i < columns.length; i++) {
+      Object obj = json.get(columns[i].getName());
+      if (obj == null && !columns[i].isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            columns[i].getName() + " does not support null values");
       }
-
-      switch (cols[i].getType()) {
-      case ARRAY:
-      case SET:
-        // stored as JSON array
-        JSONArray array = (JSONArray) obj;
-        csvString.append(encloseWithQuotes(array.toJSONString()));
-        break;
-      case MAP:
-        // stored as JSON object
-        csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString())));
-        break;
-      case ENUM:
-      case TEXT:
-        csvString.append(toCSVString(obj.toString()));
-        break;
-      case BINARY:
-      case UNKNOWN:
-        csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString())));
-        break;
-      case FIXED_POINT:
-        csvString.append(toCSVFixedPoint(obj, cols[i]));
-        break;
-      case FLOATING_POINT:
-        csvString.append(toCSVFloatingPoint(obj, cols[i]));
-        break;
-      case DECIMAL:
-        csvString.append(toCSVDecimal(obj));
-        break;
-      // stored in JSON as strings in the joda time format
-      case DATE:
-      case TIME:
-      case DATE_TIME:
-        csvString.append(encloseWithQuotes(obj.toString()));
-        break;
-      // 0/1 will be stored as they are in JSON, even though valid values in
-      // JSON
-      // are true/false
-      case BIT:
-        csvString.append(toCSVBit(obj));
-        break;
-      default:
-        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-            "Column type from schema was not recognized for " + cols[i].getType());
+      if (obj == null) {
+        csvString.append(NULL_VALUE);
+      } else {
+        switch (columns[i].getType()) {
+        case ARRAY:
+        case SET:
+          // stored as JSON array
+          JSONArray array = (JSONArray) obj;
+          csvString.append(encloseWithQuotes(array.toJSONString()));
+          break;
+        case MAP:
+          // stored as JSON object
+          csvString.append(encloseWithQuotes((((JSONObject) obj).toJSONString())));
+          break;
+        case ENUM:
+        case TEXT:
+          csvString.append(toCSVString(obj.toString()));
+          break;
+        case BINARY:
+        case UNKNOWN:
+          csvString.append(toCSVByteArray(Base64.decodeBase64(obj.toString())));
+          break;
+        case FIXED_POINT:
+          csvString.append(toCSVFixedPoint(obj, columns[i]));
+          break;
+        case FLOATING_POINT:
+          csvString.append(toCSVFloatingPoint(obj, columns[i]));
+          break;
+        case DECIMAL:
+          csvString.append(toCSVDecimal(obj));
+          break;
+        // stored in JSON as strings in the joda time format
+        case DATE:
+        case TIME:
+        case DATE_TIME:
+          csvString.append(encloseWithQuotes(obj.toString()));
+          break;
+        // 0/1 will be stored as they are in JSON, even though valid values in
+        // JSON
+        // are true/false
+        case BIT:
+          csvString.append(toCSVBit(obj));
+          break;
+        default:
+          throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+              "Column type from schema was not recognized for " + columns[i].getType());
+        }
       }
-      if (i < cols.length - 1) {
+      if (i < columns.length - 1) {
         csvString.append(CSV_SEPARATOR_CHARACTER);
       }
 
@@ -332,21 +350,29 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
   }
 
   @SuppressWarnings("unchecked")
-  private Object[] toObject(JSONObject data) {
+  private Object[] toObject(JSONObject json) {
 
-    if (data == null) {
+    if (json == null) {
       return null;
     }
-    Column[] cols = schema.getColumnsArray();
-    Object[] object = new Object[cols.length];
-
-    Set<String> keys = data.keySet();
-    int i = 0;
-    for (String key : keys) {
-
-      Integer nameIndex = schema.getColumnNameIndex(key);
-      Object obj = data.get(key);
-      Column column = cols[nameIndex];
+    Column[] columns = schema.getColumnsArray();
+    Object[] object = new Object[columns.length];
+
+    Set<String> jsonKeyNames = json.keySet();
+    for (String name : jsonKeyNames) {
+      Integer nameIndex = schema.getColumnNameIndex(name);
+      Column column = columns[nameIndex];
+
+      Object obj = json.get(name);
+      // null is a possible value
+      if (obj == null && !column.isNullable()) {
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
+            column.getName() + " does not support null values");
+      }
+      if (obj == null) {
+        object[nameIndex] = null;
+        continue;
+      }
       switch (column.getType()) {
       case ARRAY:
       case SET:
@@ -387,9 +413,8 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
-            "Column type from schema was not recognized for " + cols[i].getType());
+            "Column type from schema was not recognized for " + column.getType());
       }
-
     }
     return object;
   }
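
One detail worth noting in the JSON format's toCSV: json.get(columnName) returns null both for an explicit JSON null and for a key that is simply absent (JSONObject is a plain map), so after this change a missing key behaves exactly like a null value. On a nullable column it is emitted as NULL_VALUE; on a non-nullable column it raises INTERMEDIATE_DATA_FORMAT_0005 instead of the JSON_INTERMEDIATE_DATA_FORMAT_0003 ("Missing key in the JSON object.") error that is removed below. A small illustration (sketch only, json-simple API):

    JSONObject json = new JSONObject();
    json.put("one", null);           // explicit null value
    Object a = json.get("one");      // null
    Object b = json.get("missing");  // also null: the key was never set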

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
index 86c71b6..72d9d87 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
@@ -29,8 +29,6 @@ public enum JSONIntermediateDataFormatError implements ErrorCode {
 
   JSON_INTERMEDIATE_DATA_FORMAT_0002("JSON object parse error."),
 
-  JSON_INTERMEDIATE_DATA_FORMAT_0003("Missing key in the JSON object."),
-
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
index 816cc71..5c7b444 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
@@ -19,12 +19,16 @@
 package org.apache.sqoop.connector.idf;
 
 import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
 import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
+import static org.testng.Assert.assertNull;
 import static org.testng.AssertJUnit.assertEquals;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.SqoopAvroUtils;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.Array;
@@ -58,7 +62,8 @@ public class TestAVROIntermediateDataFormat {
   private final static String csvTime = "'12:59:59'";
   private Column enumCol;
   // no time zone
-  private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0);
+  private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12,
+      0, 0);
   private final static org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
   private final static org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
 
@@ -73,15 +78,23 @@ public class TestAVROIntermediateDataFormat {
     options.add("ENUM");
     options.add("NUME");
     enumCol = new org.apache.sqoop.schema.type.Enum("seven").setOptions(options);
-    sqoopSchema.addColumn(new FixedPoint("one")).addColumn(new FixedPoint("two", 2L, false)).addColumn(new Text("three"))
-        .addColumn(new Text("four")).addColumn(new Binary("five")).addColumn(new Text("six")).addColumn(enumCol)
+    sqoopSchema
+        .addColumn(new FixedPoint("one"))
+        .addColumn(new FixedPoint("two", 2L, false))
+        .addColumn(new Text("three"))
+        .addColumn(new Text("four"))
+        .addColumn(new Binary("five"))
+        .addColumn(new Text("six"))
+        .addColumn(enumCol)
         .addColumn(new Array("eight", new Array("array", new FixedPoint("ft"))))
-        .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))).addColumn(new Bit("ten"))
+        .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2")))
+        .addColumn(new Bit("ten"))
         .addColumn(new org.apache.sqoop.schema.type.DateTime("eleven", true, false))
         .addColumn(new org.apache.sqoop.schema.type.Time("twelve", false))
         .addColumn(new org.apache.sqoop.schema.type.Date("thirteen"))
         .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("fourteen"))
-        .addColumn(new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw"))));
+        .addColumn(
+            new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw"))));
     dataFormat = new AVROIntermediateDataFormat(sqoopSchema);
     avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
   }
@@ -92,9 +105,10 @@ public class TestAVROIntermediateDataFormat {
   @Test
   public void testInputAsCSVTextInAndDataOut() {
 
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
-        + ",13.44," + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + ","
+        + csvDate + ",13.44," + csvSet;
     dataFormat.setCSVTextData(csvText);
     GenericRecord avroObject = createAvroGenericRecord();
     assertEquals(avroObject.toString(), dataFormat.getData().toString());
@@ -102,9 +116,10 @@ public class TestAVROIntermediateDataFormat {
 
   @Test
   public void testInputAsCSVTextInAndObjectArrayOut() {
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
-        + ",13.44," + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + ","
+        + csvDate + ",13.44," + csvSet;
     dataFormat.setCSVTextData(csvText);
     assertEquals(dataFormat.getObjectData().length, 15);
     assertObjectArray();
@@ -158,9 +173,10 @@ public class TestAVROIntermediateDataFormat {
 
   @Test
   public void testInputAsCSVTextInCSVTextOut() {
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
-        + ",13.44," + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + ","
+        + csvDate + ",13.44," + csvSet;
     dataFormat.setCSVTextData(csvText);
     assertEquals(csvText, dataFormat.getCSVTextData());
   }
@@ -219,9 +235,10 @@ public class TestAVROIntermediateDataFormat {
   @Test
   public void testInputAsDataInAndCSVOut() {
 
-    String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
-        + ",13.44," + csvSet;
+    String csvExpected = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + ","
+        + csvDate + ",13.44," + csvSet;
     dataFormat.setData(createAvroGenericRecord());
     assertEquals(csvExpected, dataFormat.getCSVTextData());
   }
@@ -295,7 +312,8 @@ public class TestAVROIntermediateDataFormat {
     Object[] out = createObjectArray();
     dataFormat.setObjectData(out);
     GenericRecord avroObject = createAvroGenericRecord();
-    // SQOOP-SQOOP-1975: direct object compare will fail unless we use the Avro complex types
+    // SQOOP-SQOOP-1975: direct object compare will fail unless we use the Avro
+    // complex types
     assertEquals(avroObject.toString(), dataFormat.getData().toString());
 
   }
@@ -304,9 +322,10 @@ public class TestAVROIntermediateDataFormat {
   public void testInputAsObjectArrayInAndCSVOut() {
     Object[] out = createObjectArray();
     dataFormat.setObjectData(out);
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
-        + ",13.44," + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + ","
+        + csvDate + ",13.44," + csvSet;
     assertEquals(csvText, dataFormat.getCSVTextData());
   }
 
@@ -316,4 +335,212 @@ public class TestAVROIntermediateDataFormat {
     dataFormat.setObjectData(out);
     assertObjectArray();
   }
+
+  // **************test cases for empty and null schema*******************
+  @Test(expectedExceptions = SqoopException.class)
+  public void testEmptySchema() {
+    String testData = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'\\n'";
+    // no columns
+    Schema schema = new Schema("Test");
+    dataFormat = new AVROIntermediateDataFormat(schema);
+    dataFormat.setCSVTextData(testData);
+
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNullSchema() {
+    dataFormat = new AVROIntermediateDataFormat(null);
+    dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetObjectData() {
+    dataFormat = new AVROIntermediateDataFormat();
+    dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetData() {
+    dataFormat = new AVROIntermediateDataFormat();
+    dataFormat.getData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetCSVData() {
+    dataFormat = new AVROIntermediateDataFormat();
+    dataFormat.getCSVTextData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetObjectData() {
+    dataFormat = new AVROIntermediateDataFormat();
+    dataFormat.setObjectData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetData() {
+    dataFormat = new AVROIntermediateDataFormat();
+    dataFormat.setData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetCSVData() {
+    dataFormat = new AVROIntermediateDataFormat();
+    dataFormat.setCSVTextData(null);
+  }
+
+  // **************test cases for null and empty input*******************
+
+  @Test
+  public void testNullInputAsCSVTextInObjectArrayOut() {
+
+    dataFormat.setCSVTextData(null);
+    Object[] out = dataFormat.getObjectData();
+    assertNull(out);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testEmptyInputAsCSVTextInObjectArrayOut() {
+    dataFormat.setCSVTextData("");
+    dataFormat.getObjectData();
+  }
+
+  @Test
+  public void testNullValueAsObjectArrayInAndCSVTextOut() {
+
+    Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null };
+    dataFormat.setObjectData(in);
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    assertEquals(15, textValues.length);
+    for (String text : textValues) {
+      assertEquals(text, NULL_VALUE);
+    }
+  }
+
+  @Test
+  public void testNullValueAsObjectArrayInAndObjectArrayOut() {
+    Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null };
+    dataFormat.setObjectData(in);
+
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(15, out.length);
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+  }
+
+  @Test
+  public void testNullValueAsCSVTextInAndObjectArrayOut() {
+    String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
+        "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" };
+    dataFormat.setCSVTextData(StringUtils.join(test, ","));
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(15, out.length);
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+  }
+
+  @Test
+  public void testNullValueAsCSVTextInAndCSVTextOut() {
+
+    String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
+        "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" };
+    dataFormat.setCSVTextData(StringUtils.join(test, ","));
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    assertEquals(15, textValues.length);
+    for (String text : textValues) {
+      assertEquals(text, NULL_VALUE);
+    }
+  }
+
+  @Test
+  public void testNullValueAsDataInAndCSVTextOut() {
+
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    avroObject = setAvroRecordWithNulls();
+    dataFormat.setData(avroObject);
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    assertEquals(15, textValues.length);
+    for (String text : textValues) {
+      assertEquals(text, NULL_VALUE);
+    }
+  }
+
+  @Test
+  public void testNullValueAsDataInAndObjectArrayOut() {
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    avroObject = setAvroRecordWithNulls();
+    dataFormat.setData(avroObject);
+
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(15, out.length);
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+
+  }
+
+  private GenericRecord setAvroRecordWithNulls() {
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    avroObject.put("one", null);
+    avroObject.put("two", null);
+    avroObject.put("three", null);
+    avroObject.put("four", null);
+    avroObject.put("five", null);
+    avroObject.put("six", null);
+    avroObject.put("seven", null);
+
+    avroObject.put("eight", null);
+    avroObject.put("nine", null);
+    avroObject.put("ten", null);
+
+    // expect dates as strings
+    avroObject.put("eleven", null);
+    avroObject.put("twelve", null);
+    avroObject.put("thirteen", null);
+    avroObject.put("fourteen", null);
+
+    avroObject.put("fifteen", null);
+    return avroObject;
+  }
+  @Test(expectedExceptions = SqoopException.class)
+  public void testSchemaNotNullableWithObjectArray() {
+    Schema overrideSchema = new Schema("Test").addColumn(new Text("t").setNullable(false));
+    AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema);
+    Object[] out = new Object[1];
+    out[0] = null;
+    dataFormat.setObjectData(out);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testSchemaNotNullableWithCSV() {
+    Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false));
+    AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema);
+    dataFormat.setCSVTextData(NULL_VALUE);
+  }
+
+  // no validation happens when setAvro and getAvro are used
+  @Test
+  public void testSchemaNotNullableWithAvro() {
+    Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false));
+    AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema);
+    org.apache.avro.Schema avroSchema = SqoopAvroUtils.createAvroSchema(overrideSchema);
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    avroObject.put("one", null);
+    dataFormat.setData(avroObject);
+    dataFormat.getData();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index fca410f..861d34e 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -58,7 +58,7 @@ public class TestCSVIntermediateDataFormat {
   }
 
 
-  //**************test cases for null and empty input*******************
+  //**************test cases for null input*******************
 
   @Test
   public void testNullInputAsCSVTextInObjectArrayOut() {
@@ -1112,7 +1112,7 @@ public class TestCSVIntermediateDataFormat {
     dataFormat.setCSVTextData(testData);
     assertEquals(testData, dataFormat.getCSVTextData());
   }
-  //**************test cases for schema*******************
+  //**************test cases for null and empty schema*******************
   @Test(expectedExceptions=SqoopException.class)
   public void testEmptySchema() {
     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
@@ -1128,14 +1128,52 @@ public class TestCSVIntermediateDataFormat {
   @Test(expectedExceptions = SqoopException.class)
   public void testNullSchema() {
     dataFormat = new CSVIntermediateDataFormat(null);
-    @SuppressWarnings("unused")
-    Object[] out = dataFormat.getObjectData();
+    dataFormat.getObjectData();
   }
 
   @Test(expectedExceptions = SqoopException.class)
-  public void testNotSettingSchema() {
+  public void testNotSettingSchemaAndGetObjectData() {
     dataFormat = new CSVIntermediateDataFormat();
-    @SuppressWarnings("unused")
-    Object[] out = dataFormat.getObjectData();
+    dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetData() {
+    dataFormat = new CSVIntermediateDataFormat();
+    dataFormat.getData();
+  }
+
+  // SQOOP-1936: enable schema validation here once compareTo is used
+  @Test
+  public void testNotSettingSchemaAndGetCSVData() {
+    dataFormat = new CSVIntermediateDataFormat();
+    dataFormat.getCSVTextData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetObjectData() {
+    dataFormat = new CSVIntermediateDataFormat();
+    dataFormat.setObjectData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetData() {
+    dataFormat = new CSVIntermediateDataFormat();
+    dataFormat.setData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetCSVData() {
+    dataFormat = new CSVIntermediateDataFormat();
+    dataFormat.setCSVTextData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testSchemaNotNullable() {
+    dataFormat = new CSVIntermediateDataFormat();
+    dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
+    Object[] out = new Object[1];
+    out[0] = null;
+    dataFormat.setObjectData(out);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2d54e26a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
index 44ea4fd..8a87c65 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
@@ -18,9 +18,13 @@
  */
 package org.apache.sqoop.connector.idf;
 
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
 import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
+import static org.testng.Assert.assertNull;
 import static org.testng.AssertJUnit.assertEquals;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.Array;
@@ -55,15 +59,23 @@ public class TestJSONIntermediateDataFormat {
 
   private void createJSONIDF() {
     Schema schema = new Schema("test");
-    schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2", 2L, false)).addColumn(new Text("3"))
-        .addColumn(new Text("4")).addColumn(new Binary("5")).addColumn(new Text("6"))
+    schema
+        .addColumn(new FixedPoint("1"))
+        .addColumn(new FixedPoint("2", 2L, false))
+        .addColumn(new Text("3"))
+        .addColumn(new Text("4"))
+        .addColumn(new Binary("5"))
+        .addColumn(new Text("6"))
         .addColumn(new org.apache.sqoop.schema.type.Enum("7"))
         .addColumn(new Array("8", new Array("array", new FixedPoint("ft"))))
-        .addColumn(new org.apache.sqoop.schema.type.Map("9", new Text("t1"), new Text("t2"))).addColumn(new Bit("10"))
+        .addColumn(new org.apache.sqoop.schema.type.Map("9", new Text("t1"), new Text("t2")))
+        .addColumn(new Bit("10"))
         .addColumn(new org.apache.sqoop.schema.type.DateTime("11", true, false))
-        .addColumn(new org.apache.sqoop.schema.type.Time("12", false)).addColumn(new org.apache.sqoop.schema.type.Date("13"))
+        .addColumn(new org.apache.sqoop.schema.type.Time("12", false))
+        .addColumn(new org.apache.sqoop.schema.type.Date("13"))
         .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("14"))
-        .addColumn(new org.apache.sqoop.schema.type.Set("15", new Array("set", new FixedPoint("ftw"))));
+        .addColumn(
+            new org.apache.sqoop.schema.type.Set("15", new Array("set", new FixedPoint("ftw"))));
     dataFormat = new JSONIntermediateDataFormat(schema);
   }
 
@@ -73,9 +85,10 @@ public class TestJSONIntermediateDataFormat {
   @Test
   public void testInputAsCSVTextInAndDataOut() {
 
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
-        + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date
+        + ",13.44," + csvSet;
     dataFormat.setCSVTextData(csvText);
     String jsonExpected = "{\"15\":[[11,12],[14,15]],\"13\":\"2014-10-01\",\"14\":13.44,\"11\":\"2014-10-01 12:00:00.000\","
         + "\"12\":\"12:59:59\",\"3\":\"54\",\"2\":34,\"1\":10,\"10\":true,\"7\":\"ENUM\",\"6\":\"10\",\"5\":\"kDY=\",\"4\":\"random data\","
@@ -85,9 +98,10 @@ public class TestJSONIntermediateDataFormat {
 
   @Test
   public void testInputAsCSVTextInAndObjectArrayOut() {
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
-        + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date
+        + ",13.44," + csvSet;
     dataFormat.setCSVTextData(csvText);
     assertEquals(dataFormat.getObjectData().length, 15);
     assertObjectArray();
@@ -145,9 +159,10 @@ public class TestJSONIntermediateDataFormat {
 
   @Test
   public void testInputAsCSVTextInCSVTextOut() {
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
-        + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date
+        + ",13.44," + csvSet;
     dataFormat.setCSVTextData(csvText);
     assertEquals(csvText, dataFormat.getCSVTextData());
   }
@@ -159,7 +174,10 @@ public class TestJSONIntermediateDataFormat {
     json.put("2", 34);
     json.put("3", "54");
     json.put("4", "random data");
-    json.put("5", org.apache.commons.codec.binary.Base64.encodeBase64String(new byte[] { (byte) -112, (byte) 54 }));
+    json.put(
+        "5",
+        org.apache.commons.codec.binary.Base64.encodeBase64String(new byte[] { (byte) -112,
+            (byte) 54 }));
     json.put("6", String.valueOf(0x0A));
     json.put("7", "ENUM");
     JSONArray givenArrayOne = new JSONArray();
@@ -206,9 +224,10 @@ public class TestJSONIntermediateDataFormat {
   @Test
   public void testInputAsDataInAndCSVOut() {
 
-    String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
-        + csvSet;
+    String csvExpected = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date
+        + ",13.44," + csvSet;
     dataFormat.setData(createJSONObject());
     assertEquals(csvExpected, dataFormat.getCSVTextData());
   }
@@ -294,9 +313,10 @@ public class TestJSONIntermediateDataFormat {
   public void testInputAsObjectArrayInAndCSVOut() {
     Object[] out = createObjectArray();
     dataFormat.setObjectData(out);
-    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
-        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
-        + csvSet;
+    String csvText = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'" + String.valueOf(0x0A)
+        + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date
+        + ",13.44," + csvSet;
     assertEquals(csvText, dataFormat.getCSVTextData());
   }
 
@@ -306,4 +326,196 @@ public class TestJSONIntermediateDataFormat {
     dataFormat.setObjectData(out);
     assertObjectArray();
   }
+
+  // **************test cases for empty and null schema*******************
+  @Test(expectedExceptions = SqoopException.class)
+  public void testEmptySchema() {
+    String testData = "10,34,'54','random data',"
+        + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'\\n'";
+    // no columns
+    Schema schema = new Schema("Test");
+    dataFormat = new JSONIntermediateDataFormat(schema);
+    dataFormat.setCSVTextData(testData);
+
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNullSchema() {
+    dataFormat = new JSONIntermediateDataFormat(null);
+    dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetObjectData() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.getObjectData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetData() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.getData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndGetCSVData() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.getCSVTextData();
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetObjectData() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.setObjectData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetData() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.setData(null);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testNotSettingSchemaAndSetCSVData() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.setCSVTextData(null);
+  }
+
+  // **************test cases for null input*******************
+
+  @Test
+  public void testNullInputAsCSVTextInObjectArrayOut() {
+
+    dataFormat.setCSVTextData(null);
+    Object[] out = dataFormat.getObjectData();
+    assertNull(out);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testEmptyInputAsCSVTextInObjectArrayOut() {
+    dataFormat.setCSVTextData("");
+    dataFormat.getObjectData();
+  }
+
+  @Test
+  public void testNullValueAsObjectArrayInAndCSVTextOut() {
+
+    Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null };
+    dataFormat.setObjectData(in);
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    assertEquals(15, textValues.length);
+    for (String text : textValues) {
+      assertEquals(text, NULL_VALUE);
+    }
+  }
+
+  @Test
+  public void testNullValueAsObjectArrayInAndObjectArrayOut() {
+    Object[] in = { null, null, null, null, null, null, null, null, null, null, null, null, null,
+        null, null };
+    dataFormat.setObjectData(in);
+
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(15, out.length);
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+  }
+
+  @Test
+  public void testNullValueAsCSVTextInAndObjectArrayOut() {
+    String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
+        "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" };
+    dataFormat.setCSVTextData(StringUtils.join(test, ","));
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(15, out.length);
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+  }
+
+  @Test
+  public void testNullValueAsCSVTextInAndCSVTextOut() {
+
+    String[] test = { "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL", "NULL",
+        "NULL", "NULL", "NULL", "NULL", "NULL", "NULL" };
+    dataFormat.setCSVTextData(StringUtils.join(test, ","));
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    assertEquals(15, textValues.length);
+    for (String text : textValues) {
+      assertEquals(text, NULL_VALUE);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNullValueAsDataInAndCSVTextOut() {
+
+    JSONObject json = new JSONObject();
+    for (int i = 1; i <= 15; i++) {
+      json.put(String.valueOf(i), null);
+    }
+    dataFormat.setData(json);
+
+    String csvText = dataFormat.getCSVTextData();
+    String[] textValues = csvText.split(",");
+    assertEquals(15, textValues.length);
+    for (String text : textValues) {
+      assertEquals(text, NULL_VALUE);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNullValueAsDataInAndObjectArrayOut() {
+
+    JSONObject json = new JSONObject();
+    for (int i = 1; i <= 15; i++) {
+      json.put(String.valueOf(i), null);
+    }
+    dataFormat.setData(json);
+
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(15, out.length);
+    for (Object obj : out) {
+      assertEquals(obj, null);
+    }
+
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testSchemaNotNullableWithObjectArray() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
+    Object[] out = new Object[1];
+    out[0] = null;
+    dataFormat.setObjectData(out);
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void testSchemaNotNullableWithCSV() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
+    dataFormat.setCSVTextData(NULL_VALUE);
+  }
+
+  // no validation happens when the native setData and getData methods are used
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSchemaNotNullableWithJSON() {
+    dataFormat = new JSONIntermediateDataFormat();
+    dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
+    JSONObject json = new JSONObject();
+    json.put("test", null);
+    dataFormat.setData(json);
+    dataFormat.getData();
+  }
+
 }