Posted to commits@sqoop.apache.org by ja...@apache.org on 2016/02/02 16:55:21 UTC
sqoop git commit: SQOOP-2808: Sqoop2: Integration tests should test rows with null values
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 118aa7c4f -> b71f55e4e
SQOOP-2808: Sqoop2: Integration tests should test rows with null values
(Abraham Fine via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b71f55e4
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b71f55e4
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b71f55e4
Branch: refs/heads/sqoop2
Commit: b71f55e4edfc4d3f07067957c3cdfdd3e426e312
Parents: 118aa7c
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Feb 2 07:54:33 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Feb 2 07:54:33 2016 -0800
----------------------------------------------------------------------
.../common/test/asserts/ProviderAsserts.java | 9 +-
.../connector/jdbc/GenericJdbcExecutor.java | 59 ++---
.../sqoop/connector/hdfs/HdfsExtractor.java | 7 +-
.../apache/sqoop/connector/hdfs/HdfsLoader.java | 6 +-
.../sqoop/connector/hdfs/TestExtractor.java | 8 +-
.../apache/sqoop/connector/hdfs/TestLoader.java | 2 +-
.../sqoop/connector/common/SqoopIDFUtils.java | 18 +-
.../idf/AVROIntermediateDataFormat.java | 6 +-
.../idf/JSONIntermediateDataFormat.java | 6 +-
.../idf/TestAVROIntermediateDataFormat.java | 10 +-
.../idf/TestCSVIntermediateDataFormat.java | 4 +-
.../idf/TestJSONIntermediateDataFormat.java | 10 +-
.../connector/hdfs/NullValueTest.java | 240 +++++++++++++++++++
13 files changed, 319 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
index 4e1ef6a..ae1b60d 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
@@ -27,6 +27,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
/**
@@ -55,8 +56,12 @@ public class ProviderAsserts {
int i = 1;
for(Object expectedValue : values) {
Object actualValue = rs.getObject(i);
- assertEquals(expectedValue.toString(), actualValue.toString(),
- "Columns do not match on position: " + i);
+ if (expectedValue == null) {
+ assertNull(actualValue);
+ } else {
+ assertEquals(expectedValue.toString(), actualValue.toString(),
+ "Columns do not match on position: " + i);
+ }
i++;
}
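
Note: the null-safe branching above avoids the NullPointerException that expectedValue.toString() would throw for rows that legitimately contain SQL NULL. A minimal sketch of the same pattern in isolation (hypothetical helper, assuming TestNG's asserts):

    import static org.testng.Assert.assertEquals;
    import static org.testng.Assert.assertNull;

    // Hypothetical standalone version of the null-safe column check above.
    static void assertColumn(Object expected, Object actual, int position) {
      if (expected == null) {
        // A NULL column must come back as a JDBC null, not as the string "null".
        assertNull(actual, "Expected NULL on position: " + position);
      } else {
        // Compare string renderings so that, e.g., Integer 1 matches Long 1.
        assertEquals(expected.toString(), actual.toString(),
            "Columns do not match on position: " + position);
      }
    }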
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 7c943c2..134b826 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -38,6 +38,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
+import java.sql.Types;
import java.util.AbstractMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -308,34 +309,38 @@ public class GenericJdbcExecutor implements AutoCloseable {
try {
Column[] schemaColumns = schema.getColumnsArray();
for (int i = 0; i < array.length; i++) {
- Column schemaColumn = schemaColumns[i];
- switch (schemaColumn.getType()) {
- case DATE:
- // convert the JODA date to sql date
- LocalDate date = (LocalDate) array[i];
- java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
- preparedStatement.setObject(i + 1, sqlDate);
- break;
- case DATE_TIME:
- // convert the JODA date time to sql date
- DateTime dateTime = null;
- if (array[i] instanceof org.joda.time.LocalDateTime) {
- dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime();
- } else {
- dateTime = (DateTime) array[i];
+ if (array[i] == null) {
+ preparedStatement.setObject(i + 1, null);
+ } else {
+ Column schemaColumn = schemaColumns[i];
+ switch (schemaColumn.getType()) {
+ case DATE:
+ // convert the JODA date to sql date
+ LocalDate date = (LocalDate) array[i];
+ java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
+ preparedStatement.setObject(i + 1, sqlDate);
+ break;
+ case DATE_TIME:
+ // convert the JODA date time to sql date
+ DateTime dateTime = null;
+ if (array[i] instanceof org.joda.time.LocalDateTime) {
+ dateTime = ((org.joda.time.LocalDateTime) array[i]).toDateTime();
+ } else {
+ dateTime = (DateTime) array[i];
+ }
+ Timestamp timestamp = new Timestamp(dateTime.getMillis());
+ preparedStatement.setObject(i + 1, timestamp);
+ break;
+ case TIME:
+ LocalTime time = (LocalTime) array[i];
+ // convert the JODA time to sql date
+ java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
+ preparedStatement.setObject(i + 1, sqlTime);
+ break;
+ default:
+ // for anything else
+ preparedStatement.setObject(i + 1, array[i]);
}
- Timestamp timestamp = new Timestamp(dateTime.getMillis());
- preparedStatement.setObject(i + 1, timestamp);
- break;
- case TIME:
- // convert the JODA time to sql date
- LocalTime time = (LocalTime) array[i];
- java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
- preparedStatement.setObject(i + 1, sqlTime);
- break;
- default:
- // for anything else
- preparedStatement.setObject(i + 1, array[i]);
}
}
preparedStatement.addBatch();
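
The restructured loop binds NULL before any of the JODA-to-java.sql conversions run, since casting a null cell to LocalDate, DateTime, or LocalTime would fail. One caveat worth hedging: setObject(i + 1, null) is accepted by most drivers, but JDBC only guarantees null binding through setNull with a SQL type. A defensive variant (hypothetical, reusing the java.sql.Types import added in this hunk and the java.sql.SQLException import already present):

    // Hypothetical alternative: some JDBC drivers reject setObject(index, null)
    // and require an explicitly typed setNull instead.
    if (array[i] == null) {
      try {
        // Ask the driver for the declared parameter type when it can report one.
        int sqlType = preparedStatement.getParameterMetaData().getParameterType(i + 1);
        preparedStatement.setNull(i + 1, sqlType);
      } catch (SQLException e) {
        // Fall back to an untyped NULL if the driver cannot describe the parameter.
        preparedStatement.setNull(i + 1, Types.NULL);
      }
    }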
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index b430739..9ef2a05 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -210,12 +210,11 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
private void extractRow(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text line) throws UnsupportedEncodingException {
if (schema instanceof ByteArraySchema) {
dataWriter.writeArrayRecord(new Object[] {line.toString().getBytes(SqoopIDFUtils.BYTE_FIELD_CHARSET)});
- } else if (!HdfsUtils.hasCustomFormat(linkConfiguration,
- fromJobConfiguration)) {
+ } else if (!HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration)) {
dataWriter.writeStringRecord(line.toString());
} else {
- Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema);
- dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, data));
+ Object[] data = SqoopIDFUtils.fromCSV(line.toString(), schema, fromJobConfiguration.fromJobConfig.nullValue);
+ dataWriter.writeArrayRecord(data);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 774221a..5de20c6 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -103,12 +103,8 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
}
} else {
Object[] record;
-
while ((record = reader.readArrayRecord()) != null) {
- filewriter.write(
- SqoopIDFUtils.toCSV(
- HdfsUtils.formatRecord(linkConfiguration, toJobConfig, record),
- context.getSchema()));
+ filewriter.write(SqoopIDFUtils.toCSV(record, context.getSchema(), toJobConfig.toJobConfig.nullValue));
rowsWritten++;
}
}
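
Together with the HdfsExtractor change above, this makes the null token symmetric: the To side renders Java nulls as toJobConfig.nullValue while the From side parses fromJobConfig.nullValue back into nulls, so records round-trip without the separate HdfsUtils.formatRecord pass. A round-trip sketch follows the SqoopIDFUtils hunk below.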
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 7d2177f..4e1d5c7 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -84,11 +84,11 @@ public class TestExtractor extends TestHdfsBase {
FileUtils.mkdirs(inputDirectory);
switch (this.outputFileType) {
case TEXT_FILE:
- createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
+ createTextInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
break;
case SEQUENCE_FILE:
- createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\\\N");
+ createSequenceInput(inputDirectory, this.compressionClass, NUMBER_OF_FILES, NUMBER_OF_ROWS_PER_FILE, "%d,%f,NULL,%s,\\N");
break;
}
}
@@ -131,7 +131,7 @@ public class TestExtractor extends TestHdfsBase {
Assert.assertEquals(String.valueOf((double) index), components[1]);
Assert.assertEquals("NULL", components[2]);
Assert.assertEquals("'" + index + "'", components[3]);
- Assert.assertEquals("\\\\N", components[4]);
+ Assert.assertEquals("\\N", components[4]);
assertTestUser(TEST_USER);
@@ -180,7 +180,7 @@ public class TestExtractor extends TestHdfsBase {
Assert.assertFalse(visited[index - 1]);
Assert.assertEquals(String.valueOf((double) index), array[1].toString());
- Assert.assertEquals(null, array[2]);
+ Assert.assertEquals("NULL", array[2]);
Assert.assertEquals(String.valueOf(index), array[3]);
Assert.assertNull(array[4]);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 11fcef2..adede3a 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -203,7 +203,7 @@ public class TestLoader extends TestHdfsBase {
Assert.assertEquals(1, fs.listStatus(outputPath).length);
for (FileStatus status : fs.listStatus(outputPath)) {
- verifyOutput(fs, status.getPath(), "%d,%f,'\\\\N',%s");
+ verifyOutput(fs, status.getPath(), "%d,%f,\\N,%s");
}
loader.load(context, linkConf, jobConf);
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 9b0885a..fc25100 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -63,7 +63,7 @@ import java.util.Collections;
@edu.umd.cs.findbugs.annotations.SuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
public class SqoopIDFUtils {
- public static final String NULL_VALUE = "NULL";
+ public static final String DEFAULT_NULL_VALUE = "NULL";
// ISO-8859-1 is an 8-bit codec that is supported in every java
// implementation.
@@ -578,8 +578,12 @@ public class SqoopIDFUtils {
*
* @param objectArray
*/
- @SuppressWarnings("unchecked")
public static String toCSV(Object[] objectArray, Schema schema) {
+ return toCSV(objectArray, schema, DEFAULT_NULL_VALUE);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static String toCSV(Object[] objectArray, Schema schema, String nullValue) {
Column[] columns = schema.getColumnsArray();
StringBuilder csvString = new StringBuilder();
@@ -589,7 +593,7 @@ public class SqoopIDFUtils {
columns[i].getName() + " does not support null values");
}
if (objectArray[i] == null) {
- csvString.append(NULL_VALUE);
+ csvString.append(nullValue);
} else {
switch (columns[i].getType()) {
case ARRAY:
@@ -756,6 +760,10 @@ public class SqoopIDFUtils {
* @return Object[]
*/
public static Object[] fromCSV(String csvText, Schema schema) {
+ return fromCSV(csvText, schema, DEFAULT_NULL_VALUE);
+ }
+
+ public static Object[] fromCSV(String csvText, Schema schema, String nullValue){
String[] csvArray = parseCSVString(csvText);
if (csvArray == null) {
@@ -771,11 +779,11 @@ public class SqoopIDFUtils {
Object[] objectArray = new Object[csvArray.length];
for (int i = 0; i < csvArray.length; i++) {
- if (csvArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+ if (csvArray[i].equals(nullValue) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
- if (csvArray[i].equals(NULL_VALUE)) {
+ if (csvArray[i].equals(nullValue)) {
objectArray[i] = null;
continue;
}
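
The two-argument toCSV and fromCSV keep their old behaviour by delegating with DEFAULT_NULL_VALUE, so existing callers compile unchanged while connectors that expose a null-substitution option pass their own token. A minimal round-trip sketch (hypothetical schema and values; Schema and Text used as in the tests below):

    // Hypothetical usage of the new three-argument overloads.
    Schema schema = new Schema("test")
        .addColumn(new Text("country"))
        .addColumn(new Text("city"));      // nullable by default

    Object[] row = new Object[]{ "USA", null };
    String csv = SqoopIDFUtils.toCSV(row, schema, "\\N");     // the null cell becomes the token \N
    Object[] back = SqoopIDFUtils.fromCSV(csv, schema, "\\N");
    // back[1] == null again; with the two-argument forms the token would be "NULL"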
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
index d78fa8b..ace1bdf 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -162,11 +162,11 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
}
GenericRecord avroObject = new GenericData.Record(avroSchema);
for (int i = 0; i < csvStringArray.length; i++) {
- if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+ if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
- if (csvStringArray[i].equals(NULL_VALUE)) {
+ if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
avroObject.put(columns[i].getName(), null);
continue;
}
@@ -323,7 +323,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
columns[i].getName() + " does not support null values");
}
if (obj == null) {
- csvString.append(NULL_VALUE);
+ csvString.append(DEFAULT_NULL_VALUE);
} else {
switch (columns[i].getType()) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
index 8db4d3d..078b598 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -146,12 +146,12 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
}
JSONObject object = new JSONObject();
for (int i = 0; i < csvStringArray.length; i++) {
- if (csvStringArray[i].equals(NULL_VALUE) && !columns[i].isNullable()) {
+ if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
columns[i].getName() + " does not support null values");
}
// check for NULL field and bail out immediately
- if (csvStringArray[i].equals(NULL_VALUE)) {
+ if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
object.put(columns[i].getName(), null);
continue;
}
@@ -299,7 +299,7 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
columns[i].getName() + " does not support null values");
}
if (obj == null) {
- csvString.append(NULL_VALUE);
+ csvString.append(DEFAULT_NULL_VALUE);
} else {
switch (columns[i].getType()) {
case ARRAY:
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
index 703ed0a..8475720 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
@@ -19,7 +19,7 @@
package org.apache.sqoop.connector.idf;
import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE;
import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
@@ -419,7 +419,7 @@ public class TestAVROIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -459,7 +459,7 @@ public class TestAVROIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -474,7 +474,7 @@ public class TestAVROIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -528,7 +528,7 @@ public class TestAVROIntermediateDataFormat {
public void testSchemaNotNullableWithCSV() {
Schema overrideSchema = new Schema("Test").addColumn(new Text("one").setNullable(false));
AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(overrideSchema);
- dataFormat.setCSVTextData(NULL_VALUE);
+ dataFormat.setCSVTextData(DEFAULT_NULL_VALUE);
}
// no validation happens when the setAvro and getAvro is used
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 040dbfc..e82d817 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -106,7 +106,7 @@ public class TestCSVIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(14, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -176,7 +176,7 @@ public class TestCSVIntermediateDataFormat {
String csvText = dataFormat.getCSVTextData();
String[] textValues = csvText.split(",");
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
index bcc1f95..09c4a11 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
@@ -18,7 +18,7 @@
*/
package org.apache.sqoop.connector.idf;
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.NULL_VALUE;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.DEFAULT_NULL_VALUE;
import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
@@ -414,7 +414,7 @@ public class TestJSONIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -454,7 +454,7 @@ public class TestJSONIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -472,7 +472,7 @@ public class TestJSONIntermediateDataFormat {
String[] textValues = csvText.split(",");
assertEquals(15, textValues.length);
for (String text : textValues) {
- assertEquals(text, NULL_VALUE);
+ assertEquals(text, DEFAULT_NULL_VALUE);
}
}
@@ -507,7 +507,7 @@ public class TestJSONIntermediateDataFormat {
public void testSchemaNotNullableWithCSV() {
dataFormat = new JSONIntermediateDataFormat();
dataFormat.setSchema(new Schema("Test").addColumn(new Text("t").setNullable(false)));
- dataFormat.setCSVTextData(NULL_VALUE);
+ dataFormat.setCSVTextData(DEFAULT_NULL_VALUE);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b71f55e4/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
new file mode 100644
index 0000000..4363814
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/hdfs/NullValueTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.hdfs;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multiset;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.asserts.HdfsAsserts;
+import org.apache.sqoop.test.infrastructure.Infrastructure;
+import org.apache.sqoop.test.infrastructure.SqoopTestCase;
+import org.apache.sqoop.test.infrastructure.providers.DatabaseInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.HadoopInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.KdcInfrastructureProvider;
+import org.apache.sqoop.test.infrastructure.providers.SqoopInfrastructureProvider;
+import org.apache.sqoop.test.utils.HdfsUtils;
+import org.apache.sqoop.test.utils.ParametrizedUtils;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Infrastructure(dependencies = {KdcInfrastructureProvider.class, HadoopInfrastructureProvider.class, SqoopInfrastructureProvider.class, DatabaseInfrastructureProvider.class})
+public class NullValueTest extends SqoopTestCase {
+
+ private static final Logger LOG = Logger.getLogger(NullValueTest.class);
+
+ private ToFormat format;
+
+ // The null token to use; DEFAULT_NULL_VALUE when exercising the default behavior
+ private String nullValue;
+
+ @DataProvider(name="nul-value-test")
+ public static Object[][] data(ITestContext context) {
+ String customNullValue = "^&*custom!@";
+
+ return Iterables.toArray(
+ ParametrizedUtils.crossProduct(ToFormat.values(), new String[]{SqoopIDFUtils.DEFAULT_NULL_VALUE, customNullValue}),
+ Object[].class);
+ }
+
+ @Factory(dataProvider="nul-value-test")
+ public NullValueTest(ToFormat format, String nullValue) {
+ this.format = format;
+ this.nullValue = nullValue;
+ }
+
+ @Override
+ public String getTestName() {
+ return methodName + "[" + format.name() + ", " + nullValue + "]";
+ }
+
+ @BeforeMethod
+ public void setup() throws Exception {
+ createTableCities();
+ }
+
+ @AfterMethod()
+ public void dropTable() {
+ super.dropTable();
+ }
+
+ private boolean usingCustomNullValue() {
+ // Compare by value; reference equality only holds while the provider passes the constant itself.
+ return !SqoopIDFUtils.DEFAULT_NULL_VALUE.equals(nullValue);
+ }
+
+ private String[] getCsv() {
+ return new String[] {
+ "1,'USA'," + nullValue + ",'San Francisco'",
+ "2,'USA','2004-10-24 00:00:00.000'," + nullValue,
+ "3," + nullValue + ",'2004-10-25 00:00:00.000','Brno'",
+ "4,'USA','2004-10-26 00:00:00.000','Palo Alto'"
+ };
+ }
+
+ @Test
+ public void testFromHdfs() throws Exception {
+ switch (format) {
+ case TEXT_FILE:
+ createFromFile("input-0001", getCsv());
+
+ break;
+ case SEQUENCE_FILE:
+ SequenceFile.Writer.Option optPath =
+ SequenceFile.Writer.file(new Path(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "input-0001")));
+ SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(Text.class);
+ SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(NullWritable.class);
+
+ SequenceFile.Writer sequenceFileWriter =
+ SequenceFile.createWriter(getHadoopConf(), optPath, optKey, optVal);
+ for (String csv : getCsv()) {
+ sequenceFileWriter.append(new Text(csv), NullWritable.get());
+ }
+ sequenceFileWriter.close();
+ break;
+ default:
+ Assert.fail();
+ }
+
+ MLink hdfsLinkFrom = getClient().createLink("hdfs-connector");
+ fillHdfsLink(hdfsLinkFrom);
+ saveLink(hdfsLinkFrom);
+
+ MLink rdbmsLinkTo = getClient().createLink("generic-jdbc-connector");
+ fillRdbmsLinkConfig(rdbmsLinkTo);
+ saveLink(rdbmsLinkTo);
+
+ MJob job = getClient().createJob(hdfsLinkFrom.getName(), rdbmsLinkTo.getName());
+
+ fillHdfsFromConfig(job);
+ fillRdbmsToConfig(job);
+
+ if (usingCustomNullValue()) {
+ job.getFromJobConfig().getBooleanInput("fromJobConfig.overrideNullValue").setValue(true);
+ job.getFromJobConfig().getStringInput("fromJobConfig.nullValue").setValue(nullValue);
+ }
+
+ MDriverConfig driverConfig = job.getDriverConfig();
+ driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+ saveJob(job);
+
+ executeJob(job);
+
+ Assert.assertEquals(4L, provider.rowCount(getTableName()));
+ assertRowInCities(1, "USA", null, "San Francisco");
+ assertRowInCities(2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
+ assertRowInCities(3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+ assertRowInCities(4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+ }
+
+ @Test
+ public void testToHdfs() throws Exception {
+ provider.insertRow(getTableName(), 1, "USA", (java.sql.Timestamp) null, "San Francisco");
+ provider.insertRow(getTableName(), 2, "USA", Timestamp.valueOf("2004-10-24 00:00:00.000"), (String) null);
+ provider.insertRow(getTableName(), 3, (String) null, Timestamp.valueOf("2004-10-25 00:00:00.000"), "Brno");
+ provider.insertRow(getTableName(), 4, "USA", Timestamp.valueOf("2004-10-26 00:00:00.000"), "Palo Alto");
+
+ MLink rdbmsLinkFrom = getClient().createLink("generic-jdbc-connector");
+ fillRdbmsLinkConfig(rdbmsLinkFrom);
+ saveLink(rdbmsLinkFrom);
+
+ MLink hdfsLinkTo = getClient().createLink("hdfs-connector");
+ fillHdfsLink(hdfsLinkTo);
+ saveLink(hdfsLinkTo);
+
+ MJob job = getClient().createJob(rdbmsLinkFrom.getName(), hdfsLinkTo.getName());
+
+ fillRdbmsFromConfig(job, "id");
+ fillHdfsToConfig(job, format);
+
+ if (usingCustomNullValue()) {
+ job.getToJobConfig().getBooleanInput("toJobConfig.overrideNullValue").setValue(true);
+ job.getToJobConfig().getStringInput("toJobConfig.nullValue").setValue(nullValue);
+ }
+
+ hdfsClient.mkdirs(new Path(HdfsUtils.joinPathFragments
+ (getMapreduceDirectory(), "TO")));
+
+ job.getToJobConfig().getStringInput("toJobConfig.outputDirectory")
+ .setValue(HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+
+ MDriverConfig driverConfig = job.getDriverConfig();
+ driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(3);
+ saveJob(job);
+
+ executeJob(job);
+
+ switch (format) {
+ case TEXT_FILE:
+ HdfsAsserts.assertMapreduceOutput(hdfsClient,
+ HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"), getCsv());
+ break;
+ case SEQUENCE_FILE:
+ Multiset<String> setLines = HashMultiset.create(Arrays.asList(getCsv()));
+ List<String> notFound = new ArrayList<>();
+ Path[] files = HdfsUtils.getOutputMapreduceFiles(hdfsClient, HdfsUtils.joinPathFragments(getMapreduceDirectory(), "TO"));
+
+ for(Path file : files) {
+ SequenceFile.Reader.Option optPath = SequenceFile.Reader.file(file);
+ SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(getHadoopConf(), optPath);
+
+ Text text = new Text();
+ while (sequenceFileReader.next(text)) {
+ if (!setLines.remove(text.toString())) {
+ notFound.add(text.toString());
+ }
+ }
+ }
+ if(!setLines.isEmpty() || !notFound.isEmpty()) {
+ LOG.error("Output do not match expectations.");
+ LOG.error("Expected lines that weren't present in the files:");
+ LOG.error("\t'" + StringUtils.join(setLines, "'\n\t'") + "'");
+ LOG.error("Extra lines in files that weren't expected:");
+ LOG.error("\t'" + StringUtils.join(notFound, "'\n\t'") + "'");
+ Assert.fail("Output do not match expectations.");
+ }
+ break;
+ default:
+ Assert.fail();
+ }
+ }
+}
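
The new test is instantiated once per (output format, null token) pair: ParametrizedUtils.crossProduct feeds the TestNG @Factory, so TEXT_FILE and SEQUENCE_FILE each run with both the default "NULL" token and the custom one. A stripped-down sketch of that parameterisation pattern (hypothetical class, plain TestNG):

    import org.testng.annotations.DataProvider;
    import org.testng.annotations.Factory;
    import org.testng.annotations.Test;

    // Hypothetical minimal version of the @Factory/@DataProvider pattern above.
    public class CrossProductTest {
      private final String format;
      private final String token;

      @DataProvider(name = "combos")
      public static Object[][] combos() {
        return new Object[][] {
            {"TEXT_FILE", "NULL"}, {"TEXT_FILE", "^&*custom!@"},
            {"SEQUENCE_FILE", "NULL"}, {"SEQUENCE_FILE", "^&*custom!@"},
        };
      }

      @Factory(dataProvider = "combos")
      public CrossProductTest(String format, String token) {
        this.format = format;
        this.token = token;
      }

      @Test
      public void runsOncePerCombination() {
        // Each of the four instances executes this test with its own pair.
      }
    }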