Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/02 16:55:21 UTC

sqoop git commit: SQOOP-2808: Sqoop2: Integration tests should test rows with null values

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 118aa7c4f -> b71f55e4e


SQOOP-2808: Sqoop2: Integration tests should test rows with null values

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b71f55e4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b71f55e4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b71f55e4

Branch: refs/heads/sqoop2
Commit: b71f55e4edfc4d3f07067957c3cdfdd3e426e312
Parents: 118aa7c
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Feb 2 07:54:33 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Feb 2 07:54:33 2016 -0800

----------------------------------------------------------------------
 .../common/test/asserts/ProviderAsserts.java    |   9 +-
 .../connector/jdbc/GenericJdbcExecutor.java     |  59 ++---
 .../sqoop/connector/hdfs/HdfsExtractor.java     |   7 +-
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |   6 +-
 .../sqoop/connector/hdfs/TestExtractor.java     |   8 +-
 .../apache/sqoop/connector/hdfs/TestLoader.java |   2 +-
 .../sqoop/connector/common/SqoopIDFUtils.java   |  18 +-
 .../idf/AVROIntermediateDataFormat.java         |   6 +-
 .../idf/JSONIntermediateDataFormat.java         |   6 +-
 .../idf/TestAVROIntermediateDataFormat.java     |  10 +-
 .../idf/TestCSVIntermediateDataFormat.java      |   4 +-
 .../idf/TestJSONIntermediateDataFormat.java     |  10 +-
 .../connector/hdfs/NullValueTest.java           | 240 +++++++++++++++++++
 13 files changed, 319 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
index 4e1ef6a..ae1b60d 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.fail;
 
 /**
@@ -55,8 +56,12 @@ public class ProviderAsserts {
       int i = 1;
       for(Object expectedValue : values) {
         Object actualValue = rs.getObject(i);
-        assertEquals(expectedValue.toString(), actualValue.toString(),
-                "Columns do not match on position: " + i);
+        if (expectedValue == null) {
+          assertNull(actualValue);
+        } else {
+          assertEquals(expectedValue.toString(), actualValue.toString(),
+            "Columns do not match on position: " + i);
+        }
         i++;
       }
 

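For context, a minimal sketch (not part of the patch) of the failure mode the new branch avoids: the old assertion called toString() on both values unconditionally, so a legitimate SQL NULL in either position surfaced as a NullPointerException rather than a readable assertion failure.

  // Hypothetical stand-alone illustration; rs is a java.sql.ResultSet
  // positioned on a row whose first column is SQL NULL.
  Object expectedValue = null;
  Object actualValue = rs.getObject(1);   // returns null for SQL NULL
  if (expectedValue == null) {
    assertNull(actualValue);              // clean failure message, no NPE
  } else {
    assertEquals(expectedValue.toString(), actualValue.toString(),
        "Columns do not match on position: 1");
  }
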
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 7c943c2..134b826 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -38,6 +38,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.AbstractMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -308,34 +309,38 @@ public class GenericJdbcExecutor implements AutoCloseable {
     try {
       Column[] schemaColumns = schema.getColumnsArray();
       for (int i = 0; i < array.length; i++) {
-        Column schemaColumn = schemaColumns[i];
-        switch (schemaColumn.getType()) {
-        case DATE:
-          // convert the JODA date to sql date
-          LocalDate date = (LocalDate) array[i];
-          java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
-          preparedStatement.setObject(i + 1, sqlDate);
-          break;
-        case DATE_TIME:
-          // convert the JODA date time to sql date
-          DateTime dateTime = null;
-          if (array[i] instanceof org.joda.time.LocalDateTime) {
-            dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime();
-          } else {
-            dateTime = (DateTime) array[i];
+        if (array[i] == null) {
+          preparedStatement.setObject(i + 1, null);
+        } else {
+          Column schemaColumn = schemaColumns[i];
+          switch (schemaColumn.getType()) {
+            case DATE:
+              // convert the JODA date to sql date
+              LocalDate date = (LocalDate) array[i];
+              java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
+              preparedStatement.setObject(i + 1, sqlDate);
+              break;
+            case DATE_TIME:
+              // convert the JODA date time to a sql timestamp
+              DateTime dateTime = null;
+              if (array[i] instanceof org.joda.time.LocalDateTime) {
+                dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime();
+              } else {
+                dateTime = (DateTime) array[i];
+              }
+              Timestamp timestamp = new Timestamp(dateTime.getMillis());
+              preparedStatement.setObject(i + 1, timestamp);
+              break;
+            case TIME:
+              LocalTime time = (LocalTime) array[i];
+              // convert the JODA time to sql date
+              java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
+              preparedStatement.setObject(i + 1, sqlTime);
+              break;
+            default:
+              // for anything else
+              preparedStatement.setObject(i + 1, array[i]);
           }
-          Timestamp timestamp = new Timestamp(dateTime.getMillis());
-          preparedStatement.setObject(i + 1, timestamp);
-          break;
-        case TIME:
-          // convert the JODA time to sql date
-          LocalTime time = (LocalTime) array[i];
-          java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
-          preparedStatement.setObject(i + 1, sqlTime);
-          break;
-        default:
-          // for anything else
-          preparedStatement.setObject(i + 1, array[i]);
         }
       }
       preparedStatement.addBatch();

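The new branch binds SQL NULL with setObject(i + 1, null), which most JDBC drivers accept for an untyped null; some drivers insist on the typed PreparedStatement.setNull variant, which is presumably what the newly imported java.sql.Types is for. A minimal sketch of the two forms (not part of the patch; Types.VARCHAR is an illustrative type code):

  // Two ways to bind NULL to parameter 1 of a PreparedStatement.
  preparedStatement.setObject(1, null);          // untyped; driver infers the type
  preparedStatement.setNull(1, Types.VARCHAR);   // typed; for stricter drivers
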
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index b430739..9ef2a05 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -210,12 +210,11 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
     if (schema instanceof ByteArraySchema) {
       dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
-    } else if (!HdfsUtils.hasCustomFormat(linkConfiguration,
-      fromJobConfiguration)) {
+    } else if (!HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
       dataWriter.writeStringRecord(line.toString());
     } else {
-      Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
-      dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
+      Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema, fromJobConfiguration.fromJobConfig.nullValue);
+      dataWriter.writeArrayRecord(data);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 774221a..5de20c6 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -103,12 +103,8 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
         }
       } else {
         Object[] record;
-
         while ((record = reader.readArrayRecord()) != null) {
-          filewriter.write(
-            SqoopIDFUtils.toCSV(
-              HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
-              context.getSchema()));
+          filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(), toJobConfig.toJobConfig.nullValue));
           rowsWritten++;
         }
       }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 7d2177f..4e1d5c7 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -84,11 +84,11 @@ public class TestExtractor extends TestHdfsBase {
     FileUtils.mkdirs(inputDirectory);
     switch (this.outputFileType) {
       case TEXT_FILE:
-        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
+        createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
         break;
 
       case SEQUENCE_FILE:
-        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
+        createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
         break;
     }
   }
@@ -131,7 +131,7 @@ public class TestExtractor extends TestHdfsBase {
         Assert.assertEquals(String.valueOf((double) index), components[1]);
         Assert.assertEquals("NULL", components[2]);
         Assert.assertEquals("'" + index + "'", components[3]);
-        Assert.assertEquals("\\\\N", components[4]);
+        Assert.assertEquals("\\N", components[4]);
 
         assertTestUser(TEST_USER);
 
@@ -180,7 +180,7 @@ public class TestExtractor extends TestHdfsBase {
 
         Assert.assertFalse(visited[index - 1]);
         Assert.assertEquals(String.valueOf((double) index), array[1].toString());
-        Assert.assertEquals(null, array[2]);
+        Assert.assertEquals("NULL", array[2]);
         Assert.assertEquals(String.valueOf(index), array[3]);
         Assert.assertNull(array[4]);
 

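These fixture and assertion changes (here and in TestLoader just below) drop one level of backslash escaping. In Java source, "\\\\N" denotes the three-character string \\N while "\\N" denotes the two-character string \N, so the tests now deal in the single-backslash form of the escaped-null token. A sketch of the literals involved (not in the patch):

  String before = "\\\\N"; // 3 chars at runtime: \ \ N
  String after  = "\\N";   // 2 chars at runtime: \ N
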
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 11fcef2..adede3a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -203,7 +203,7 @@ public class TestLoader extends TestHdfsBase {
     Assert.assertEquals(1, fs.listStatus(outputPath).length);
 
     for (FileStatus status : fs.listStatus(outputPath)) {
-      verifyOutput(fs, status.getPath(), "%d,%f,'\\\\N',%s");
+      verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
     }
 
     loader.load(context, linkConf, jobConf);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 9b0885a..fc25100 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -63,7 +63,7 @@ import java.util.Collections;
 @edu.umd.cs.findbugs.annotations.SuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
 public class SqoopIDFUtils {
 
-  public static final String NULL_VALUE = "NULL";
+  public static final String DEFAULT_NULL_VALUE = "NULL";
 
   // ISO-8859-1 is an 8-bit codec that is supported in every java
   // implementation.
@@ -578,8 +578,12 @@ public class SqoopIDFUtils {
    *
    * @param objectArray
    */
-  @SuppressWarnings("unchecked")
   public static String toCSV(Object[] objectArray, Schema schema) {
+    return toCSV(objectArray, schema, DEFAULT_NULL_VALUE);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static String toCSV(Object[] objectArray, Schema schema, String nullValue) {
     Column[] columns = schema.getColumnsArray();
 
     StringBuilder csvString = new StringBuilder();
@@ -589,7 +593,7 @@ public class SqoopIDFUtils {
             columns[i].getName() + " does not support null values");
       }
       if (objectArray[i] == null) {
-        csvString.append(NULL_VALUE);
+        csvString.append(nullValue);
       } else {
         switch (columns[i].getType()) {
           case ARRAY:
@@ -756,6 +760,10 @@ public class SqoopIDFUtils {
    * @return Object[]
    */
   public static Object[] fromCSV(String csvText, Schema schema) {
+    return fromCSV(csvText, schema, DEFAULT_NULL_VALUE);
+  }
+
+  public static Object[] fromCSV(String csvText, Schema schema, String nullValue){
     String[] csvArray = parseCSVString(csvText);
 
     if (csvArray == null) {
@@ -771,11 +779,11 @@ public class SqoopIDFUtils {
 
     Object[] objectArray = new Object[csvArray.length];
     for (int i = 0; i < csvArray.length; i++) {
-      if (csvArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+      if (csvArray[i].equals(nullValue) && !columns[i].isNullable()) {
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
             columns[i].getName() + " does not support null values");
       }
-      if (csvArray[i].equals(NULL_VALUE)) {
+      if (csvArray[i].equals(nullValue)) {
         objectArray[i] = null;
         continue;
       }

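The two new overloads make the null token a parameter, while the existing single-token signatures delegate to them with DEFAULT_NULL_VALUE, so current callers keep their behavior. A minimal round-trip sketch, assuming the org.apache.sqoop.schema.Schema and org.apache.sqoop.schema.type.Text classes used by the tests later in this diff, with a column left nullable:

  // Sketch, not part of the patch; "\\N" is an arbitrary custom token.
  Schema schema = new Schema("example").addColumn(new Text("city"));
  Object[] row = new Object[]{ null };
  String csv = SqoopIDFUtils.toCSV(row, schema, "\\N");       // yields: \N
  Object[] back = SqoopIDFUtils.fromCSV(csv, schema, "\\N");  // back[0] == null
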
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
index d78fa8b..ace1bdf 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -162,11 +162,11 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     }
     GenericRecord avroObject = new GenericData.Record(avroSchema);
     for (int i = 0; i < csvStringArray.length; i++) {
-      if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+      if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
             columns[i].getName() + " does not support null values");
       }
-      if (csvStringArray[i].equals(NULL_VALUE)) {
+      if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
         avroObject.put(columns[i].getName(), null);
         continue;
       }
@@ -323,7 +323,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
             columns[i].getName() + " does not support null values");
       }
       if (obj == null) {
-        csvString.append(NULL_VALUE);
+        csvString.append(DEFAULT_NULL_VALUE);
       } else {
 
         switch (columns[i].getType()) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
index 8db4d3d..078b598 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -146,12 +146,12 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
     }
     JSONObject object = new JSONObject();
     for (int i = 0; i < csvStringArray.length; i++) {
-      if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+      if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
             columns[i].getName() + " does not support null values");
       }
       // check for NULL field and bail out immediately
-      if (csvStringArray[i].equals(NULL_VALUE)) {
+      if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
         object.put(columns[i].getName(), null);
         continue;
       }
@@ -299,7 +299,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
             columns[i].getName() + " does not support null values");
       }
       if (obj == null) {
-        csvString.append(NULL_VALUE);
+        csvString.append(DEFAULT_NULL_VALUE);
       } else {
         switch (columns[i].getType()) {
         case ARRAY:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
index 703ed0a..8475720 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
@@ -19,7 +19,7 @@
 package org.apache.sqoop.connector.idf;
 
 import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE;
 import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertEquals;
@@ -419,7 +419,7 @@ public class TestAVROIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(15, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -459,7 +459,7 @@ public class TestAVROIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(15, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -474,7 +474,7 @@ public class TestAVROIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(15, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -528,7 +528,7 @@ public class TestAVROIntermediateDataFormat {
   public void testSchemaNotNullableWithCSV() {
     Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false));
     AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema);
-    dataFormat.setCSVTextData(NULL_VALUE);
+    dataFormat.setCSVTextData(DEFAULT_NULL_VALUE);
   }
 
   // no validation happens when the setAvro and getAvro is used

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 040dbfc..e82d817 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -106,7 +106,7 @@ public class TestCSVIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(14, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -176,7 +176,7 @@ public class TestCSVIntermediateDataFormat {
     String csvText = dataFormat.getCSVTextData();
     String[] textValues = csvText.split(",");
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
index bcc1f95..09c4a11 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
@@ -18,7 +18,7 @@
  */
 package org.apache.sqoop.connector.idf;
 
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE;
 import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertEquals;
@@ -414,7 +414,7 @@ public class TestJSONIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(15, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -454,7 +454,7 @@ public class TestJSONIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(15, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -472,7 +472,7 @@ public class TestJSONIntermediateDataFormat {
     String[] textValues = csvText.split(",");
     assertEquals(15, textValues.length);
     for (String text : textValues) {
-      assertEquals(text, NULL_VALUE);
+      assertEquals(text, DEFAULT_NULL_VALUE);
     }
   }
 
@@ -507,7 +507,7 @@ public class TestJSONIntermediateDataFormat {
   public void testSchemaNotNullableWithCSV() {
     dataFormat = new JSONIntermediateDataFormat();
     dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
-    dataFormat.setCSVTextData(NULL_VALUE);
+    dataFormat.setCSVTextData(DEFAULT_NULL_VALUE);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
new file mode 100644
index 0000000..4363814
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multiset;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.apache.sqoop.test.utils.ParametrizedUtils;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class NullValueTest extends SqoopTestCase {
+
+  private static final Logger LOG = Logger.getLogger(NullValueTest.class);
+
+
+  private ToFormat format;
+
+  // The custom nullValue to use (set to null if default)
+  private String nullValue;
+
+  @DataProvider(name="null-value-test")
+  public static Object[][] data(ITestContext context) {
+    String customNullValue = "^&*custom!@";
+
+    return Iterables.toArray(
+      ParametrizedUtils.crossProduct(ToFormat.values(), new String[]{SqoopIDFUtils.DEFAULT_NULL_VALUE, customNullValue}),
+      Object[].class);
+  }
+
+  @Factory(dataProvider="null-value-test")
+  public NullValueTest(ToFormat format, String nullValue) {
+    this.format = format;
+    this.nullValue = nullValue;
+  }
+
+  @Override
+  public String getTestName() {
+    return methodName + "[" + format.name() + ", " + nullValue + "]";
+  }
+
+  @BeforeMethod
+  public void setup() throws Exception {
+    createTableCities();
+  }
+
+  @AfterMethod()
+  public void dropTable() {
+    super.dropTable();
+  }
+
+  private boolean usingCustomNullValue() {
+    return nullValue != SqoopIDFUtils.DEFAULT_NULL_VALUE;
+  }
+
+  private String[] getCsv() {
+    return new String[] {
+      "1,'USA'," + nullValue + ",'San Francisco'",
+      "2,'USA','2004-10-24 00:00:00.000'," + nullValue,
+      "3," + nullValue + ",'2004-10-25 00:00:00.000','Brno'",
+      "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+    };
+  }
+
+  @Test
+  public void testFromHdfs() throws Exception {
+    switch (format) {
+      case TEXT_FILE:
+        createFromFile("input-0001", getCsv());
+
+        break;
+      case SEQUENCE_FILE:
+        SequenceFile.Writer.Option optPath =
+          SequenceFile.Writer.file(new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001")));
+        SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(Text.class);
+        SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(NullWritable.class);
+
+
+        SequenceFile.Writer sequenceFileWriter =
+          SequenceFile.createWriter(getHadoopConf(), optPath, optKey, optVal);
+        for (String csv : getCsv()) {
+          sequenceFileWriter.append(new Text(csv), NullWritable.get());
+        }
+        sequenceFileWriter.close();
+        break;
+      default:
+        Assert.fail();
+    }
+
+    MLink hdfsLinkFrom = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLinkFrom);
+    saveLink(hdfsLinkFrom);
+
+    MLink rdbmsLinkTo = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLinkTo);
+    saveLink(rdbmsLinkTo);
+
+    MJob job = getClient().createJob(hdfsLinkFrom.getName(), rdbmsLinkTo.getName());
+
+    fillHdfsFromConfig(job);
+    fillRdbmsToConfig(job);
+
+    if (usingCustomNullValue()) {
+      job.getFromJobConfig().getBooleanInput("fromJobConfig.overrideNullValue").setValue(true);
+      job.getFromJobConfig().getStringInput("fromJobConfig.nullValue").setValue(nullValue);
+    }
+
+
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+    saveJob(job);
+
+    executeJob(job);
+
+    Assert.assertEquals(4L, provider.rowCount(getTableName()));
+    assertRowInCities(1, "USA", null, "San Francisco");
+    assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
+    assertRowInCities(3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+    assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+  }
+
+  @Test
+  public void testToHdfs() throws Exception {
+    provider.insertRow(getTableName(), 1, "USA", (java.sql.Timestamp) null, "San Francisco");
+    provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
+    provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+    provider.insertRow(getTableName(), 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+
+    MLink rdbmsLinkFrom = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLinkFrom);
+    saveLink(rdbmsLinkFrom);
+
+
+    MLink hdfsLinkTo = getClient().createLink("hdfs-connector");
+    fillHdfsLink(hdfsLinkTo);
+    saveLink(hdfsLinkTo);
+
+    MJob job = getClient().createJob(rdbmsLinkFrom.getName(), hdfsLinkTo.getName());
+
+    fillRdbmsFromConfig(job, "id");
+    fillHdfsToConfig(job, format);
+
+    if (usingCustomNullValue()) {
+      job.getToJobConfig().getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
+      job.getToJobConfig().getStringInput("toJobConfig.nullValue").setValue(nullValue);
+    }
+
+    hdfsClient.mkdirs(new Path(HdfsUtils.joinPathFragments
+      (getMapreduceDirectory(), "TO")));
+
+    job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+      .setValue(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+
+
+    MDriverConfig driverConfig = job.getDriverConfig();
+    driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+    saveJob(job);
+
+    executeJob(job);
+
+    switch (format) {
+      case TEXT_FILE:
+        HdfsAsserts.assertMapreduceOutput(hdfsClient,
+          HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv());
+        break;
+      case SEQUENCE_FILE:
+        Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
+        List<String> notFound = new ArrayList<>();
+        Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+
+        for(Path file : files) {
+          SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file);
+          SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath);
+
+          Text text = new Text();
+          while (sequenceFileReader.next(text)) {
+            if (!setLines.remove(text.toString())) {
+              notFound.add(text.toString());
+            }
+          }
+        }
+        if(!setLines.isEmpty() || !notFound.isEmpty()) {
+          LOG.error("Output do not match expectations.");
+          LOG.error("Expected lines that weren't present in the files:");
+          LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
+          LOG.error("Extra lines in files that weren't expected:");
+          LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
+          Assert.fail("Output do not match expectations.");
+        }
+        break;
+      default:
+        Assert.fail();
+    }
+  }
+}
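
The test opts into the custom token through the two job inputs the HDFS connector exposes. Note that usingCustomNullValue() deliberately uses reference comparison against SqoopIDFUtils.DEFAULT_NULL_VALUE; that is sound here because the data provider passes that very constant when the default is in effect. A condensed sketch of the opt-in (input names are taken from the test above; the token value is illustrative):

  // Enable a custom null token on the TO side of a job.
  job.getToJobConfig().getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
  job.getToJobConfig().getStringInput("toJobConfig.nullValue").setValue("\\N");
  saveJob(job);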