You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/06 03:57:17 UTC
sqoop git commit: SQOOP-1901: Sqoop2: Support DRY code in IDF
implementations and add JSONIDF
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 c7ef89dba -> 4ffa806ba
SQOOP-1901: Sqoop2: Support DRY code in IDF implementations and add JSONIDF
(Veena Basavaraj via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4ffa806b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4ffa806b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4ffa806b
Branch: refs/heads/sqoop2
Commit: 4ffa806bace0fbe7cb922ef6d80ae0c5d891bfca
Parents: c7ef89d
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Mon Jan 5 17:34:12 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Mon Jan 5 18:56:45 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/sqoop/schema/Schema.java | 29 +-
.../org/apache/sqoop/schema/SchemaError.java | 2 +
connector/connector-sdk/pom.xml | 4 +
.../sqoop/connector/common/SqoopIDFUtils.java | 172 +++++---
.../idf/CSVIntermediateDataFormat.java | 177 ++------
.../idf/CSVIntermediateDataFormatError.java | 17 +-
.../connector/idf/IntermediateDataFormat.java | 112 ++++-
.../idf/IntermediateDataFormatError.java | 50 +++
.../idf/JSONIntermediateDataFormat.java | 405 +++++++++++++++++++
.../idf/JSONIntermediateDataFormatError.java | 49 +++
.../idf/TestCSVIntermediateDataFormat.java | 24 +-
.../idf/TestJSONIntermediateDataFormat.java | 309 ++++++++++++++
pom.xml | 16 +-
13 files changed, 1135 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/common/src/main/java/org/apache/sqoop/schema/Schema.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index 189dbe9..bc14bcc 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -23,12 +23,15 @@ import org.apache.sqoop.schema.type.Column;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
- * Schema represents the data fields that are transferred between {@link #From} and {@link #To}
+ * Schema represents the data fields that are transferred between {@link #From}
+ * and {@link #To}
*/
public class Schema {
@@ -57,10 +60,22 @@ public class Schema {
*/
private Set<String> columNames;
+ /**
+ * Helper map to map column name to index
+ */
+ private Map<String, Integer> nameToIndexMap;
+
+ /**
+ * column global index
+ */
+
+ int columnIndex;
+
private Schema() {
creationDate = new Date();
columns = new ArrayList<Column>();
columNames = new HashSet<String>();
+ nameToIndexMap = new HashMap<String, Integer>();
}
public Schema(String name) {
@@ -85,11 +100,10 @@ public class Schema {
if(columNames.contains(column.getName())) {
throw new SqoopException(SchemaError.SCHEMA_0002, "Column: " + column);
}
-
columNames.add(column.getName());
-
columns.add(column);
-
+ nameToIndexMap.put(column.getName(), columnIndex);
+ columnIndex ++;
return this;
}
@@ -127,6 +141,13 @@ public class Schema {
return columns.size();
}
+ public Integer getColumnNameIndex(String name) {
+ if (columNames.contains(name)) {
+ return nameToIndexMap.get(name);
+ }
+ throw new SqoopException(SchemaError.SCHEMA_0007, "Column: " + name);
+ }
+
public boolean isEmpty() {
return columns.size() == 0;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
index 33e4672..6b7fb48 100644
--- a/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
+++ b/common/src/main/java/org/apache/sqoop/schema/SchemaError.java
@@ -38,6 +38,8 @@ public enum SchemaError implements ErrorCode {
SCHEMA_0006("Schema without name"),
+ SCHEMA_0007("Unknown column name"),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 38c217a..46cb9f8 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -37,6 +37,10 @@ limitations under the License.
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 48adae1..979aa4f 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.connector.common;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormatError;
+import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -87,16 +89,25 @@ public class SqoopIDFUtils {
// only date, no time
public static final DateTimeFormatter df = DateTimeFormat.forPattern("yyyy-MM-dd");
// time with fraction only, no timezone
- public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSSSSS");
+ public static final DateTimeFormatter tfWithFraction = DateTimeFormat.forPattern("HH:mm:ss.SSS");
public static final DateTimeFormatter tfWithNoFraction = DateTimeFormat.forPattern("HH:mm:ss");
- static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
- static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
- static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
- static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
+ public static final String[] TRUE_BIT_VALUES = new String[] { "1", "true", "TRUE" };
+ public static final Set<String> TRUE_BIT_SET = new HashSet<String>(Arrays.asList(TRUE_BIT_VALUES));
+ public static final String[] FALSE_BIT_VALUES = new String[] { "0", "false", "FALSE" };
+ public static final Set<String> FALSE_BIT_SET = new HashSet<String>(Arrays.asList(FALSE_BIT_VALUES));
// ******** Number Column Type utils***********
+ public static String encodeToCSVFixedPoint(Object obj, Column column) {
+ Long byteSize = ((FixedPoint) column).getByteSize();
+ if (byteSize != null && byteSize <= Integer.SIZE) {
+ return ((Integer) obj).toString();
+ } else {
+ return ((Long) obj).toString();
+ }
+ }
+
public static Object toFixedPoint(String csvString, Column column) {
Object returnValue;
Long byteSize = ((FixedPoint) column).getByteSize();
@@ -108,6 +119,15 @@ public class SqoopIDFUtils {
return returnValue;
}
+ public static String encodeToCSVFloatingPoint(Object obj, Column column) {
+ Long byteSize = ((FloatingPoint) column).getByteSize();
+ if (byteSize != null && byteSize <= Float.SIZE) {
+ return ((Float) obj).toString();
+ } else {
+ return ((Double) obj).toString();
+ }
+ }
+
public static Object toFloatingPoint(String csvString, Column column) {
Object returnValue;
Long byteSize = ((FloatingPoint) column).getByteSize();
@@ -119,43 +139,44 @@ public class SqoopIDFUtils {
return returnValue;
}
+ public static String encodeToCSVDecimal(Object obj) {
+ return ((BigDecimal)obj).toString();
+ }
public static Object toDecimal(String csvString, Column column) {
return new BigDecimal(csvString);
}
// ********** BIT Column Type utils******************
- public static void encodeToCSVBit(Object[] objectArray, int i) {
- String bitStringValue = objectArray[i].toString();
+ public static String encodeToCSVBit(Object obj) {
+ String bitStringValue = obj.toString();
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
- objectArray[i] = bitStringValue;
+ return bitStringValue;
} else {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: "
- + objectArray[i]);
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue);
}
}
- public static Object toBit(String csvString, Object returnValue) {
+ public static Object toBit(String csvString) {
if ((TRUE_BIT_SET.contains(csvString)) || (FALSE_BIT_SET.contains(csvString))) {
- returnValue = TRUE_BIT_SET.contains(csvString);
+ return TRUE_BIT_SET.contains(csvString);
} else {
// throw an exception for any unsupported value for BITs
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0009, " given bit value: " + csvString);
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString);
}
- return returnValue;
}
// *********** DATE and TIME Column Type utils **********
- public static void encodeToCSVDate(Object[] objectArray, int i) {
- org.joda.time.LocalDate date = (org.joda.time.LocalDate) objectArray[i];
- objectArray[i] = encloseWithQuote(df.print(date));
+ public static String encodeToCSVDate(Object obj) {
+ org.joda.time.LocalDate date = (org.joda.time.LocalDate) obj;
+ return encloseWithQuote(df.print(date));
}
- public static void encodeToCSVTime(Object[] objectArray, int i, Column col) {
+ public static String encodeToCSVTime(Object obj, Column col) {
if (((org.apache.sqoop.schema.type.Time) col).hasFraction()) {
- objectArray[i] = encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) objectArray[i]));
+ return encloseWithQuote(tfWithFraction.print((org.joda.time.LocalTime) obj));
} else {
- objectArray[i] = encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) objectArray[i]));
+ return encloseWithQuote(tfWithNoFraction.print((org.joda.time.LocalTime) obj));
}
}
@@ -169,31 +190,34 @@ public class SqoopIDFUtils {
// *********** DATE TIME Column Type utils **********
- public static void encodeToCSVLocalDateTime(Object[] objectArray, int i, Column col, org.joda.time.LocalDateTime localDateTime) {
+ public static String encodeToCSVLocalDateTime(Object obj, Column col) {
+ org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) obj;
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
if (column.hasFraction()) {
- objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
+ return encloseWithQuote(dtfWithFractionNoTimeZone.print(localDateTime));
} else {
- objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
+ return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(localDateTime));
}
}
- public static void encodeToCSVDateTime(Object[] objectArray, int i, Column col, org.joda.time.DateTime dateTime) {
+
+ public static String encodeToCSVDateTime(Object obj, Column col) {
+ org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
if (column.hasFraction() && column.hasTimezone()) {
- objectArray[i] = encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
+ return encloseWithQuote(dtfWithFractionAndTimeZone.print(dateTime));
} else if (column.hasFraction() && !column.hasTimezone()) {
- objectArray[i] = encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
+ return encloseWithQuote(dtfWithFractionNoTimeZone.print(dateTime));
} else if (column.hasTimezone()) {
- objectArray[i] = encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
+ return encloseWithQuote(dtfWithNoFractionWithTimeZone.print(dateTime));
} else {
- objectArray[i] = encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
+ return encloseWithQuote(dtfWithNoFractionAndTimeZone.print(dateTime));
}
}
- public static Object toDateTime(String fieldString, Column column) {
+ public static Object toDateTime(String csvString, Column column) {
Object returnValue;
- String dateTime = removeQuotes(fieldString);
+ String dateTime = removeQuotes(csvString);
org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
if (col.hasFraction() && col.hasTimezone()) {
// After calling withOffsetParsed method, a string
@@ -227,7 +251,7 @@ public class SqoopIDFUtils {
try {
object = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
} catch (org.json.simple.parser.ParseException e) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, e);
}
if (object != null) {
return toMap(object);
@@ -235,7 +259,7 @@ public class SqoopIDFUtils {
return null;
}
- private static List<Object> toList(JSONArray array) {
+ public static List<Object> toList(JSONArray array) {
List<Object> list = new ArrayList<Object>();
for (int i = 0; i < array.size(); i++) {
Object value = array.get(i);
@@ -252,7 +276,7 @@ public class SqoopIDFUtils {
}
@SuppressWarnings("unchecked")
- private static Map<Object, Object> toMap(JSONObject object) {
+ public static Map<Object, Object> toMap(JSONObject object) {
Map<Object, Object> elementMap = new HashMap<Object, Object>();
Set<Map.Entry<Object, Object>> entries = object.entrySet();
for (Map.Entry<Object, Object> entry : entries) {
@@ -273,13 +297,18 @@ public class SqoopIDFUtils {
// ************ LIST Column Type utils*********
@SuppressWarnings("unchecked")
- public static String encodeToCSVList(Object[] list, AbstractComplexListType column) {
+ public static String encodeToCSVList(Object[] list, Column column) {
List<Object> elementList = new ArrayList<Object>();
for (int n = 0; n < list.length; n++) {
Column listType = ((AbstractComplexListType) column).getListType();
+ //2 level nesting supported
if (isColumnListType(listType)) {
Object[] listElements = (Object[]) list[n];
- elementList.add((Arrays.deepToString(listElements)));
+ JSONArray subArray = new JSONArray();
+ for (int i = 0; i < listElements.length; i++) {
+ subArray.add(listElements[i]);
+ }
+ elementList.add(subArray);
} else {
elementList.add(list[n]);
}
@@ -295,7 +324,7 @@ public class SqoopIDFUtils {
try {
array = (JSONArray) new JSONParser().parse(removeQuotes(csvString));
} catch (org.json.simple.parser.ParseException e) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0008, e);
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0003, e);
}
if (array != null) {
return array.toArray();
@@ -321,31 +350,32 @@ public class SqoopIDFUtils {
replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
}
} catch (Exception e) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement
- + " " + String.valueOf(j) + " " + e.getMessage());
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + " "
+ + String.valueOf(j) + " " + e.getMessage());
}
return encloseWithQuote(replacement);
}
- public static String toText(String csvString) {
+ public static String toText(String string) {
// Remove the trailing and starting quotes.
- csvString = removeQuotes(csvString);
+ string = removeQuotes(string);
int j = 0;
try {
for (j = 0; j < replacements.length; j++) {
- csvString = csvString.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
+ string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
}
} catch (Exception e) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, csvString + " "
- + String.valueOf(j) + e.getMessage());
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " " + String.valueOf(j)
+ + e.getMessage());
}
- return csvString;
+ return string;
}
// ************ BINARY Column type utils*********
- public static String encodeToCSVByteArray(byte[] bytes) {
+ public static String encodeToCSVByteArray(Object obj) {
+ byte[] bytes = (byte[]) obj;
try {
return encodeToCSVString(new String(bytes, BYTE_FIELD_CHARSET));
} catch (UnsupportedEncodingException e) {
@@ -393,4 +423,56 @@ public class SqoopIDFUtils {
return stringType.getType().equals(ColumnType.TEXT) || stringType.getType().equals(ColumnType.ENUM);
}
+ // ******* parse sqoop CSV ********
+ /**
+ * Custom CSV Text parser that honors quoting and escaped quotes.
+ *
+ * @return String[]
+ */
+ public static String[] parseCSVString(String csvText) {
+ if (csvText == null) {
+ return null;
+ }
+
+ boolean quoted = false;
+ boolean escaped = false;
+
+ List<String> parsedData = new LinkedList<String>();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < csvText.length(); ++i) {
+ char c = csvText.charAt(i);
+ switch (c) {
+ case QUOTE_CHARACTER:
+ builder.append(c);
+ if (escaped) {
+ escaped = false;
+ } else {
+ quoted = !quoted;
+ }
+ break;
+ case ESCAPE_CHARACTER:
+ builder.append(ESCAPE_CHARACTER);
+ escaped = !escaped;
+ break;
+ case CSV_SEPARATOR_CHARACTER:
+ if (quoted) {
+ builder.append(c);
+ } else {
+ parsedData.add(builder.toString());
+ builder = new StringBuilder();
+ }
+ break;
+ default:
+ if (escaped) {
+ escaped = false;
+ }
+ builder.append(c);
+ break;
+ }
+ }
+ parsedData.add(builder.toString());
+
+ return parsedData.toArray(new String[parsedData.size()]);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index ad1dc04..be1147d 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -26,7 +26,6 @@ import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
-import org.apache.sqoop.schema.type.ColumnType;
import org.apache.sqoop.utils.ClassUtils;
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
@@ -38,8 +37,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,25 +53,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final Logger LOG = Logger.getLogger(CSVIntermediateDataFormat.class);
- private final Set<Integer> stringTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> bitTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> byteTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> listTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> mapTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> dateTimeTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> dateTypeColumnIndices = new HashSet<Integer>();
- private final Set<Integer> timeColumnIndices = new HashSet<Integer>();
-
-
- private Schema schema;
-
public CSVIntermediateDataFormat() {
}
public CSVIntermediateDataFormat(Schema schema) {
setSchema(schema);
}
-
/**
* {@inheritDoc}
*/
@@ -87,129 +71,45 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
* {@inheritDoc}
*/
@Override
- public void setCSVTextData(String text) {
- this.data = text;
+ public void setCSVTextData(String csvText) {
+ this.data = csvText;
}
/**
* {@inheritDoc}
*/
@Override
- public void setSchema(Schema schema) {
- if (schema == null) {
- return;
- }
- this.schema = schema;
- Column[] columns = schema.getColumnsArray();
- int i = 0;
- for (Column col : columns) {
- if (isColumnStringType(col)) {
- stringTypeColumnIndices.add(i);
- } else if (col.getType() == ColumnType.BIT) {
- bitTypeColumnIndices.add(i);
- } else if (col.getType() == ColumnType.DATE) {
- dateTypeColumnIndices.add(i);
- } else if (col.getType() == ColumnType.TIME) {
- timeColumnIndices.add(i);
- } else if (col.getType() == ColumnType.DATE_TIME) {
- dateTimeTypeColumnIndices.add(i);
- } else if (col.getType() == ColumnType.BINARY) {
- byteTypeColumnIndices.add(i);
- } else if (isColumnListType(col)) {
- listTypeColumnIndices.add(i);
- } else if (col.getType() == ColumnType.MAP) {
- mapTypeColumnIndices.add(i);
- }
- i++;
- }
- }
-
- /**
- * Custom CSV Text parser that honors quoting and escaped quotes.
- *
- * @return String[]
- */
- private String[] parseCSVString() {
- if (data == null) {
- return null;
- }
-
- boolean quoted = false;
- boolean escaped = false;
-
- List<String> parsedData = new LinkedList<String>();
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < data.length(); ++i) {
- char c = data.charAt(i);
- switch (c) {
- case QUOTE_CHARACTER:
- builder.append(c);
- if (escaped) {
- escaped = false;
- } else {
- quoted = !quoted;
- }
- break;
- case ESCAPE_CHARACTER:
- builder.append(ESCAPE_CHARACTER);
- escaped = !escaped;
- break;
- case CSV_SEPARATOR_CHARACTER:
- if (quoted) {
- builder.append(c);
- } else {
- parsedData.add(builder.toString());
- builder = new StringBuilder();
- }
- break;
- default:
- if (escaped) {
- escaped = false;
- }
- builder.append(c);
- break;
- }
- }
- parsedData.add(builder.toString());
-
- return parsedData.toArray(new String[parsedData.size()]);
- }
-
- /**
- * Converts the CSV String array into actual object array based on its
- * corresponding column type {@inheritDoc}
- */
- @Override
public Object[] getObjectData() {
if (schema == null || schema.isEmpty()) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006);
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0002);
}
// fieldStringArray represents the csv fields parsed into string array
- String[] fieldStringArray = parseCSVString();
+ String[] csvStringArray = parseCSVString(this.data);
- if (fieldStringArray == null) {
+ if (csvStringArray == null) {
return null;
}
- if (fieldStringArray.length != schema.getColumnsCount()) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005,
- "The data " + getCSVTextData() + " has the wrong number of fields.");
+ if (csvStringArray.length != schema.getColumnsArray().length) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + getCSVTextData()
+ + " has the wrong number of fields.");
}
- Object[] objectArray = new Object[fieldStringArray.length];
+ Object[] objectArray = new Object[csvStringArray.length];
Column[] columnArray = schema.getColumnsArray();
- for (int i = 0; i < fieldStringArray.length; i++) {
+ for (int i = 0; i < csvStringArray.length; i++) {
// check for NULL field and bail out immediately
- if (fieldStringArray[i].equals(NULL_VALUE)) {
+ if (csvStringArray[i].equals(NULL_VALUE)) {
objectArray[i] = null;
continue;
}
- objectArray[i] = toObject(fieldStringArray[i], columnArray[i]);
+ objectArray[i] = toObject(csvStringArray[i], columnArray[i]);
}
return objectArray;
}
+
private Object toObject(String csvString, Column column) {
Object returnValue = null;
@@ -242,7 +142,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
returnValue = toDateTime(csvString, column);
break;
case BIT:
- returnValue = toBit(csvString, returnValue);
+ returnValue = toBit(csvString);
break;
case ARRAY:
case SET:
@@ -258,14 +158,12 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
return returnValue;
}
-
-
/**
- * Appends the actual java objects into CSV string {@inheritDoc}
+ * {@inheritDoc}
*/
@Override
public void setObjectData(Object[] data) {
- Set<Integer> nullValueIndices = new HashSet<Integer>();
+ Set<Integer> nullValueIndices = new HashSet<Integer>();
Column[] columnArray = schema.getColumnsArray();
// check for null
for (int i = 0; i < data.length; i++) {
@@ -277,6 +175,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
// ignore the null values while encoding the object array into csv string
encodeToCSVText(data, columnArray, nullValueIndices);
this.data = StringUtils.join(data, CSV_SEPARATOR_CHARACTER);
+
}
/**
@@ -288,6 +187,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
}
/**
+ *
* {@inheritDoc}
*/
@Override
@@ -296,30 +196,18 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
}
/**
- * {@inheritDoc}
+ * Encode to the sqoop prescribed CSV String for every element in the object
+ * array
+ *
+ * @param objectArray
+ * @param columnArray
+ * @param nullValueIndices
*/
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
- if (other == null || !(other instanceof CSVIntermediateDataFormat)) {
- return false;
- }
- return data.equals(((CSVIntermediateDataFormat) other).data);
- }
-
- /**
- * Encode to the sqoop prescribed CSV String for every element in the objet array
- * @param objectArray
- * @param columnArray
- * @param nullValueIndices
- */
@SuppressWarnings("unchecked")
private void encodeToCSVText(Object[] objectArray, Column[] columnArray, Set<Integer> nullValueIndices) {
for (int i : bitTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
- encodeToCSVBit(objectArray, i);
+ objectArray[i] = encodeToCSVBit(objectArray[i]);
}
}
for (int i : stringTypeColumnIndices) {
@@ -333,22 +221,22 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
if (objectArray[i] instanceof org.joda.time.DateTime) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) objectArray[i];
// check for fraction and time zone and then use the right formatter
- encodeToCSVDateTime(objectArray, i, col, dateTime);
+ objectArray[i] = encodeToCSVDateTime(dateTime, col);
} else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
org.joda.time.LocalDateTime localDateTime = (org.joda.time.LocalDateTime) objectArray[i];
- encodeToCSVLocalDateTime(objectArray, i, col, localDateTime);
+ objectArray[i] = encodeToCSVLocalDateTime(localDateTime, col);
}
}
}
for (int i : dateTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
- encodeToCSVDate(objectArray, i);
+ objectArray[i] = encodeToCSVDate(objectArray[i]);
}
}
- for (int i : timeColumnIndices) {
+ for (int i : timeTypeColumnIndices) {
Column col = columnArray[i];
if (!nullValueIndices.contains(i)) {
- encodeToCSVTime(objectArray, i, col);
+ objectArray[i] = encodeToCSVTime(objectArray[i], col);
}
}
for (int i : byteTypeColumnIndices) {
@@ -358,7 +246,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
}
for (int i : listTypeColumnIndices) {
if (!nullValueIndices.contains(i)) {
- objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType)columnArray[i]);
+ objectArray[i] = encodeToCSVList((Object[]) objectArray[i], (AbstractComplexListType) columnArray[i]);
}
}
for (int i : mapTypeColumnIndices) {
@@ -384,5 +272,4 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
jars.add(ClassUtils.jarForClass(JSONValue.class));
return jars;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
index 884550d..a88db45 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
@@ -22,7 +22,7 @@ import org.apache.sqoop.common.ErrorCode;
public enum CSVIntermediateDataFormatError implements ErrorCode {
/** An unknown error has occurred. */
- INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+ CSV_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
/** An encoding is missing in the Java native libraries. */
CSV_INTERMEDIATE_DATA_FORMAT_0001("Native character set error."),
@@ -36,17 +36,14 @@ public enum CSVIntermediateDataFormatError implements ErrorCode {
/** Column type isn't known by Intermediate Data Format. */
CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
- /** Number of columns in schema does not match the data set. */
- CSV_INTERMEDIATE_DATA_FORMAT_0005("Wrong number of columns."),
-
- /** Schema is missing in the IDF. */
- CSV_INTERMEDIATE_DATA_FORMAT_0006("Schema missing."),
-
- /** For arrays and maps we use JSON representation and incorrect representation results in parse exception*/
- CSV_INTERMEDIATE_DATA_FORMAT_0008("JSON parse internal error."),
+ /**
+ * For arrays and maps we use JSON representation and incorrect representation
+ * results in parse exception
+ */
+ CSV_INTERMEDIATE_DATA_FORMAT_0005("JSON parse internal error."),
/** Unsupported bit values */
- CSV_INTERMEDIATE_DATA_FORMAT_0009("Unsupported bit value."),
+ CSV_INTERMEDIATE_DATA_FORMAT_0006("Unsupported bit value."),
;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index 7ac44dc..adeb2ec 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -18,12 +18,21 @@
*/
package org.apache.sqoop.connector.idf;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+
+import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.ColumnType;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -44,9 +53,16 @@ public abstract class IntermediateDataFormat<T> {
protected volatile T data;
- public int hashCode() {
- return data.hashCode();
- }
+ protected Schema schema;
+
+ protected final Set<Integer> stringTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> bitTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> byteTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> listTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> mapTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> dateTimeTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> dateTypeColumnIndices = new HashSet<Integer>();
+ protected final Set<Integer> timeTypeColumnIndices = new HashSet<Integer>();
/**
@@ -68,6 +84,7 @@ public abstract class IntermediateDataFormat<T> {
public void setData(T obj) {
this.data = obj;
}
+
/**
* Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing
* into the sqoop specified CSV text format for each {@link #ColumnType} field in the row
@@ -89,33 +106,66 @@ public abstract class IntermediateDataFormat<T> {
/**
* Set one row of data as CSV.
- *
*/
public abstract void setCSVTextData(String csvText);
/**
- * Get one row of data as an Object array. Sqoop uses defined object representation
- * for each column type. For instance org.joda.time to represent date.Use {@link #SqoopIDFUtils}
- * for reading and writing into the sqoop specified object format
- * for each {@link #ColumnType} field in the row
+ * Get one row of data as an Object array.
+ * Sqoop uses defined object representation
+ * for each column type. For instance org.joda.time to represent date.
+ * Use {@link #SqoopIDFUtils} for reading and writing into the sqoop
+ * specified object format for each {@link #ColumnType} field in the row
* </p>
* @return - String representing the data as an Object array
* If FROM and TO schema exist, we will use SchemaMatcher to get the data according to "TO" schema
*/
public abstract Object[] getObjectData();
- /**
- * Set one row of data as an Object array.
- *
- */
+ /**
+ * Set one row of data as an Object array.
+ * It also should construct the data representation
+ * that the IDF represents so that the object is ready to
+ * consume when getData is invoked. Custom implementations
+ * will override this method to convert from object array
+ * to the data format
+ */
public abstract void setObjectData(Object[] data);
/**
- * Set the schema for serializing/de-serializing data.
+ * Set the schema for serializing/de-serializing data.
*
- * @param schema - the schema used for serializing/de-serializing data
+ * @param schema
+ * - the schema used for serializing/de-serializing data
*/
- public abstract void setSchema(Schema schema);
+ public void setSchema(Schema schema) {
+ if (schema == null) {
+ // TODO(SQOOP-1956): throw an exception since working without a schema is dangerous
+ return;
+ }
+ this.schema = schema;
+ Column[] columns = schema.getColumnsArray();
+ int i = 0;
+ for (Column col : columns) {
+ if (isColumnStringType(col)) {
+ stringTypeColumnIndices.add(i);
+ } else if (col.getType() == ColumnType.BIT) {
+ bitTypeColumnIndices.add(i);
+ } else if (col.getType() == ColumnType.DATE) {
+ dateTypeColumnIndices.add(i);
+ } else if (col.getType() == ColumnType.TIME) {
+ timeTypeColumnIndices.add(i);
+ } else if (col.getType() == ColumnType.DATE_TIME) {
+ dateTimeTypeColumnIndices.add(i);
+ } else if (col.getType() == ColumnType.BINARY) {
+ byteTypeColumnIndices.add(i);
+ } else if (isColumnListType(col)) {
+ listTypeColumnIndices.add(i);
+ } else if (col.getType() == ColumnType.MAP) {
+ mapTypeColumnIndices.add(i);
+ }
+ i++;
+ }
+ }
/**
* Serialize the fields of this object to <code>out</code>.
@@ -135,7 +185,6 @@ public abstract class IntermediateDataFormat<T> {
* @throws IOException
*/
public abstract void read(DataInput in) throws IOException;
-
/**
* Provide the external jars that the IDF depends on
* @return set of jars
@@ -144,4 +193,35 @@ public abstract class IntermediateDataFormat<T> {
return new HashSet<String>();
}
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((data == null) ? 0 : data.hashCode());
+ result = prime * result + ((schema == null) ? 0 : schema.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ IntermediateDataFormat<?> other = (IntermediateDataFormat<?>) obj;
+ if (data == null) {
+ if (other.data != null)
+ return false;
+ } else if (!data.equals(other.data))
+ return false;
+ if (schema == null) {
+ if (other.schema != null)
+ return false;
+ } else if (!schema.equals(other.schema))
+ return false;
+ return true;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
new file mode 100644
index 0000000..bda75fc
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum IntermediateDataFormatError implements ErrorCode {
+ /** An unknown error has occurred. */
+ INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+ /** Number of columns in schema does not match the data set. */
+ INTERMEDIATE_DATA_FORMAT_0001("Wrong number of columns."),
+
+ /** Schema is missing in the IDF. */
+ INTERMEDIATE_DATA_FORMAT_0002("Schema missing."),
+
+ INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"),
+
+ ;
+
+ private final String message;
+
+ private IntermediateDataFormatError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
new file mode 100644
index 0000000..9329cf8
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * IDF representing the intermediate format in JSON
+ */
+public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObject> {
+
+ // We need schema at all times
+ public JSONIntermediateDataFormat(Schema schema) {
+ setSchema(schema);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setCSVTextData(String text) {
+ // convert the CSV text to JSON
+ this.data = toJSON(text);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getCSVTextData() {
+ // convert JSON to sqoop CSV
+ return toCSV(data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setObjectData(Object[] data) {
+ // convert the object Array to JSON
+ this.data = toJSON(data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Object[] getObjectData() {
+ // convert JSON to object array
+ return toObject(data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(this.data.toJSONString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void read(DataInput in) throws IOException {
+ try {
+ data = (JSONObject) new JSONParser().parse(in.readUTF());
+ } catch (ParseException e) {
+ throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<String> getJars() {
+
+ Set<String> jars = super.getJars();
+ jars.add(ClassUtils.jarForClass(JSONObject.class));
+ return jars;
+ }
+
+ @SuppressWarnings("unchecked")
+ private JSONObject toJSON(String csv) {
+
+ String[] csvStringArray = parseCSVString(csv);
+
+ if (csvStringArray == null) {
+ return null;
+ }
+
+ if (csvStringArray.length != schema.getColumnsArray().length) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
+ + " has the wrong number of fields.");
+ }
+ JSONObject object = new JSONObject();
+ Column[] columnArray = schema.getColumnsArray();
+ for (int i = 0; i < csvStringArray.length; i++) {
+ // check for NULL field and bail out immediately
+ if (csvStringArray[i].equals(NULL_VALUE)) {
+ object.put(columnArray[i].getName(), null);
+ continue;
+ }
+ object.put(columnArray[i].getName(), toJSON(csvStringArray[i], columnArray[i]));
+
+ }
+ return object;
+ }
+
+ private Object toJSON(String csvString, Column column) {
+ Object returnValue = null;
+
+ switch (column.getType()) {
+ case ARRAY:
+ case SET:
+ try {
+ returnValue = (JSONArray) new JSONParser().parse(removeQuotes(csvString));
+ } catch (ParseException e) {
+ throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0002, e);
+ }
+ break;
+ case MAP:
+ try {
+ returnValue = (JSONObject) new JSONParser().parse(removeQuotes(csvString));
+ } catch (ParseException e) {
+ throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, e);
+ }
+ break;
+ case ENUM:
+ case TEXT:
+ returnValue = toText(csvString);
+ break;
+ case BINARY:
+ case UNKNOWN:
+ returnValue = Base64.encodeBase64String(toByteArray(csvString));
+ break;
+ case FIXED_POINT:
+ returnValue = toFixedPoint(csvString, column);
+ break;
+ case FLOATING_POINT:
+ returnValue = toFloatingPoint(csvString, column);
+ break;
+ case DECIMAL:
+ returnValue = toDecimal(csvString, column);
+ break;
+ case DATE:
+ case TIME:
+ case DATE_TIME:
+ // store as string expected to be in the JODA time format in CSV
+ // stored in JSON as joda time format
+ returnValue = removeQuotes(csvString);
+ break;
+ // true/false and TRUE/FALSE are the only accepted values for JSON Bit
+ // will be stored as true/false in JSON
+ case BIT:
+ returnValue = Boolean.valueOf(removeQuotes(csvString));
+ break;
+ default:
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
+ "Column type from schema was not recognized for " + column.getType());
+ }
+ return returnValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ private JSONObject toJSON(Object[] data) {
+
+ if (data == null) {
+ return null;
+ }
+
+ if (data.length != schema.getColumnsArray().length) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString()
+ + " has the wrong number of fields.");
+ }
+ JSONObject object = new JSONObject();
+ Column[] cols = schema.getColumnsArray();
+ for (int i = 0; i < data.length; i++) {
+ switch (cols[i].getType()) {
+ case ARRAY:
+ case SET:
+ // store as JSON array
+ Object[] objArray = (Object[]) data[i];
+ JSONArray jsonArray = toJSONArray(objArray);
+ object.put(cols[i].getName(), jsonArray);
+ break;
+ case MAP:
+ // store as JSON object
+ Map<Object, Object> map = (Map<Object, Object>) data[i];
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.putAll(map);
+ object.put(cols[i].getName(), jsonObject);
+ break;
+ case ENUM:
+ case TEXT:
+ object.put(cols[i].getName(), data[i]);
+ break;
+ case BINARY:
+ case UNKNOWN:
+ object.put(cols[i].getName(), Base64.encodeBase64String((byte[]) data[i]));
+ break;
+ case FIXED_POINT:
+ case FLOATING_POINT:
+ case DECIMAL:
+ // store as object
+ object.put(cols[i].getName(), data[i]);
+ break;
+ // stored in JSON as the same format as csv strings in the joda time
+ // format
+ case DATE_TIME:
+ object.put(cols[i].getName(), removeQuotes(encodeToCSVDateTime(data[i], cols[i])));
+ break;
+ case TIME:
+ object.put(cols[i].getName(), removeQuotes(encodeToCSVTime(data[i], cols[i])));
+ break;
+ case DATE:
+ object.put(cols[i].getName(), removeQuotes(encodeToCSVDate(data[i])));
+ break;
+ case BIT:
+ object.put(cols[i].getName(), Boolean.valueOf(encodeToCSVBit(data[i])));
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+ "Column type from schema was not recognized for " + cols[i].getType());
+ }
+ }
+
+ return object;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static JSONArray toJSONArray(Object[] objectArray) {
+ JSONArray jsonArray = new JSONArray();
+ for (int i = 0; i < objectArray.length; i++) {
+ Object value = objectArray[i];
+ if (value instanceof Object[]) {
+ value = toJSONArray((Object[]) value);
+ }
+ jsonArray.add(value);
+ }
+ return jsonArray;
+ }
+
+ private String toCSV(JSONObject json) {
+ Column[] cols = this.schema.getColumnsArray();
+
+ StringBuilder csvString = new StringBuilder();
+ for (int i = 0; i < cols.length; i++) {
+
+ // alternatively, we could iterate json.entrySet();
+ Object obj = json.get(cols[i].getName());
+ if (obj == null) {
+ throw new SqoopException(JSONIntermediateDataFormatError.JSON_INTERMEDIATE_DATA_FORMAT_0003, " for " + cols[i].getName());
+ }
+
+ switch (cols[i].getType()) {
+ case ARRAY:
+ case SET:
+ // stored as JSON array
+ JSONArray array = (JSONArray) obj;
+ csvString.append(encloseWithQuote(array.toJSONString()));
+ break;
+ case MAP:
+ // stored as JSON object
+ csvString.append(encloseWithQuote((((JSONObject) obj).toJSONString())));
+ break;
+ case ENUM:
+ case TEXT:
+ csvString.append(encodeToCSVString(obj.toString()));
+ break;
+ case BINARY:
+ case UNKNOWN:
+ csvString.append(encodeToCSVByteArray(Base64.decodeBase64(obj.toString())));
+ break;
+ case FIXED_POINT:
+ csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
+ break;
+ case FLOATING_POINT:
+ csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
+ break;
+ case DECIMAL:
+ csvString.append(encodeToCSVDecimal(obj));
+ break;
+ // stored in JSON as strings in the joda time format
+ case DATE:
+ case TIME:
+ case DATE_TIME:
+ csvString.append(encloseWithQuote(obj.toString()));
+ break;
+ // 0/1 will be stored as they are in JSON, even though valid values in
+ // JSON
+ // are true/false
+ case BIT:
+ csvString.append(encodeToCSVBit(obj));
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+ "Column type from schema was not recognized for " + cols[i].getType());
+ }
+ if (i < cols.length - 1) {
+ csvString.append(CSV_SEPARATOR_CHARACTER);
+ }
+
+ }
+
+ return csvString.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object[] toObject(JSONObject data) {
+
+ if (data == null) {
+ return null;
+ }
+ Column[] cols = schema.getColumnsArray();
+ Object[] object = new Object[cols.length];
+
+ Set<String> keys = data.keySet();
+ int i = 0;
+ for (String key : keys) {
+
+ Integer nameIndex = schema.getColumnNameIndex(key);
+ Object obj = data.get(key);
+ Column column = cols[nameIndex];
+ switch (column.getType()) {
+ case ARRAY:
+ case SET:
+ object[nameIndex] = toList((JSONArray) obj).toArray();
+ break;
+ case MAP:
+ object[nameIndex] = toMap((JSONObject) obj);
+ break;
+ case ENUM:
+ case TEXT:
+ object[nameIndex] = toText(obj.toString());
+ break;
+ case BINARY:
+ case UNKNOWN:
+ // JSON spec is to store byte array as base64 encoded
+ object[nameIndex] = Base64.decodeBase64(obj.toString());
+ break;
+ case FIXED_POINT:
+ object[nameIndex] = toFixedPoint(obj.toString(), column);
+ break;
+ case FLOATING_POINT:
+ object[nameIndex] = toFloatingPoint(obj.toString(), column);
+ break;
+ case DECIMAL:
+ object[nameIndex] = toDecimal(obj.toString(), column);
+ break;
+ case DATE:
+ object[nameIndex] = toDate(obj.toString(), column);
+ break;
+ case TIME:
+ object[nameIndex] = toTime(obj.toString(), column);
+ break;
+ case DATE_TIME:
+ object[nameIndex] = toDateTime(obj.toString(), column);
+ break;
+ case BIT:
+ object[nameIndex] = toBit(obj.toString());
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+ "Column type from schema was not recognized for " + cols[i].getType());
+ }
+
+ }
+ return object;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
new file mode 100644
index 0000000..86c71b6
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormatError.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum JSONIntermediateDataFormatError implements ErrorCode {
+ /** An unknown error has occurred. */
+ JSON_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+ JSON_INTERMEDIATE_DATA_FORMAT_0001("JSON array parse error."),
+
+ JSON_INTERMEDIATE_DATA_FORMAT_0002("JSON object parse error."),
+
+ JSON_INTERMEDIATE_DATA_FORMAT_0003("Missing key in the JSON object."),
+
+ ;
+
+ private final String message;
+
+ private JSONIntermediateDataFormatError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index f6852a0..2602c61 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -188,6 +188,16 @@ public class TestCSVIntermediateDataFormat {
assertEquals(testData, dataFormat.getCSVTextData());
}
+
+ @Test
+ public void testInputAsCSVTextInAndDataOut() {
+ String testData = "'ENUM',10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+ + ",'" + String.valueOf(0x0A) + "'";
+ dataFormat.setCSVTextData(testData);
+ assertEquals(testData, dataFormat.getData());
+ }
+
+
@Test
public void testInputAsCSVTextInObjectOut() {
@@ -219,7 +229,7 @@ public class TestCSVIntermediateDataFormat {
}
@Test
- public void testInputAsObjectArayInCSVTextOut() {
+ public void testInputAsObjectArayInCSVTextOrDataOut() {
Schema schema = new Schema("test");
schema.addColumn(new FixedPoint("1"))
.addColumn(new FixedPoint("2"))
@@ -246,6 +256,7 @@ public class TestCSVIntermediateDataFormat {
String testData = "10,34,'54','random data'," + getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r")
+ ",'\\n','TEST_ENUM'";
assertEquals(testData, dataFormat.getCSVTextData());
+ assertEquals(testData, dataFormat.getData());
}
@Test
@@ -384,7 +395,7 @@ public class TestCSVIntermediateDataFormat {
org.joda.time.LocalTime time = new org.joda.time.LocalTime(15, 0, 0);
Object[] in = { time, "test" };
dataFormat.setObjectData(in);
- assertEquals("'15:00:00.000000','test'", dataFormat.getCSVTextData());
+ assertEquals("'15:00:00.000','test'", dataFormat.getCSVTextData());
}
@Test
@@ -854,7 +865,8 @@ public class TestCSVIntermediateDataFormat {
dataFormat.setObjectData(data);
Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
assertEquals(2, expectedArray.length);
- assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString(expectedArray));
+ String arrayOfArraysString = "[[11,12], [14,15]]";
+ assertEquals(arrayOfArraysString, Arrays.toString(expectedArray));
assertEquals("text", dataFormat.getObjectData()[1]);
}
@@ -879,7 +891,7 @@ public class TestCSVIntermediateDataFormat {
dataFormat.setCSVTextData("'[\"[11, 12]\",\"[14, 15]\"]','text'");
Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
assertEquals(2, expectedArray.length);
- assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString(expectedArray));
+ assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.toString(expectedArray));
assertEquals("text", dataFormat.getObjectData()[1]);
}
@@ -890,7 +902,7 @@ public class TestCSVIntermediateDataFormat {
new org.apache.sqoop.schema.type.Array("array", new FixedPoint("ft"))));
schema.addColumn(new org.apache.sqoop.schema.type.Text("2"));
dataFormat.setSchema(schema);
- String input = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
+ String input = "'[[11, 12],[14, 15]]','text'";
dataFormat.setCSVTextData(input);
Object[] expectedArray = (Object[]) dataFormat.getObjectData()[0];
assertEquals(2, expectedArray.length);
@@ -916,7 +928,7 @@ public class TestCSVIntermediateDataFormat {
data[0] = arrayOfArrays;
data[1] = "text";
dataFormat.setObjectData(data);
- String expected = "'[\"[11, 12]\",\"[14, 15]\"]','text'";
+ String expected = "'[[11,12],[14,15]]','text'";
assertEquals(expected, dataFormat.getCSVTextData());
}
//**************test cases for map**********************
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
new file mode 100644
index 0000000..8672712
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestJSONIntermediateDataFormat.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Array;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDateTime;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestJSONIntermediateDataFormat {
+
+ private JSONIntermediateDataFormat dataFormat;
+ private final static String csvArray = "'[[11,11],[14,15]]'";
+ private final static String map = "'{\"testKey\":\"testValue\"}'";
+ private final static String csvSet = "'[[11,12],[14,15]]'";
+ private final static String date = "'2014-10-01'";
+ private final static String dateTime = "'2014-10-01 12:00:00.000'";
+ private final static String time = "'12:59:59'";
+
+ @Before
+ public void setUp() {
+ createJSONIDF();
+ }
+
+ private void createJSONIDF() {
+ Schema schema = new Schema("test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FixedPoint("2", 2L, false)).addColumn(new Text("3"))
+ .addColumn(new Text("4")).addColumn(new Binary("5")).addColumn(new Text("6"))
+ .addColumn(new org.apache.sqoop.schema.type.Enum("7"))
+ .addColumn(new Array("8", new Array("array", new FixedPoint("ft"))))
+ .addColumn(new org.apache.sqoop.schema.type.Map("9", new Text("t1"), new Text("t2"))).addColumn(new Bit("10"))
+ .addColumn(new org.apache.sqoop.schema.type.DateTime("11", true, false))
+ .addColumn(new org.apache.sqoop.schema.type.Time("12", false)).addColumn(new org.apache.sqoop.schema.type.Date("13"))
+ .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("14"))
+ .addColumn(new org.apache.sqoop.schema.type.Set("15", new Array("set", new FixedPoint("ftw"))));
+ dataFormat = new JSONIntermediateDataFormat(schema);
+ }
+
+ /**
+ * setCSVGetData setCSVGetObjectArray setCSVGetCSV
+ */
+ @Test
+ public void testInputAsCSVTextInAndDataOut() {
+
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+ + csvSet;
+ dataFormat.setCSVTextData(csvText);
+ String jsonExpected = "{\"15\":[[11,12],[14,15]],\"13\":\"2014-10-01\",\"14\":13.44,\"11\":\"2014-10-01 12:00:00.000\","
+ + "\"12\":\"12:59:59\",\"3\":\"54\",\"2\":34,\"1\":10,\"10\":true,\"7\":\"ENUM\",\"6\":\"10\",\"5\":\"kDY=\",\"4\":\"random data\","
+ + "\"9\":{\"testKey\":\"testValue\"},\"8\":[[11,11],[14,15]]}";
+ assertEquals(jsonExpected, dataFormat.getData().toJSONString());
+ }
+
+ @Test
+ public void testInputAsCSVTextInAndObjectArrayOut() {
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+ + csvSet;
+ dataFormat.setCSVTextData(csvText);
+ assertEquals(dataFormat.getObjectData().length, 15);
+ assertObjectArray();
+
+ }
+
+ private void assertObjectArray() {
+ Object[] out = dataFormat.getObjectData();
+ assertEquals(10L, out[0]);
+ assertEquals(34, out[1]);
+ assertEquals("54", out[2]);
+ assertEquals("random data", out[3]);
+ assertEquals(-112, ((byte[]) out[4])[0]);
+ assertEquals(54, ((byte[]) out[4])[1]);
+ assertEquals("10", out[5]);
+ assertEquals("ENUM", out[6]);
+
+ Object[] givenArrayOne = new Object[2];
+ givenArrayOne[0] = 11;
+ givenArrayOne[1] = 11;
+ Object[] givenArrayTwo = new Object[2];
+ givenArrayTwo[0] = 14;
+ givenArrayTwo[1] = 15;
+ Object[] arrayOfArrays = new Object[2];
+ arrayOfArrays[0] = givenArrayOne;
+ arrayOfArrays[1] = givenArrayTwo;
+ Map<Object, Object> map = new HashMap<Object, Object>();
+ map.put("testKey", "testValue");
+ // no time zone
+ LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0);
+ org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
+ org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
+ Object[] set0 = new Object[2];
+ set0[0] = 11;
+ set0[1] = 12;
+ Object[] set1 = new Object[2];
+ set1[0] = 14;
+ set1[1] = 15;
+ Object[] set = new Object[2];
+ set[0] = set0;
+ set[1] = set1;
+ out[14] = set;
+ assertEquals(arrayOfArrays.length, 2);
+ assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString((Object[]) out[7]));
+ assertEquals(map, out[8]);
+ assertEquals(true, out[9]);
+ assertEquals(dateTime, out[10]);
+ assertEquals(time, out[11]);
+ assertEquals(date, out[12]);
+ assertEquals(13.44, out[13]);
+ assertEquals(set.length, 2);
+ assertEquals(Arrays.deepToString(set), Arrays.deepToString((Object[]) out[14]));
+
+ }
+
+ @Test
+ public void testInputAsCSVTextInCSVTextOut() {
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+ + csvSet;
+ dataFormat.setCSVTextData(csvText);
+ assertEquals(csvText, dataFormat.getCSVTextData());
+ }
+
+ @SuppressWarnings("unchecked")
+ private JSONObject createJSONObject() {
+ JSONObject json = new JSONObject();
+ json.put("1", 10L);
+ json.put("2", 34);
+ json.put("3", "54");
+ json.put("4", "random data");
+ json.put("5", org.apache.commons.codec.binary.Base64.encodeBase64String(new byte[] { (byte) -112, (byte) 54 }));
+ json.put("6", String.valueOf(0x0A));
+ json.put("7", "ENUM");
+ JSONArray givenArrayOne = new JSONArray();
+ givenArrayOne.add(11);
+ givenArrayOne.add(11);
+ JSONArray givenArrayTwo = new JSONArray();
+ givenArrayTwo.add(14);
+ givenArrayTwo.add(15);
+
+ JSONArray arrayOfArrays = new JSONArray();
+ arrayOfArrays.add(givenArrayOne);
+ arrayOfArrays.add(givenArrayTwo);
+
+ JSONObject map = new JSONObject();
+ map.put("testKey", "testValue");
+
+ json.put("8", arrayOfArrays);
+ json.put("9", map);
+ json.put("10", true);
+
+ // expect dates as strings
+ json.put("11", SqoopIDFUtils.removeQuotes(dateTime));
+ json.put("12", SqoopIDFUtils.removeQuotes(time));
+ json.put("13", SqoopIDFUtils.removeQuotes(date));
+ json.put("14", 13.44);
+
+ JSONArray givenSetOne = new JSONArray();
+ givenSetOne.add(11);
+ givenSetOne.add(12);
+ JSONArray givenSetTwo = new JSONArray();
+ givenSetTwo.add(14);
+ givenSetTwo.add(15);
+ JSONArray arrayOfSet = new JSONArray();
+ arrayOfSet.add(givenSetOne);
+ arrayOfSet.add(givenSetTwo);
+
+ json.put("15", arrayOfSet);
+ return json;
+ }
+
+ /**
+ * setDataGetCSV setDataGetObjectArray setDataGetData
+ */
+ @Test
+ public void testInputAsDataInAndCSVOut() {
+
+ String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+ + csvSet;
+ dataFormat.setData(createJSONObject());
+ assertEquals(csvExpected, dataFormat.getCSVTextData());
+ }
+
+ @Test
+ public void testInputAsDataInAndObjectArrayOut() {
+ JSONObject json = createJSONObject();
+ dataFormat.setData(json);
+ assertObjectArray();
+ }
+
+ @Test
+ public void testInputAsDataInAndDataOut() {
+ JSONObject json = createJSONObject();
+ dataFormat.setData(json);
+ assertEquals(json, dataFormat.getData());
+ }
+
+ private Object[] createObjectArray() {
+ Object[] out = new Object[15];
+ out[0] = 10L;
+ out[1] = 34;
+ out[2] = "54";
+ out[3] = "random data";
+ out[4] = new byte[] { (byte) -112, (byte) 54 };
+ out[5] = String.valueOf(0x0A);
+ out[6] = "ENUM";
+
+ Object[] givenArrayOne = new Object[2];
+ givenArrayOne[0] = 11;
+ givenArrayOne[1] = 11;
+ Object[] givenArrayTwo = new Object[2];
+ givenArrayTwo[0] = 14;
+ givenArrayTwo[1] = 15;
+
+ Object[] arrayOfArrays = new Object[2];
+ arrayOfArrays[0] = givenArrayOne;
+ arrayOfArrays[1] = givenArrayTwo;
+
+ Map<Object, Object> map = new HashMap<Object, Object>();
+ map.put("testKey", "testValue");
+
+ out[7] = arrayOfArrays;
+ out[8] = map;
+ out[9] = true;
+ org.joda.time.DateTime dateTime = new org.joda.time.DateTime(2014, 10, 01, 12, 0, 0, 0);
+ out[10] = dateTime;
+ org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
+
+ out[11] = time;
+ org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
+
+ out[12] = date;
+
+ out[13] = 13.44;
+ Object[] set0 = new Object[2];
+ set0[0] = 11;
+ set0[1] = 12;
+ Object[] set1 = new Object[2];
+ set1[0] = 14;
+ set1[1] = 15;
+
+ Object[] set = new Object[2];
+ set[0] = set0;
+ set[1] = set1;
+ out[14] = set;
+ return out;
+ }
+
+ /**
+ * setObjectArrayGetData setObjectArrayGetCSV setObjectArrayGetObjectArray
+ */
+ @Test
+ public void testInputAsObjectArrayInAndDataOut() {
+
+ Object[] out = createObjectArray();
+ dataFormat.setObjectData(out);
+ JSONObject json = createJSONObject();
+ assertEquals(json, dataFormat.getData());
+ }
+
+ @Test
+ public void testInputAsObjectArrayInAndCSVOut() {
+ Object[] out = createObjectArray();
+ dataFormat.setObjectData(out);
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + dateTime + "," + time + "," + date + ",13.44,"
+ + csvSet;
+ assertEquals(csvText, dataFormat.getCSVTextData());
+ }
+
+ @Test
+ public void testInputAsObjectArrayInAndObjectArrayOut() {
+ Object[] out = createObjectArray();
+ dataFormat.setObjectData(out);
+ assertObjectArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/4ffa806b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2381566..4dbc48f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@ limitations under the License.
<commons-lang.version>2.5</commons-lang.version>
<commons-io.version>2.4</commons-io.version>
<commons-compress.version>1.9</commons-compress.version>
+ <commons-codec.version>1.9</commons-codec.version>
<derby.version>10.8.2.2</derby.version>
<hadoop.1.version>1.0.3</hadoop.1.version>
<hadoop.2.version>2.6.0</hadoop.2.version>
@@ -444,11 +445,16 @@ limitations under the License.
<artifactId>commons-cli</artifactId>
<version>${commons-cli.version}</version>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>${commons-io.version}</version>
- </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons-io.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons-codec.version}</version>
+ </dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>