You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/08 03:30:14 UTC
sqoop git commit: SQOOP-1902: Sqoop2: Avro IDF class and unit tests
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 6d009c6c0 -> 3bb9c595d
SQOOP-1902: Sqoop2: Avro IDF class and unit tests
(Veena Basavaraj via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3bb9c595
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3bb9c595
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3bb9c595
Branch: refs/heads/sqoop2
Commit: 3bb9c595d9d6df99b758c664604e5eccd971ef5c
Parents: 6d009c6
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Jan 7 18:28:39 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Jan 7 18:28:39 2015 -0800
----------------------------------------------------------------------
connector/connector-sdk/pom.xml | 4 +
.../sqoop/connector/common/SqoopAvroUtils.java | 128 ++++++
.../sqoop/connector/common/SqoopIDFUtils.java | 91 +++-
.../idf/AVROIntermediateDataFormat.java | 431 +++++++++++++++++++
.../idf/AVROIntermediateDataFormatError.java | 45 ++
.../idf/CSVIntermediateDataFormat.java | 2 +-
.../idf/CSVIntermediateDataFormatError.java | 7 +-
.../connector/idf/IntermediateDataFormat.java | 8 +-
.../idf/IntermediateDataFormatError.java | 3 +
.../idf/JSONIntermediateDataFormat.java | 17 +-
.../idf/TestAVROIntermediateDataFormat.java | 319 ++++++++++++++
pom.xml | 6 +
12 files changed, 1016 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 46cb9f8..2795084 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -50,6 +50,10 @@ limitations under the License.
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
new file mode 100644
index 0000000..e47d8fe
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.avro.Schema;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class SqoopAvroUtils {
+
+ public static final String COLUMN_TYPE = "columnType";
+ public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
+
+ /**
+ * Creates an Avro schema from a Sqoop schema.
+ */
+ public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
+ String name = sqoopSchema.getName();
+ String doc = sqoopSchema.getNote();
+ String namespace = SQOOP_SCHEMA_NAMESPACE;
+ Schema schema = Schema.createRecord(name, doc, namespace, false);
+
+ List<Schema.Field> fields = new ArrayList<Schema.Field>();
+ for (Column column : sqoopSchema.getColumnsArray()) {
+ Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
+ field.addProp(COLUMN_TYPE, column.getType().toString());
+ fields.add(field);
+ }
+ schema.setFields(fields);
+ return schema;
+ }
+
+ public static Schema createAvroFieldSchema(Column column) {
+ Schema schema = toAvroFieldType(column);
+ if (!column.getNullable()) {
+ return schema;
+ } else {
+ List<Schema> union = new ArrayList<Schema>();
+ union.add(schema);
+ union.add(Schema.create(Schema.Type.NULL));
+ return Schema.createUnion(union);
+ }
+ }
+
+ public static Schema toAvroFieldType(Column column) throws IllegalArgumentException {
+ switch (column.getType()) {
+ case ARRAY:
+ case SET:
+ AbstractComplexListType listColumn = (AbstractComplexListType) column;
+ return Schema.createArray(toAvroFieldType(listColumn.getListType()));
+ case UNKNOWN:
+ case BINARY:
+ return Schema.create(Schema.Type.BYTES);
+ case BIT:
+ return Schema.create(Schema.Type.BOOLEAN);
+ case DATE:
+ case DATE_TIME:
+ case TIME:
+ // avro 1.8 will have date type
+ // https://issues.apache.org/jira/browse/AVRO-739
+ return Schema.create(Schema.Type.LONG);
+ case DECIMAL:
+ // TODO: is String okay here? Using it since the Kite code appears to do the same.
+ return Schema.create(Schema.Type.STRING);
+ case ENUM:
+ return createEnumSchema(column);
+ case FIXED_POINT:
+ Long byteSize = ((FixedPoint) column).getByteSize();
+ if (byteSize != null && byteSize <= Integer.SIZE) {
+ return Schema.create(Schema.Type.INT);
+ } else {
+ return Schema.create(Schema.Type.LONG);
+ }
+ case FLOATING_POINT:
+ byteSize = ((FloatingPoint) column).getByteSize();
+ if (byteSize != null && byteSize <= Float.SIZE) {
+ return Schema.create(Schema.Type.FLOAT);
+ } else {
+ return Schema.create(Schema.Type.DOUBLE);
+ }
+ case MAP:
+ org.apache.sqoop.schema.type.Map mapColumn = (org.apache.sqoop.schema.type.Map) column;
+ return Schema.createArray(toAvroFieldType(mapColumn.getValue()));
+ case TEXT:
+ return Schema.create(Schema.Type.STRING);
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, column.getType().name());
+ }
+ }
+
+ public static Schema createEnumSchema(Column column) {
+ Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
+ List<String> listOptions = new ArrayList<String>(options);
+ return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
+ }
+
+ public static byte[] getBytesFromByteBuffer(Object obj) {
+ ByteBuffer buffer = (ByteBuffer) obj;
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.duplicate().get(bytes);
+ return bytes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 979aa4f..26ff629 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -68,15 +68,10 @@ public class SqoopIDFUtils {
public static final char QUOTE_CHARACTER = '\'';
// string related replacements
- private static final String[] replacements = {
- new String(new char[] { ESCAPE_CHARACTER, '\\' }),
- new String(new char[] { ESCAPE_CHARACTER, '0' }),
- new String(new char[] { ESCAPE_CHARACTER, 'n' }),
- new String(new char[] { ESCAPE_CHARACTER, 'r' }),
- new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
- new String(new char[] { ESCAPE_CHARACTER, '\"' }),
- new String(new char[] { ESCAPE_CHARACTER, '\'' })
- };
+ private static final String[] replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }),
+ new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }),
+ new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
+ new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) };
// http://www.joda.org/joda-time/key_format.html provides details on the
// formatter token
@@ -140,8 +135,9 @@ public class SqoopIDFUtils {
}
public static String encodeToCSVDecimal(Object obj) {
- return ((BigDecimal)obj).toString();
+ return ((BigDecimal) obj).toString();
}
+
public static Object toDecimal(String csvString, Column column) {
return new BigDecimal(csvString);
}
@@ -152,7 +148,8 @@ public class SqoopIDFUtils {
if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
return bitStringValue;
} else {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue);
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: "
+ + bitStringValue);
}
}
@@ -161,7 +158,7 @@ public class SqoopIDFUtils {
return TRUE_BIT_SET.contains(csvString);
} else {
// throw an exception for any unsupported value for BITs
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString);
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: " + csvString);
}
}
@@ -200,7 +197,6 @@ public class SqoopIDFUtils {
}
}
-
public static String encodeToCSVDateTime(Object obj, Column col) {
org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
@@ -236,6 +232,27 @@ public class SqoopIDFUtils {
return returnValue;
}
+ public static Long toDateTimeInMillis(String csvString, Column column) {
+ long returnValue;
+ String dateTime = removeQuotes(csvString);
+ org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
+ if (col.hasFraction() && col.hasTimezone()) {
+ // After calling withOffsetParsed method, a string
+ // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
+ // -08:00 (a fixed zone, with no daylight savings rules)
+ returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+ } else if (col.hasFraction() && !col.hasTimezone()) {
+ // we use local date time explicitly to not include the timezone
+ returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+ } else if (col.hasTimezone()) {
+ returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+ } else {
+ // we use local date time explicitly to not include the timezone
+ returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+ }
+ return returnValue;
+ }
+
// ************ MAP Column Type utils*********
@SuppressWarnings("unchecked")
@@ -301,7 +318,7 @@ public class SqoopIDFUtils {
List<Object> elementList = new ArrayList<Object>();
for (int n = 0; n < list.length; n++) {
Column listType = ((AbstractComplexListType) column).getListType();
- //2 level nesting supported
+ // 2 level nesting supported
if (isColumnListType(listType)) {
Object[] listElements = (Object[]) list[n];
JSONArray subArray = new JSONArray();
@@ -332,6 +349,44 @@ public class SqoopIDFUtils {
return null;
}
+ @SuppressWarnings("unchecked")
+ public static JSONArray toJSONArray(Object[] objectArray) {
+ JSONArray jsonArray = new JSONArray();
+ for (int i = 0; i < objectArray.length; i++) {
+ Object value = objectArray[i];
+ if (value instanceof Object[]) {
+ value = toJSONArray((Object[]) value);
+ }
+ jsonArray.add(value);
+ }
+ return jsonArray;
+ }
+
+ public static List<Object> toList(Object[] objectArray) {
+ List<Object> objList = new ArrayList<Object>();
+ for (int i = 0; i < objectArray.length; i++) {
+ Object value = objectArray[i];
+ if (value instanceof Object[]) {
+ value = toList((Object[]) value);
+ }
+ objList.add(value);
+ }
+ return objList;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Object[] toObjectArray(List<Object> list) {
+ Object[] array = new Object[list.size()];
+ for (int i = 0; i < list.size(); i++) {
+ Object value = list.get(i);
+ if (value instanceof List) {
+ value = toObjectArray((List<Object>) value);
+ }
+ array[i] = value;
+ }
+ return array;
+ }
+
// ************ TEXT Column Type utils*********
private static String getRegExp(char character) {
@@ -350,8 +405,8 @@ public class SqoopIDFUtils {
replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
}
} catch (Exception e) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement + " "
- + String.valueOf(j) + " " + e.getMessage());
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + " " + replacement
+ + " " + String.valueOf(j) + " " + e.getMessage());
}
return encloseWithQuote(replacement);
}
@@ -365,8 +420,8 @@ public class SqoopIDFUtils {
string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
}
} catch (Exception e) {
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " " + String.valueOf(j)
- + e.getMessage());
+ throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + " "
+ + String.valueOf(j) + e.getMessage());
}
return string;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
new file mode 100644
index 0000000..b12b59a
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+import static org.apache.sqoop.connector.common.SqoopAvroUtils.*;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * IDF representing the intermediate format in Avro object
+ */
+public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRecord> {
+
+ private Schema avroSchema;
+
+ // need this default constructor for reflection magic used in execution engine
+ public AVROIntermediateDataFormat() {
+ }
+
+ // We need schema at all times
+ public AVROIntermediateDataFormat(org.apache.sqoop.schema.Schema schema) {
+ setSchema(schema);
+ avroSchema = createAvroSchema(schema);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setCSVTextData(String text) {
+ // convert the CSV text to avro
+ this.data = toAVRO(text);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getCSVTextData() {
+ // convert avro to sqoop CSV
+ return toCSV(data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setObjectData(Object[] data) {
+ // convert the object array to avro
+ this.data = toAVRO(data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Object[] getObjectData() {
+ // convert avro to object array
+ return toObject(data);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // do we need to write the schema?
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
+ BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((DataOutputStream) out, null);
+ writer.write(data, encoder);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void read(DataInput in) throws IOException {
+ DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(avroSchema);
+ Decoder decoder = DecoderFactory.get().binaryDecoder((InputStream) in, null);
+ data = reader.read(null, decoder);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Set<String> getJars() {
+
+ Set<String> jars = super.getJars();
+ jars.add(ClassUtils.jarForClass(GenericRecord.class));
+ return jars;
+ }
+
+ private GenericRecord toAVRO(String csv) {
+
+ String[] csvStringArray = parseCSVString(csv);
+
+ if (csvStringArray == null) {
+ return null;
+ }
+
+ if (csvStringArray.length != schema.getColumnsArray().length) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
+ + " has the wrong number of fields.");
+ }
+ GenericRecord avroObject = new GenericData.Record(avroSchema);
+ Column[] columnArray = schema.getColumnsArray();
+ for (int i = 0; i < csvStringArray.length; i++) {
+ // check for NULL field and assume this field is nullable as per the sqoop
+ // schema
+ if (csvStringArray[i].equals(NULL_VALUE) && columnArray[i].getNullable()) {
+ avroObject.put(columnArray[i].getName(), null);
+ continue;
+ }
+ avroObject.put(columnArray[i].getName(), toAVRO(csvStringArray[i], columnArray[i]));
+ }
+ return avroObject;
+ }
+
+ private Object toAVRO(String csvString, Column column) {
+ Object returnValue = null;
+
+ switch (column.getType()) {
+ case ARRAY:
+ case SET:
+ Object[] list = toList(csvString);
+ // store as a Java collection
+ returnValue = Arrays.asList(list);
+ break;
+ case MAP:
+ // store as a map
+ returnValue = toMap(csvString);
+ break;
+ case ENUM:
+ returnValue = new GenericData.EnumSymbol(createEnumSchema(column), (removeQuotes(csvString)));
+ break;
+ case TEXT:
+ returnValue = new Utf8(removeQuotes(csvString));
+ break;
+ case BINARY:
+ case UNKNOWN:
+ // avro accepts byte buffer for binary data
+ returnValue = ByteBuffer.wrap(toByteArray(csvString));
+ break;
+ case FIXED_POINT:
+ returnValue = toFixedPoint(csvString, column);
+ break;
+ case FLOATING_POINT:
+ returnValue = toFloatingPoint(csvString, column);
+ break;
+ case DECIMAL:
+ // TODO: store as FIXED in SQOOP-16161
+ returnValue = removeQuotes(csvString);
+ break;
+ case DATE:
+ // until Avro 1.8, stored as a long
+ returnValue = ((LocalDate) toDate(csvString, column)).toDate().getTime();
+ break;
+ case TIME:
+ // until Avro 1.8, stored as a long
+ returnValue = ((LocalTime) toTime(csvString, column)).toDateTimeToday().getMillis();
+ break;
+ case DATE_TIME:
+ // until Avro 1.8, stored as a long
+ returnValue = toDateTimeInMillis(csvString, column);
+ break;
+ case BIT:
+ returnValue = Boolean.valueOf(removeQuotes(csvString));
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+ "Column type from schema was not recognized for " + column.getType());
+ }
+ return returnValue;
+ }
+
+ private GenericRecord toAVRO(Object[] data) {
+
+ if (data == null) {
+ return null;
+ }
+
+ if (data.length != schema.getColumnsArray().length) {
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + data.toString()
+ + " has the wrong number of fields.");
+ }
+ // get avro schema from sqoop schema
+ GenericRecord avroObject = new GenericData.Record(avroSchema);
+ Column[] cols = schema.getColumnsArray();
+ for (int i = 0; i < data.length; i++) {
+ switch (cols[i].getType()) {
+ case ARRAY:
+ case SET:
+ avroObject.put(cols[i].getName(), toList((Object[]) data[i]));
+ break;
+ case MAP:
+ avroObject.put(cols[i].getName(), data[i]);
+ break;
+ case ENUM:
+ GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(cols[i]), (String) data[i]);
+ avroObject.put(cols[i].getName(), enumValue);
+ break;
+ case TEXT:
+ avroObject.put(cols[i].getName(), new Utf8((String) data[i]));
+ break;
+ case BINARY:
+ case UNKNOWN:
+ avroObject.put(cols[i].getName(), ByteBuffer.wrap((byte[]) data[i]));
+ break;
+ case FIXED_POINT:
+ case FLOATING_POINT:
+ avroObject.put(cols[i].getName(), data[i]);
+ break;
+ case DECIMAL:
+ // TODO: store as FIXED in SQOOP-16161
+ avroObject.put(cols[i].getName(), ((BigDecimal) data[i]).toPlainString());
+ break;
+ case DATE_TIME:
+ if (data[i] instanceof org.joda.time.DateTime) {
+ avroObject.put(cols[i].getName(), ((org.joda.time.DateTime) data[i]).toDate().getTime());
+ } else if (data[i] instanceof org.joda.time.LocalDateTime) {
+ avroObject.put(cols[i].getName(), ((org.joda.time.LocalDateTime) data[i]).toDate().getTime());
+ }
+ break;
+ case TIME:
+ avroObject.put(cols[i].getName(), ((org.joda.time.LocalTime) data[i]).toDateTimeToday().getMillis());
+ break;
+ case DATE:
+ avroObject.put(cols[i].getName(), ((org.joda.time.LocalDate) data[i]).toDate().getTime());
+ break;
+ case BIT:
+ avroObject.put(cols[i].getName(), Boolean.valueOf((Boolean) data[i]));
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+ "Column type from schema was not recognized for " + cols[i].getType());
+ }
+ }
+
+ return avroObject;
+ }
+
+ @SuppressWarnings("unchecked")
+ private String toCSV(GenericRecord record) {
+ Column[] cols = this.schema.getColumnsArray();
+
+ StringBuilder csvString = new StringBuilder();
+ for (int i = 0; i < cols.length; i++) {
+
+ Object obj = record.get(cols[i].getName());
+
+ if (obj == null) {
+ throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
+ }
+
+ switch (cols[i].getType()) {
+ case ARRAY:
+ case SET:
+ List<Object> objList = (List<Object>) obj;
+ csvString.append(encodeToCSVList(toObjectArray(objList), cols[i]));
+ break;
+ case MAP:
+ Map<Object, Object> objMap = (Map<Object, Object>) obj;
+ csvString.append(encodeToCSVMap(objMap, cols[i]));
+ break;
+ case ENUM:
+ case TEXT:
+ csvString.append(encodeToCSVString(obj.toString()));
+ break;
+ case BINARY:
+ case UNKNOWN:
+ csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj)));
+ break;
+ case FIXED_POINT:
+ csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
+ break;
+ case FLOATING_POINT:
+ csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
+ break;
+ case DECIMAL:
+ // stored as string
+ csvString.append(encodeToCSVDecimal(obj));
+ break;
+ case DATE:
+ // stored as long
+ Long dateInMillis = (Long) obj;
+ csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis)));
+ break;
+ case TIME:
+ // stored as long
+ Long timeInMillis = (Long) obj;
+ csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
+ break;
+ case DATE_TIME:
+ // stored as long
+ Long dateTimeInMillis = (Long) obj;
+ csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
+ break;
+ case BIT:
+ csvString.append(encodeToCSVBit(obj));
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+ "Column type from schema was not recognized for " + cols[i].getType());
+ }
+ if (i < cols.length - 1) {
+ csvString.append(CSV_SEPARATOR_CHARACTER);
+ }
+
+ }
+
+ return csvString.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Object[] toObject(GenericRecord record) {
+
+ if (data == null) {
+ return null;
+ }
+ Column[] cols = schema.getColumnsArray();
+ Object[] object = new Object[cols.length];
+
+ for (int i = 0; i < cols.length; i++) {
+ Object obj = record.get(cols[i].getName());
+ if (obj == null) {
+ throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
+ }
+ Integer nameIndex = schema.getColumnNameIndex(cols[i].getName());
+ Column column = cols[nameIndex];
+ switch (column.getType()) {
+ case ARRAY:
+ case SET:
+ object[nameIndex] = toObjectArray((List<Object>) obj);
+ break;
+ case MAP:
+ object[nameIndex] = obj;
+ break;
+ case ENUM:
+ // stored as enum symbol
+ case TEXT:
+ // stored as UTF8
+ object[nameIndex] = obj.toString();
+ break;
+ case BINARY:
+ case UNKNOWN:
+ // stored as byte buffer
+ object[nameIndex] = getBytesFromByteBuffer(obj);
+ break;
+ case FIXED_POINT:
+ case FLOATING_POINT:
+ // stored as java objects in avro as well
+ object[nameIndex] = obj;
+ break;
+ case DECIMAL:
+ // stored as string
+ object[nameIndex] = obj.toString();
+ break;
+ case DATE:
+ Long dateInMillis = (Long) obj;
+ object[nameIndex] = new org.joda.time.LocalDate(dateInMillis);
+ break;
+ case TIME:
+ Long timeInMillis = (Long) obj;
+ object[nameIndex] = new org.joda.time.LocalTime(timeInMillis);
+ break;
+ case DATE_TIME:
+ Long dateTimeInMillis = (Long) obj;
+ if (((org.apache.sqoop.schema.type.DateTime) column).hasTimezone()) {
+ object[nameIndex] = new org.joda.time.DateTime(dateTimeInMillis);
+ } else {
+ object[nameIndex] = new org.joda.time.LocalDateTime(dateTimeInMillis);
+ }
+ break;
+ case BIT:
+ object[nameIndex] = toBit(obj.toString());
+ break;
+ default:
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+ "Column type from schema was not recognized for " + cols[i].getType());
+ }
+
+ }
+ return object;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
new file mode 100644
index 0000000..6af21a3
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum AVROIntermediateDataFormatError implements ErrorCode {
+ /** An unknown error has occurred. */
+ AVRO_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+ AVRO_INTERMEDIATE_DATA_FORMAT_0001("Missing key in the AVRO object.")
+
+ ;
+
+ private final String message;
+
+ private AVROIntermediateDataFormatError(String message) {
+ this.message = message;
+ }
+
+ public String getCode() {
+ return name();
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index be1147d..33b5d0a 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -152,7 +152,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
returnValue = toMap(csvString);
break;
default:
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
"Column type from schema was not recognized for " + column.getType());
}
return returnValue;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
index a88db45..9aae251 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
@@ -33,17 +33,14 @@ public enum CSVIntermediateDataFormatError implements ErrorCode {
/** Error while escaping a row. */
CSV_INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),
- /** Column type isn't known by Intermediate Data Format. */
- CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
-
/**
* For arrays and maps we use JSON representation and incorrect representation
* results in parse exception
*/
- CSV_INTERMEDIATE_DATA_FORMAT_0005("JSON parse internal error."),
+ CSV_INTERMEDIATE_DATA_FORMAT_0004("JSON parse internal error."),
/** Unsupported bit values */
- CSV_INTERMEDIATE_DATA_FORMAT_0006("Unsupported bit value."),
+ CSV_INTERMEDIATE_DATA_FORMAT_0005("Unsupported bit value."),
;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index adeb2ec..055b41c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -18,11 +18,10 @@
*/
package org.apache.sqoop.connector.idf;
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType;
-import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.AbstractComplexListType;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.ColumnType;
@@ -30,9 +29,6 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
/**
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index bda75fc..1f583b2 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -32,6 +32,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"),
+ /** Column type isn't known by Intermediate Data Format. */
+ INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
+
;
private final String message;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
index 9329cf8..90294f0 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -189,8 +189,8 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
returnValue = Boolean.valueOf(removeQuotes(csvString));
break;
default:
- throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
- "Column type from schema was not recognized for " + column.getType());
+ throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+ "Column type from schema was not recognized for " + column.getType());
}
return returnValue;
}
@@ -261,19 +261,6 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
return object;
}
- @SuppressWarnings("unchecked")
- public static JSONArray toJSONArray(Object[] objectArray) {
- JSONArray jsonArray = new JSONArray();
- for (int i = 0; i < objectArray.length; i++) {
- Object value = objectArray[i];
- if (value instanceof Object[]) {
- value = toJSONArray((Object[]) value);
- }
- jsonArray.add(value);
- }
- return jsonArray;
- }
-
private String toCSV(JSONObject json) {
Column[] cols = this.schema.getColumnsArray();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
new file mode 100644
index 0000000..b00b3b9
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
+import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.sqoop.connector.common.SqoopAvroUtils;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Array;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestAVROIntermediateDataFormat {
+
+ private AVROIntermediateDataFormat dataFormat;
+ private org.apache.avro.Schema avroSchema;
+ private final static String csvArray = "'[[11,11],[14,15]]'";
+ private final static String map = "'{\"testKey\":\"testValue\"}'";
+ private final static String csvSet = "'[[11,12],[14,15]]'";
+ private final static String csvDate = "'2014-10-01'";
+ private final static String csvDateTime = "'2014-10-01 12:00:00.000'";
+ private final static String csvTime = "'12:59:59'";
+ private Column enumCol;
+ // no time zone
+ private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0);
+ private final static org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
+ private final static org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
+
+ @Before
+ public void setUp() {
+ createAvroIDF();
+ }
+
+ private void createAvroIDF() {
+ Schema sqoopSchema = new Schema("test");
+ Set<String> options = new HashSet<String>();
+ options.add("ENUM");
+ options.add("NUME");
+ enumCol = new org.apache.sqoop.schema.type.Enum("seven").setOptions(options);
+ sqoopSchema.addColumn(new FixedPoint("one")).addColumn(new FixedPoint("two", 2L, false)).addColumn(new Text("three"))
+ .addColumn(new Text("four")).addColumn(new Binary("five")).addColumn(new Text("six")).addColumn(enumCol)
+ .addColumn(new Array("eight", new Array("array", new FixedPoint("ft"))))
+ .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))).addColumn(new Bit("ten"))
+ .addColumn(new org.apache.sqoop.schema.type.DateTime("eleven", true, false))
+ .addColumn(new org.apache.sqoop.schema.type.Time("twelve", false))
+ .addColumn(new org.apache.sqoop.schema.type.Date("thirteen"))
+ .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("fourteen"))
+ .addColumn(new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw"))));
+ dataFormat = new AVROIntermediateDataFormat(sqoopSchema);
+ avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
+ }
+
+ /**
+ * Covers round trips from setCSVTextData: getData, getObjectData and getCSVTextData.
+ */
+ @Test
+ public void testInputAsCSVTextInAndDataOut() {
+
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ + ",13.44," + csvSet;
+ dataFormat.setCSVTextData(csvText);
+ GenericRecord avroObject = createAvroGenericRecord();
+ assertEquals(avroObject.toString(), dataFormat.getData().toString());
+ }
+
+ @Test
+ public void testInputAsCSVTextInAndObjectArrayOut() {
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ + ",13.44," + csvSet;
+ dataFormat.setCSVTextData(csvText);
+ assertEquals(dataFormat.getObjectData().length, 15);
+ assertObjectArray();
+
+ }
+
+ private void assertObjectArray() {
+ Object[] out = dataFormat.getObjectData();
+ assertEquals(10L, out[0]);
+ assertEquals(34, out[1]);
+ assertEquals("54", out[2]);
+ assertEquals("random data", out[3]);
+ assertEquals(-112, ((byte[]) out[4])[0]);
+ assertEquals(54, ((byte[]) out[4])[1]);
+ assertEquals("10", out[5]);
+ assertEquals("ENUM", out[6]);
+
+ Object[] givenArrayOne = new Object[2];
+ givenArrayOne[0] = 11;
+ givenArrayOne[1] = 11;
+ Object[] givenArrayTwo = new Object[2];
+ givenArrayTwo[0] = 14;
+ givenArrayTwo[1] = 15;
+ Object[] arrayOfArrays = new Object[2];
+ arrayOfArrays[0] = givenArrayOne;
+ arrayOfArrays[1] = givenArrayTwo;
+ Map<Object, Object> map = new HashMap<Object, Object>();
+ map.put("testKey", "testValue");
+ Object[] set0 = new Object[2];
+ set0[0] = 11;
+ set0[1] = 12;
+ Object[] set1 = new Object[2];
+ set1[0] = 14;
+ set1[1] = 15;
+ Object[] set = new Object[2];
+ set[0] = set0;
+ set[1] = set1;
+ // NOTE(review): do not assign 'out[14] = set;' here — overwriting the actual value made the deepToString assert below vacuously pass
+ assertEquals(arrayOfArrays.length, 2);
+ assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString((Object[]) out[7]));
+ assertEquals(map, out[8]);
+ assertEquals(true, out[9]);
+ assertEquals(dateTime, out[10]);
+ assertEquals(time, out[11]);
+ assertEquals(date, out[12]);
+ assertEquals(13.44, out[13]);
+ assertEquals(set.length, 2);
+ assertEquals(Arrays.deepToString(set), Arrays.deepToString((Object[]) out[14]));
+
+ }
+
+ @Test
+ public void testInputAsCSVTextInCSVTextOut() {
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ + ",13.44," + csvSet;
+ dataFormat.setCSVTextData(csvText);
+ assertEquals(csvText, dataFormat.getCSVTextData());
+ }
+
+ private GenericRecord createAvroGenericRecord() {
+ GenericRecord avroObject = new GenericData.Record(avroSchema);
+ avroObject.put("one", 10L);
+ avroObject.put("two", 34);
+ avroObject.put("three", new Utf8("54"));
+ avroObject.put("four", new Utf8("random data"));
+ // store byte array in byte buffer
+ byte[] b = new byte[] { (byte) -112, (byte) 54 };
+ avroObject.put("five", ByteBuffer.wrap(b));
+ avroObject.put("six", new Utf8(String.valueOf(0x0A)));
+ avroObject.put("seven", new GenericData.EnumSymbol(createEnumSchema(enumCol), "ENUM"));
+
+ List<Object> givenArrayOne = new ArrayList<Object>();
+ givenArrayOne.add(11);
+ givenArrayOne.add(11);
+ List<Object> givenArrayTwo = new ArrayList<Object>();
+ givenArrayTwo.add(14);
+ givenArrayTwo.add(15);
+ List<Object> arrayOfArrays = new ArrayList<Object>();
+
+ arrayOfArrays.add(givenArrayOne);
+ arrayOfArrays.add(givenArrayTwo);
+
+ Map<Object, Object> map = new HashMap<Object, Object>();
+ map.put("testKey", "testValue");
+
+ avroObject.put("eight", arrayOfArrays);
+ avroObject.put("nine", map);
+ avroObject.put("ten", true);
+
+ // expect dates as strings
+ avroObject.put("eleven", dateTime.toDate().getTime());
+ avroObject.put("twelve", time.toDateTimeToday().getMillis());
+ avroObject.put("thirteen", date.toDate().getTime());
+ avroObject.put("fourteen", 13.44);
+ List<Object> givenSetOne = new ArrayList<Object>();
+ givenSetOne.add(11);
+ givenSetOne.add(12);
+ List<Object> givenSetTwo = new ArrayList<Object>();
+ givenSetTwo.add(14);
+ givenSetTwo.add(15);
+ List<Object> set = new ArrayList<Object>();
+ set.add(givenSetOne);
+ set.add(givenSetTwo);
+ avroObject.put("fifteen", set);
+ return avroObject;
+ }
+
+ /**
+ * Covers round trips from setData: getCSVTextData, getObjectData and getData.
+ */
+ @Test
+ public void testInputAsDataInAndCSVOut() {
+
+ String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ + ",13.44," + csvSet;
+ dataFormat.setData(createAvroGenericRecord());
+ assertEquals(csvExpected, dataFormat.getCSVTextData());
+ }
+
+ @Test
+ public void testInputAsDataInAndObjectArrayOut() {
+ GenericRecord avroObject = createAvroGenericRecord();
+ dataFormat.setData(avroObject);
+ assertObjectArray();
+ }
+
+ @Test
+ public void testInputAsDataInAndDataOut() {
+ GenericRecord avroObject = createAvroGenericRecord();
+ dataFormat.setData(avroObject);
+ assertEquals(avroObject, dataFormat.getData());
+ }
+
+ private Object[] createObjectArray() {
+ Object[] out = new Object[15];
+ out[0] = 10L;
+ out[1] = 34;
+ out[2] = "54";
+ out[3] = "random data";
+ out[4] = new byte[] { (byte) -112, (byte) 54 };
+ out[5] = String.valueOf(0x0A);
+ out[6] = "ENUM";
+
+ Object[] givenArrayOne = new Object[2];
+ givenArrayOne[0] = 11;
+ givenArrayOne[1] = 11;
+ Object[] givenArrayTwo = new Object[2];
+ givenArrayTwo[0] = 14;
+ givenArrayTwo[1] = 15;
+
+ Object[] arrayOfArrays = new Object[2];
+ arrayOfArrays[0] = givenArrayOne;
+ arrayOfArrays[1] = givenArrayTwo;
+
+ Map<Object, Object> map = new HashMap<Object, Object>();
+ map.put("testKey", "testValue");
+
+ out[7] = arrayOfArrays;
+ out[8] = map;
+ out[9] = true;
+ out[10] = dateTime;
+ out[11] = time;
+ out[12] = date;
+
+ out[13] = 13.44;
+ Object[] set0 = new Object[2];
+ set0[0] = 11;
+ set0[1] = 12;
+ Object[] set1 = new Object[2];
+ set1[0] = 14;
+ set1[1] = 15;
+
+ Object[] set = new Object[2];
+ set[0] = set0;
+ set[1] = set1;
+ out[14] = set;
+ return out;
+ }
+
+ /**
+ * Covers round trips from setObjectData: getData, getCSVTextData and getObjectData.
+ */
+ @Test
+ public void testInputAsObjectArrayInAndDataOut() {
+
+ Object[] out = createObjectArray();
+ dataFormat.setObjectData(out);
+ GenericRecord avroObject = createAvroGenericRecord();
+ // SQOOP-1975: direct object compare will fail unless we use the Avro complex types
+ assertEquals(avroObject.toString(), dataFormat.getData().toString());
+
+ }
+
+ @Test
+ public void testInputAsObjectArrayInAndCSVOut() {
+ Object[] out = createObjectArray();
+ dataFormat.setObjectData(out);
+ String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+ + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+ + ",13.44," + csvSet;
+ assertEquals(csvText, dataFormat.getCSVTextData());
+ }
+
+ @Test
+ public void testInputAsObjectArrayInAndObjectArrayOut() {
+ Object[] out = createObjectArray();
+ dataFormat.setObjectData(out);
+ assertObjectArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4dbc48f..d1f0a45 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@ limitations under the License.
<jackson.core.version>2.2.2</jackson.core.version>
<jackson.databind.version>2.2.2</jackson.databind.version>
<jackson.annotations.version>2.2.2</jackson.annotations.version>
+ <avro.version>1.7.7</avro.version>
</properties>
<dependencies>
@@ -599,6 +600,11 @@ limitations under the License.
<artifactId>hadoop-common</artifactId>
<version>${hadoop.2.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>