Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/08 03:30:14 UTC

sqoop git commit: SQOOP-1902: Sqoop2: Avro IDF class and unit tests

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 6d009c6c0 -> 3bb9c595d


SQOOP-1902: Sqoop2: Avro IDF class and unit tests

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3bb9c595
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3bb9c595
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3bb9c595

Branch: refs/heads/sqoop2
Commit: 3bb9c595d9d6df99b758c664604e5eccd971ef5c
Parents: 6d009c6
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Wed Jan 7 18:28:39 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Wed Jan 7 18:28:39 2015 -0800

----------------------------------------------------------------------
 connector/connector-sdk/pom.xml                 |   4 +
 .../sqoop/connector/common/SqoopAvroUtils.java  | 128 ++++++
 .../sqoop/connector/common/SqoopIDFUtils.java   |  91 +++-
 .../idf/AVROIntermediateDataFormat.java         | 431 +++++++++++++++++++
 .../idf/AVROIntermediateDataFormatError.java    |  45 ++
 .../idf/CSVIntermediateDataFormat.java          |   2 +-
 .../idf/CSVIntermediateDataFormatError.java     |   7 +-
 .../connector/idf/IntermediateDataFormat.java   |   8 +-
 .../idf/IntermediateDataFormatError.java        |   3 +
 .../idf/JSONIntermediateDataFormat.java         |  17 +-
 .../idf/TestAVROIntermediateDataFormat.java     | 319 ++++++++++++++
 pom.xml                                         |   6 +
 12 files changed, 1016 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
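
For illustration, a minimal usage sketch (not part of this patch) of the new Avro
IDF, pieced together from the constructors and accessors in the diffs below; the
demo class, schema, and row values are hypothetical:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    public class AvroIdfDemo {
      public static void main(String[] args) {
        // Hypothetical two-column schema; text values are single-quoted in Sqoop CSV.
        Schema schema = new Schema("demo");
        schema.addColumn(new FixedPoint("id")).addColumn(new Text("name"));

        AVROIntermediateDataFormat idf = new AVROIntermediateDataFormat(schema);

        // One row, three interchangeable representations:
        idf.setCSVTextData("10,'hello'");       // Sqoop CSV text in...
        GenericRecord record = idf.getData();   // ...native Avro GenericRecord out,
        Object[] fields = idf.getObjectData();  // ...or a plain object array.
        System.out.println(record + " / " + fields.length + " fields");
      }
    }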


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/pom.xml b/connector/connector-sdk/pom.xml
index 46cb9f8..2795084 100644
--- a/connector/connector-sdk/pom.xml
+++ b/connector/connector-sdk/pom.xml
@@ -50,6 +50,10 @@ limitations under the License.
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
 
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
new file mode 100644
index 0000000..e47d8fe
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.avro.Schema;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.IntermediateDataFormatError;
+import org.apache.sqoop.schema.type.AbstractComplexListType;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class SqoopAvroUtils {
+
+  public static final String COLUMN_TYPE = "columnType";
+  public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
+
+  /**
+   * Creates an Avro schema from a Sqoop schema.
+   */
+  public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
+    String name = sqoopSchema.getName();
+    String doc = sqoopSchema.getNote();
+    String namespace = SQOOP_SCHEMA_NAMESPACE;
+    Schema schema = Schema.createRecord(name, doc, namespace, false);
+
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    for (Column column : sqoopSchema.getColumnsArray()) {
+      Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
+      field.addProp(COLUMN_TYPE, column.getType().toString());
+      fields.add(field);
+    }
+    schema.setFields(fields);
+    return schema;
+  }
+
+  public static Schema createAvroFieldSchema(Column column) {
+    Schema schema = toAvroFieldType(column);
+    if (!column.getNullable()) {
+      return schema;
+    } else {
+      List<Schema> union = new ArrayList<Schema>();
+      union.add(schema);
+      union.add(Schema.create(Schema.Type.NULL));
+      return Schema.createUnion(union);
+    }
+  }
+
+  public static Schema toAvroFieldType(Column column) throws IllegalArgumentException {
+    switch (column.getType()) {
+    case ARRAY:
+    case SET:
+      AbstractComplexListType listColumn = (AbstractComplexListType) column;
+      return Schema.createArray(toAvroFieldType(listColumn.getListType()));
+    case UNKNOWN:
+    case BINARY:
+      return Schema.create(Schema.Type.BYTES);
+    case BIT:
+      return Schema.create(Schema.Type.BOOLEAN);
+    case DATE:
+    case DATE_TIME:
+    case TIME:
+      // Avro 1.8 will add a date type
+      // https://issues.apache.org/jira/browse/AVRO-739
+      return Schema.create(Schema.Type.LONG);
+    case DECIMAL:
+      // TODO: is string OK? Used it since the Kite code seems to use it
+      return Schema.create(Schema.Type.STRING);
+    case ENUM:
+      return createEnumSchema(column);
+    case FIXED_POINT:
+      Long byteSize = ((FixedPoint) column).getByteSize();
+      if (byteSize != null && byteSize <= (Integer.SIZE / Byte.SIZE)) { // byteSize is in bytes, SIZE in bits
+        return Schema.create(Schema.Type.INT);
+      } else {
+        return Schema.create(Schema.Type.LONG);
+      }
+    case FLOATING_POINT:
+      byteSize = ((FloatingPoint) column).getByteSize();
+      if (byteSize != null && byteSize <= (Float.SIZE / Byte.SIZE)) {
+        return Schema.create(Schema.Type.FLOAT);
+      } else {
+        return Schema.create(Schema.Type.DOUBLE);
+      }
+    case MAP:
+      org.apache.sqoop.schema.type.Map mapColumn = (org.apache.sqoop.schema.type.Map) column;
+      return Schema.createMap(toAvroFieldType(mapColumn.getValue())); // Avro maps use string keys
+    case TEXT:
+      return Schema.create(Schema.Type.STRING);
+    default:
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, column.getType().name());
+    }
+  }
+
+  public static Schema createEnumSchema(Column column) {
+    Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
+    List<String> listOptions = new ArrayList<String>(options);
+    return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
+  }
+
+  public static byte[] getBytesFromByteBuffer(Object obj) {
+    ByteBuffer buffer = (ByteBuffer) obj;
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.duplicate().get(bytes);
+    return bytes;
+  }
+
+}
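
To make the type mapping concrete: a sketch (not part of this patch) of what
createAvroSchema would produce for a small Sqoop schema. The demo class and
column names are hypothetical; the comments follow the switch in toAvroFieldType,
and createAvroFieldSchema wraps nullable columns in a union with null:

    import org.apache.avro.Schema;
    import org.apache.sqoop.connector.common.SqoopAvroUtils;
    import org.apache.sqoop.schema.type.Bit;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    public class SchemaMappingDemo {
      public static void main(String[] args) {
        org.apache.sqoop.schema.Schema sqoopSchema = new org.apache.sqoop.schema.Schema("user");
        sqoopSchema.addColumn(new FixedPoint("id", 2L, false)) // 2 bytes -> Avro int
            .addColumn(new Text("name"))                       // TEXT -> Avro string
            .addColumn(new Bit("active"));                     // BIT -> Avro boolean

        Schema avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
        // A record named "user" in namespace "org.apache.sqoop"; each field also
        // carries a "columnType" property holding the original Sqoop column type.
        System.out.println(avroSchema.toString(true));
      }
    }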

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
index 979aa4f..26ff629 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopIDFUtils.java
@@ -68,15 +68,10 @@ public class SqoopIDFUtils {
   public static final char QUOTE_CHARACTER = '\'';
 
   // string related replacements
-  private static final String[] replacements = {
-      new String(new char[] { ESCAPE_CHARACTER, '\\' }),
-      new String(new char[] { ESCAPE_CHARACTER, '0' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'n' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'r' }),
-      new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
-      new String(new char[] { ESCAPE_CHARACTER, '\"' }),
-      new String(new char[] { ESCAPE_CHARACTER, '\'' })
-      };
+  private static final String[] replacements = { new String(new char[] { ESCAPE_CHARACTER, '\\' }),
+      new String(new char[] { ESCAPE_CHARACTER, '0' }), new String(new char[] { ESCAPE_CHARACTER, 'n' }),
+      new String(new char[] { ESCAPE_CHARACTER, 'r' }), new String(new char[] { ESCAPE_CHARACTER, 'Z' }),
+      new String(new char[] { ESCAPE_CHARACTER, '\"' }), new String(new char[] { ESCAPE_CHARACTER, '\'' }) };
 
   // http://www.joda.org/joda-time/key_format.html provides details on the
   // formatter token
@@ -140,8 +135,9 @@ public class SqoopIDFUtils {
   }
 
   public static String encodeToCSVDecimal(Object obj) {
-     return ((BigDecimal)obj).toString();
+    return ((BigDecimal) obj).toString();
   }
+
   public static Object toDecimal(String csvString, Column column) {
     return new BigDecimal(csvString);
   }
@@ -152,7 +148,8 @@ public class SqoopIDFUtils {
     if ((TRUE_BIT_SET.contains(bitStringValue)) || (FALSE_BIT_SET.contains(bitStringValue))) {
       return bitStringValue;
     } else {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + bitStringValue);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: "
+          + bitStringValue);
     }
   }
 
@@ -161,7 +158,7 @@ public class SqoopIDFUtils {
       return TRUE_BIT_SET.contains(csvString);
     } else {
       // throw an exception for any unsupported value for BITs
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0006, " given bit value: " + csvString);
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0005, " given bit value: " + csvString);
     }
   }
 
@@ -200,7 +197,6 @@ public class SqoopIDFUtils {
     }
   }
 
-
   public static String encodeToCSVDateTime(Object obj, Column col) {
     org.joda.time.DateTime dateTime = (org.joda.time.DateTime) obj;
     org.apache.sqoop.schema.type.DateTime column = (org.apache.sqoop.schema.type.DateTime) col;
@@ -236,6 +232,27 @@ public class SqoopIDFUtils {
     return returnValue;
   }
 
+  public static Long toDateTimeInMillis(String csvString, Column column) {
+    long returnValue;
+    String dateTime = removeQuotes(csvString);
+    org.apache.sqoop.schema.type.DateTime col = ((org.apache.sqoop.schema.type.DateTime) column);
+    if (col.hasFraction() && col.hasTimezone()) {
+      // After calling withOffsetParsed method, a string
+      // '2004-06-09T10:20:30-08:00' will create a datetime with a zone of
+      // -08:00 (a fixed zone, with no daylight savings rules)
+      returnValue = dtfWithFractionAndTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+    } else if (col.hasFraction() && !col.hasTimezone()) {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithFractionNoTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+    } else if (col.hasTimezone()) {
+      returnValue = dtfWithNoFractionWithTimeZone.withOffsetParsed().parseDateTime(dateTime).toDate().getTime();
+    } else {
+      // we use local date time explicitly to not include the timezone
+      returnValue = dtfWithNoFractionAndTimeZone.parseLocalDateTime(dateTime).toDate().getTime();
+    }
+    return returnValue;
+  }
+
   // ************ MAP Column Type utils*********
 
   @SuppressWarnings("unchecked")
@@ -301,7 +318,7 @@ public class SqoopIDFUtils {
     List<Object> elementList = new ArrayList<Object>();
     for (int n = 0; n < list.length; n++) {
       Column listType = ((AbstractComplexListType) column).getListType();
-      //2 level nesting supported
+      // 2 level nesting supported
       if (isColumnListType(listType)) {
         Object[] listElements = (Object[]) list[n];
         JSONArray subArray = new JSONArray();
@@ -332,6 +349,44 @@ public class SqoopIDFUtils {
     return null;
   }
 
+  @SuppressWarnings("unchecked")
+  public static JSONArray toJSONArray(Object[] objectArray) {
+    JSONArray jsonArray = new JSONArray();
+    for (int i = 0; i < objectArray.length; i++) {
+      Object value = objectArray[i];
+      if (value instanceof Object[]) {
+        value = toJSONArray((Object[]) value);
+      }
+      jsonArray.add(value);
+    }
+    return jsonArray;
+  }
+
+  public static List<Object> toList(Object[] objectArray) {
+    List<Object> objList = new ArrayList<Object>();
+    for (int i = 0; i < objectArray.length; i++) {
+      Object value = objectArray[i];
+      if (value instanceof Object[]) {
+        value = toList((Object[]) value);
+      }
+      objList.add(value);
+    }
+    return objList;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static Object[] toObjectArray(List<Object> list) {
+    Object[] array = new Object[list.size()];
+    for (int i = 0; i < list.size(); i++) {
+      Object value = list.get(i);
+      if (value instanceof List) {
+        value = toObjectArray((List<Object>) value);
+      }
+      array[i] = value;
+    }
+    return array;
+  }
+
   // ************ TEXT Column Type utils*********
 
   private static String getRegExp(char character) {
@@ -350,8 +405,8 @@ public class SqoopIDFUtils {
         replacement = replacement.replaceAll(getRegExp(originals[j]), Matcher.quoteReplacement(replacements[j]));
       }
     } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement + "  "
-          + String.valueOf(j) + "  " + e.getMessage());
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0002, string + "  " + replacement
+          + "  " + String.valueOf(j) + "  " + e.getMessage());
     }
     return encloseWithQuote(replacement);
   }
@@ -365,8 +420,8 @@ public class SqoopIDFUtils {
         string = string.replaceAll(getRegExp(replacements[j]), Matcher.quoteReplacement(String.valueOf(originals[j])));
       }
     } catch (Exception e) {
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + "  " + String.valueOf(j)
-          + e.getMessage());
+      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0003, string + "  "
+          + String.valueOf(j) + e.getMessage());
     }
 
     return string;
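
A quick sketch (not part of this patch) of the new list helpers above; the demo
class is hypothetical and the values arbitrary. toList and toObjectArray are
inverses for the two-level nesting the IDF supports, and toJSONArray renders the
same structure as JSON:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.sqoop.connector.common.SqoopIDFUtils;

    public class ListHelpersDemo {
      public static void main(String[] args) {
        // Two-level nested arrays, the shape ARRAY/SET columns use.
        Object[] nested = { new Object[] { 11, 12 }, new Object[] { 14, 15 } };

        List<Object> asList = SqoopIDFUtils.toList(nested);  // arrays -> nested Lists
        Object[] back = SqoopIDFUtils.toObjectArray(asList); // and back again

        System.out.println(SqoopIDFUtils.toJSONArray(nested)); // [[11,12],[14,15]]
        System.out.println(Arrays.deepToString(back));         // [[11, 12], [14, 15]]
      }
    }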

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
new file mode 100644
index 0000000..b12b59a
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+import static org.apache.sqoop.connector.common.SqoopAvroUtils.*;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.utils.ClassUtils;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * IDF representing the intermediate format in Avro object
+ */
+public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRecord> {
+
+  private Schema avroSchema;
+
+  // need this default constructor for reflection magic used in execution engine
+  public AVROIntermediateDataFormat() {
+  }
+
+  // We need schema at all times
+  public AVROIntermediateDataFormat(org.apache.sqoop.schema.Schema schema) {
+    setSchema(schema);
+    avroSchema = createAvroSchema(schema);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setCSVTextData(String text) {
+    // convert the CSV text to avro
+    this.data = toAVRO(text);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getCSVTextData() {
+    // convert avro to sqoop CSV
+    return toCSV(data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setObjectData(Object[] data) {
+    // convert the object array to avro
+    this.data = toAVRO(data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Object[] getObjectData() {
+    // convert avro to object array
+    return toObject(data);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // do we need to write the schema?
+    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
+    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((DataOutputStream) out, null);
+    writer.write(data, encoder);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void read(DataInput in) throws IOException {
+    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(avroSchema);
+    Decoder decoder = DecoderFactory.get().binaryDecoder((InputStream) in, null);
+    data = reader.read(null, decoder);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Set<String> getJars() {
+
+    Set<String> jars = super.getJars();
+    jars.add(ClassUtils.jarForClass(GenericRecord.class));
+    return jars;
+  }
+
+  private GenericRecord toAVRO(String csv) {
+
+    String[] csvStringArray = parseCSVString(csv);
+
+    if (csvStringArray == null) {
+      return null;
+    }
+
+    if (csvStringArray.length != schema.getColumnsArray().length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + csv
+          + " has the wrong number of fields.");
+    }
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    Column[] columnArray = schema.getColumnsArray();
+    for (int i = 0; i < csvStringArray.length; i++) {
+      // check for NULL field and assume this field is nullable as per the sqoop
+      // schema
+      if (csvStringArray[i].equals(NULL_VALUE) && columnArray[i].getNullable()) {
+        avroObject.put(columnArray[i].getName(), null);
+        continue;
+      }
+      avroObject.put(columnArray[i].getName(), toAVRO(csvStringArray[i], columnArray[i]));
+    }
+    return avroObject;
+  }
+
+  private Object toAVRO(String csvString, Column column) {
+    Object returnValue = null;
+
+    switch (column.getType()) {
+    case ARRAY:
+    case SET:
+      Object[] list = toList(csvString);
+      // store as a java collection
+      returnValue = Arrays.asList(list);
+      break;
+    case MAP:
+      // store as a map
+      returnValue = toMap(csvString);
+      break;
+    case ENUM:
+      returnValue = new GenericData.EnumSymbol(createEnumSchema(column), (removeQuotes(csvString)));
+      break;
+    case TEXT:
+      returnValue = new Utf8(removeQuotes(csvString));
+      break;
+    case BINARY:
+    case UNKNOWN:
+      // avro accepts byte buffer for binary data
+      returnValue = ByteBuffer.wrap(toByteArray(csvString));
+      break;
+    case FIXED_POINT:
+      returnValue = toFixedPoint(csvString, column);
+      break;
+    case FLOATING_POINT:
+      returnValue = toFloatingPoint(csvString, column);
+      break;
+    case DECIMAL:
+      // TODO: store as FIXED in SQOOP-16161
+      returnValue = removeQuotes(csvString);
+      break;
+    case DATE:
+      // store as long until Avro 1.8 adds a date type (AVRO-739)
+      returnValue = ((LocalDate) toDate(csvString, column)).toDate().getTime();
+      break;
+    case TIME:
+      // store as long until Avro 1.8 adds a time type
+      returnValue = ((LocalTime) toTime(csvString, column)).toDateTimeToday().getMillis();
+      break;
+    case DATE_TIME:
+      // store as long until Avro 1.8 adds a timestamp type
+      returnValue = toDateTimeInMillis(csvString, column);
+      break;
+    case BIT:
+      returnValue = Boolean.valueOf(removeQuotes(csvString));
+      break;
+    default:
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+          "Column type from schema was not recognized for " + column.getType());
+    }
+    return returnValue;
+  }
+
+  private GenericRecord toAVRO(Object[] data) {
+
+    if (data == null) {
+      return null;
+    }
+
+    if (data.length != schema.getColumnsArray().length) {
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001, "The data " + Arrays.toString(data)
+          + " has the wrong number of fields.");
+    }
+    // get avro schema from sqoop schema
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    Column[] cols = schema.getColumnsArray();
+    for (int i = 0; i < data.length; i++) {
+      switch (cols[i].getType()) {
+      case ARRAY:
+      case SET:
+        avroObject.put(cols[i].getName(), toList((Object[]) data[i]));
+        break;
+      case MAP:
+        avroObject.put(cols[i].getName(), data[i]);
+        break;
+      case ENUM:
+        GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(cols[i]), (String) data[i]);
+        avroObject.put(cols[i].getName(), enumValue);
+        break;
+      case TEXT:
+        avroObject.put(cols[i].getName(), new Utf8((String) data[i]));
+        break;
+      case BINARY:
+      case UNKNOWN:
+        avroObject.put(cols[i].getName(), ByteBuffer.wrap((byte[]) data[i]));
+        break;
+      case FIXED_POINT:
+      case FLOATING_POINT:
+        avroObject.put(cols[i].getName(), data[i]);
+        break;
+      case DECIMAL:
+        // TODO: store as FIXED in SQOOP-16161
+        avroObject.put(cols[i].getName(), ((BigDecimal) data[i]).toPlainString());
+        break;
+      case DATE_TIME:
+        if (data[i] instanceof org.joda.time.DateTime) {
+          avroObject.put(cols[i].getName(), ((org.joda.time.DateTime) data[i]).toDate().getTime());
+        } else if (data[i] instanceof org.joda.time.LocalDateTime) {
+          avroObject.put(cols[i].getName(), ((org.joda.time.LocalDateTime) data[i]).toDate().getTime());
+        }
+        break;
+      case TIME:
+        avroObject.put(cols[i].getName(), ((org.joda.time.LocalTime) data[i]).toDateTimeToday().getMillis());
+        break;
+      case DATE:
+        avroObject.put(cols[i].getName(), ((org.joda.time.LocalDate) data[i]).toDate().getTime());
+        break;
+      case BIT:
+        avroObject.put(cols[i].getName(), Boolean.valueOf((Boolean) data[i]));
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+            "Column type from schema was not recognized for " + cols[i].getType());
+      }
+    }
+
+    return avroObject;
+  }
+
+  @SuppressWarnings("unchecked")
+  private String toCSV(GenericRecord record) {
+    Column[] cols = this.schema.getColumnsArray();
+
+    StringBuilder csvString = new StringBuilder();
+    for (int i = 0; i < cols.length; i++) {
+
+      Object obj = record.get(cols[i].getName());
+
+      if (obj == null) {
+        throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
+      }
+
+      switch (cols[i].getType()) {
+      case ARRAY:
+      case SET:
+        List<Object> objList = (List<Object>) obj;
+        csvString.append(encodeToCSVList(toObjectArray(objList), cols[i]));
+        break;
+      case MAP:
+        Map<Object, Object> objMap = (Map<Object, Object>) obj;
+        csvString.append(encodeToCSVMap(objMap, cols[i]));
+        break;
+      case ENUM:
+      case TEXT:
+        csvString.append(encodeToCSVString(obj.toString()));
+        break;
+      case BINARY:
+      case UNKNOWN:
+        csvString.append(encodeToCSVByteArray(getBytesFromByteBuffer(obj)));
+        break;
+      case FIXED_POINT:
+        csvString.append(encodeToCSVFixedPoint(obj, cols[i]));
+        break;
+      case FLOATING_POINT:
+        csvString.append(encodeToCSVFloatingPoint(obj, cols[i]));
+        break;
+      case DECIMAL:
+        // stored as string
+        csvString.append(encodeToCSVDecimal(obj));
+        break;
+      case DATE:
+        // stored as long
+        Long dateInMillis = (Long) obj;
+        csvString.append(encodeToCSVDate(new org.joda.time.LocalDate(dateInMillis)));
+        break;
+      case TIME:
+        // stored as long
+        Long timeInMillis = (Long) obj;
+        csvString.append(encodeToCSVTime(new org.joda.time.LocalTime(timeInMillis), cols[i]));
+        break;
+      case DATE_TIME:
+        // stored as long
+        Long dateTimeInMillis = (Long) obj;
+        csvString.append(encodeToCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), cols[i]));
+        break;
+      case BIT:
+        csvString.append(encodeToCSVBit(obj));
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+            "Column type from schema was not recognized for " + cols[i].getType());
+      }
+      if (i < cols.length - 1) {
+        csvString.append(CSV_SEPARATOR_CHARACTER);
+      }
+
+    }
+
+    return csvString.toString();
+  }
+
+  @SuppressWarnings("unchecked")
+  private Object[] toObject(GenericRecord record) {
+
+    if (record == null) {
+      return null;
+    }
+    Column[] cols = schema.getColumnsArray();
+    Object[] object = new Object[cols.length];
+
+    for (int i = 0; i < cols.length; i++) {
+      Object obj = record.get(cols[i].getName());
+      if (obj == null) {
+        throw new SqoopException(AVROIntermediateDataFormatError.AVRO_INTERMEDIATE_DATA_FORMAT_0001, " for " + cols[i].getName());
+      }
+      Integer nameIndex = schema.getColumnNameIndex(cols[i].getName());
+      Column column = cols[nameIndex];
+      switch (column.getType()) {
+      case ARRAY:
+      case SET:
+        object[nameIndex] = toObjectArray((List<Object>) obj);
+        break;
+      case MAP:
+        object[nameIndex] = obj;
+        break;
+      case ENUM:
+        // stored as enum symbol
+      case TEXT:
+        // stored as UTF8
+        object[nameIndex] = obj.toString();
+        break;
+      case BINARY:
+      case UNKNOWN:
+        // stored as byte buffer
+        object[nameIndex] = getBytesFromByteBuffer(obj);
+        break;
+      case FIXED_POINT:
+      case FLOATING_POINT:
+        // stored as java objects in avro as well
+        object[nameIndex] = obj;
+        break;
+      case DECIMAL:
+        // stored as string
+        object[nameIndex] = obj.toString();
+        break;
+      case DATE:
+        Long dateInMillis = (Long) obj;
+        object[nameIndex] = new org.joda.time.LocalDate(dateInMillis);
+        break;
+      case TIME:
+        Long timeInMillis = (Long) obj;
+        object[nameIndex] = new org.joda.time.LocalTime(timeInMillis);
+        break;
+      case DATE_TIME:
+        Long dateTimeInMillis = (Long) obj;
+        if (((org.apache.sqoop.schema.type.DateTime) column).hasTimezone()) {
+          object[nameIndex] = new org.joda.time.DateTime(dateTimeInMillis);
+        } else {
+          object[nameIndex] = new org.joda.time.LocalDateTime(dateTimeInMillis);
+        }
+        break;
+      case BIT:
+        object[nameIndex] = toBit(obj.toString());
+        break;
+      default:
+        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
+            "Column type from schema was not recognized for " + cols[i].getType());
+      }
+
+    }
+    return object;
+  }
+}
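
One caveat worth noting: write() casts its DataOutput to DataOutputStream and
read() casts its DataInput to InputStream, so callers must pass stream-backed
implementations. A minimal round-trip sketch under that assumption (not part of
this patch; the demo class and schema are hypothetical):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.sqoop.connector.idf.AVROIntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.Text;

    public class AvroIdfRoundTrip {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema("row");
        schema.addColumn(new FixedPoint("id")).addColumn(new Text("name"));

        AVROIntermediateDataFormat out = new AVROIntermediateDataFormat(schema);
        out.setCSVTextData("10,'hello'");

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer)); // must be a DataOutputStream

        AVROIntermediateDataFormat in = new AVROIntermediateDataFormat(schema);
        in.read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(in.getCSVTextData()); // 10,'hello' again
      }
    }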

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
new file mode 100644
index 0000000..6af21a3
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormatError.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sqoop.connector.idf;
+
+import org.apache.sqoop.common.ErrorCode;
+
+public enum AVROIntermediateDataFormatError implements ErrorCode {
+  /** An unknown error has occurred. */
+  AVRO_INTERMEDIATE_DATA_FORMAT_0000("An unknown error has occurred."),
+
+  AVRO_INTERMEDIATE_DATA_FORMAT_0001("Missing key in the AVRO object.")
+
+  ;
+
+  private final String message;
+
+  private AVROIntermediateDataFormatError(String message) {
+    this.message = message;
+  }
+
+  public String getCode() {
+    return name();
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index be1147d..33b5d0a 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -152,7 +152,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
       returnValue = toMap(csvString);
       break;
     default:
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
           "Column type from schema was not recognized for " + column.getType());
     }
     return returnValue;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
index a88db45..9aae251 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormatError.java
@@ -33,17 +33,14 @@ public enum CSVIntermediateDataFormatError implements ErrorCode {
   /** Error while escaping a row. */
   CSV_INTERMEDIATE_DATA_FORMAT_0003("An error has occurred while unescaping a row."),
 
-  /** Column type isn't known by Intermediate Data Format. */
-  CSV_INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
-
   /**
    * For arrays and maps we use JSON representation and incorrect representation
    * results in parse exception
    */
-  CSV_INTERMEDIATE_DATA_FORMAT_0005("JSON parse internal error."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0004("JSON parse internal error."),
 
   /** Unsupported bit values */
-  CSV_INTERMEDIATE_DATA_FORMAT_0006("Unsupported bit value."),
+  CSV_INTERMEDIATE_DATA_FORMAT_0005("Unsupported bit value."),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
index adeb2ec..055b41c 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormat.java
@@ -18,11 +18,10 @@
  */
 package org.apache.sqoop.connector.idf;
 
-import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnListType;
+import static org.apache.sqoop.connector.common.SqoopIDFUtils.isColumnStringType;
 
-import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.AbstractComplexListType;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.schema.type.ColumnType;
 
@@ -30,9 +29,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
index bda75fc..1f583b2 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/IntermediateDataFormatError.java
@@ -32,6 +32,9 @@ public enum IntermediateDataFormatError implements ErrorCode {
 
   INTERMEDIATE_DATA_FORMAT_0003("JSON parse error"),
 
+  /** Column type isn't known by Intermediate Data Format. */
+  INTERMEDIATE_DATA_FORMAT_0004("Unknown column type."),
+
   ;
 
   private final String message;
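
Since the three IDFs now share INTERMEDIATE_DATA_FORMAT_0004 for unknown column
types, callers can branch on a single code. A hedged sketch (not part of this
patch; the demo class is hypothetical, and it assumes SqoopException exposes
getErrorCode() as elsewhere in sqoop-common):

    import org.apache.sqoop.common.SqoopException;
    import org.apache.sqoop.connector.idf.IntermediateDataFormatError;

    public class ErrorHandlingDemo {
      static void handle(Runnable work) {
        try {
          work.run();
        } catch (SqoopException e) {
          // All IDFs report unrecognized column types with the same shared code.
          if (e.getErrorCode() == IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004) {
            System.err.println("Unknown column type: " + e.getMessage());
          } else {
            throw e;
          }
        }
      }
    }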

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
index 9329cf8..90294f0 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/JSONIntermediateDataFormat.java
@@ -189,8 +189,8 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
       returnValue = Boolean.valueOf(removeQuotes(csvString));
       break;
     default:
-      throw new SqoopException(CSVIntermediateDataFormatError.CSV_INTERMEDIATE_DATA_FORMAT_0004,
- "Column type from schema was not recognized for " + column.getType());
+      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
+          "Column type from schema was not recognized for " + column.getType());
     }
     return returnValue;
   }
@@ -261,19 +261,6 @@ public class JSONIntermediateDataFormat extends IntermediateDataFormat<JSONObjec
     return object;
   }
 
-  @SuppressWarnings("unchecked")
-  public static JSONArray toJSONArray(Object[] objectArray) {
-    JSONArray jsonArray = new JSONArray();
-    for (int i = 0; i < objectArray.length; i++) {
-      Object value = objectArray[i];
-      if (value instanceof Object[]) {
-        value = toJSONArray((Object[]) value);
-      }
-      jsonArray.add(value);
-    }
-    return jsonArray;
-  }
-
   private String toCSV(JSONObject json) {
     Column[] cols = this.schema.getColumnsArray();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
new file mode 100644
index 0000000..b00b3b9
--- /dev/null
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.idf;
+
+import static org.apache.sqoop.connector.common.SqoopAvroUtils.createEnumSchema;
+import static org.apache.sqoop.connector.common.TestSqoopIDFUtils.getByteFieldString;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.sqoop.connector.common.SqoopAvroUtils;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Array;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestAVROIntermediateDataFormat {
+
+  private AVROIntermediateDataFormat dataFormat;
+  private org.apache.avro.Schema avroSchema;
+  private final static String csvArray = "'[[11,11],[14,15]]'";
+  private final static String map = "'{\"testKey\":\"testValue\"}'";
+  private final static String csvSet = "'[[11,12],[14,15]]'";
+  private final static String csvDate = "'2014-10-01'";
+  private final static String csvDateTime = "'2014-10-01 12:00:00.000'";
+  private final static String csvTime = "'12:59:59'";
+  private Column enumCol;
+  // no time zone
+  private final static LocalDateTime dateTime = new org.joda.time.LocalDateTime(2014, 10, 01, 12, 0, 0);
+  private final static org.joda.time.LocalTime time = new org.joda.time.LocalTime(12, 59, 59);
+  private final static org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
+
+  @Before
+  public void setUp() {
+    createAvroIDF();
+  }
+
+  private void createAvroIDF() {
+    Schema sqoopSchema = new Schema("test");
+    Set<String> options = new HashSet<String>();
+    options.add("ENUM");
+    options.add("NUME");
+    enumCol = new org.apache.sqoop.schema.type.Enum("seven").setOptions(options);
+    sqoopSchema.addColumn(new FixedPoint("one")).addColumn(new FixedPoint("two", 2L, false)).addColumn(new Text("three"))
+        .addColumn(new Text("four")).addColumn(new Binary("five")).addColumn(new Text("six")).addColumn(enumCol)
+        .addColumn(new Array("eight", new Array("array", new FixedPoint("ft"))))
+        .addColumn(new org.apache.sqoop.schema.type.Map("nine", new Text("t1"), new Text("t2"))).addColumn(new Bit("ten"))
+        .addColumn(new org.apache.sqoop.schema.type.DateTime("eleven", true, false))
+        .addColumn(new org.apache.sqoop.schema.type.Time("twelve", false))
+        .addColumn(new org.apache.sqoop.schema.type.Date("thirteen"))
+        .addColumn(new org.apache.sqoop.schema.type.FloatingPoint("fourteen"))
+        .addColumn(new org.apache.sqoop.schema.type.Set("fifteen", new Array("set", new FixedPoint("ftw"))));
+    dataFormat = new AVROIntermediateDataFormat(sqoopSchema);
+    avroSchema = SqoopAvroUtils.createAvroSchema(sqoopSchema);
+  }
+
+  /**
+   * setCSVGetData setCSVGetObjectArray setCSVGetCSV
+   */
+  @Test
+  public void testInputAsCSVTextInAndDataOut() {
+
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+        + ",13.44," + csvSet;
+    dataFormat.setCSVTextData(csvText);
+    GenericRecord avroObject = createAvroGenericRecord();
+    assertEquals(avroObject.toString(), dataFormat.getData().toString());
+  }
+
+  @Test
+  public void testInputAsCSVTextInAndObjectArrayOut() {
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+        + ",13.44," + csvSet;
+    dataFormat.setCSVTextData(csvText);
+    assertEquals(dataFormat.getObjectData().length, 15);
+    assertObjectArray();
+
+  }
+
+  private void assertObjectArray() {
+    Object[] out = dataFormat.getObjectData();
+    assertEquals(10L, out[0]);
+    assertEquals(34, out[1]);
+    assertEquals("54", out[2]);
+    assertEquals("random data", out[3]);
+    assertEquals(-112, ((byte[]) out[4])[0]);
+    assertEquals(54, ((byte[]) out[4])[1]);
+    assertEquals("10", out[5]);
+    assertEquals("ENUM", out[6]);
+
+    Object[] givenArrayOne = new Object[2];
+    givenArrayOne[0] = 11;
+    givenArrayOne[1] = 11;
+    Object[] givenArrayTwo = new Object[2];
+    givenArrayTwo[0] = 14;
+    givenArrayTwo[1] = 15;
+    Object[] arrayOfArrays = new Object[2];
+    arrayOfArrays[0] = givenArrayOne;
+    arrayOfArrays[1] = givenArrayTwo;
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("testKey", "testValue");
+    Object[] set0 = new Object[2];
+    set0[0] = 11;
+    set0[1] = 12;
+    Object[] set1 = new Object[2];
+    set1[0] = 14;
+    set1[1] = 15;
+    Object[] set = new Object[2];
+    set[0] = set0;
+    set[1] = set1;
+    assertEquals(arrayOfArrays.length, 2);
+    assertEquals(Arrays.deepToString(arrayOfArrays), Arrays.deepToString((Object[]) out[7]));
+    assertEquals(map, out[8]);
+    assertEquals(true, out[9]);
+    assertEquals(dateTime, out[10]);
+    assertEquals(time, out[11]);
+    assertEquals(date, out[12]);
+    assertEquals(13.44, out[13]);
+    assertEquals(set.length, 2);
+    assertEquals(Arrays.deepToString(set), Arrays.deepToString((Object[]) out[14]));
+
+  }
+
+  @Test
+  public void testInputAsCSVTextInCSVTextOut() {
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+        + ",13.44," + csvSet;
+    dataFormat.setCSVTextData(csvText);
+    assertEquals(csvText, dataFormat.getCSVTextData());
+  }
+
+  private GenericRecord createAvroGenericRecord() {
+    GenericRecord avroObject = new GenericData.Record(avroSchema);
+    avroObject.put("one", 10L);
+    avroObject.put("two", 34);
+    avroObject.put("three", new Utf8("54"));
+    avroObject.put("four", new Utf8("random data"));
+    // store byte array in byte buffer
+    byte[] b = new byte[] { (byte) -112, (byte) 54 };
+    avroObject.put("five", ByteBuffer.wrap(b));
+    avroObject.put("six", new Utf8(String.valueOf(0x0A)));
+    avroObject.put("seven", new GenericData.EnumSymbol(createEnumSchema(enumCol), "ENUM"));
+
+    List<Object> givenArrayOne = new ArrayList<Object>();
+    givenArrayOne.add(11);
+    givenArrayOne.add(11);
+    List<Object> givenArrayTwo = new ArrayList<Object>();
+    givenArrayTwo.add(14);
+    givenArrayTwo.add(15);
+    List<Object> arrayOfArrays = new ArrayList<Object>();
+
+    arrayOfArrays.add(givenArrayOne);
+    arrayOfArrays.add(givenArrayTwo);
+
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("testKey", "testValue");
+
+    avroObject.put("eight", arrayOfArrays);
+    avroObject.put("nine", map);
+    avroObject.put("ten", true);
+
+    // dates and times are stored as long millis until Avro 1.8 adds date/time types
+    avroObject.put("eleven", dateTime.toDate().getTime());
+    avroObject.put("twelve", time.toDateTimeToday().getMillis());
+    avroObject.put("thirteen", date.toDate().getTime());
+    avroObject.put("fourteen", 13.44);
+    List<Object> givenSetOne = new ArrayList<Object>();
+    givenSetOne.add(11);
+    givenSetOne.add(12);
+    List<Object> givenSetTwo = new ArrayList<Object>();
+    givenSetTwo.add(14);
+    givenSetTwo.add(15);
+    List<Object> set = new ArrayList<Object>();
+    set.add(givenSetOne);
+    set.add(givenSetTwo);
+    avroObject.put("fifteen", set);
+    return avroObject;
+  }
+
+  /**
+   * setDataGetCSV setDataGetObjectArray setDataGetData
+   */
+  @Test
+  public void testInputAsDataInAndCSVOut() {
+
+    String csvExpected = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+        + ",13.44," + csvSet;
+    dataFormat.setData(createAvroGenericRecord());
+    assertEquals(csvExpected, dataFormat.getCSVTextData());
+  }
+
+  @Test
+  public void testInputAsDataInAndObjectArrayOut() {
+    GenericRecord avroObject = createAvroGenericRecord();
+    dataFormat.setData(avroObject);
+    assertObjectArray();
+  }
+
+  @Test
+  public void testInputAsDataInAndDataOut() {
+    GenericRecord avroObject = createAvroGenericRecord();
+    dataFormat.setData(avroObject);
+    assertEquals(avroObject, dataFormat.getData());
+  }
+
+  private Object[] createObjectArray() {
+    Object[] out = new Object[15];
+    out[0] = 10L;
+    out[1] = 34;
+    out[2] = "54";
+    out[3] = "random data";
+    out[4] = new byte[] { (byte) -112, (byte) 54 };
+    out[5] = String.valueOf(0x0A);
+    out[6] = "ENUM";
+
+    Object[] givenArrayOne = new Object[2];
+    givenArrayOne[0] = 11;
+    givenArrayOne[1] = 11;
+    Object[] givenArrayTwo = new Object[2];
+    givenArrayTwo[0] = 14;
+    givenArrayTwo[1] = 15;
+
+    Object[] arrayOfArrays = new Object[2];
+    arrayOfArrays[0] = givenArrayOne;
+    arrayOfArrays[1] = givenArrayTwo;
+
+    Map<Object, Object> map = new HashMap<Object, Object>();
+    map.put("testKey", "testValue");
+
+    out[7] = arrayOfArrays;
+    out[8] = map;
+    out[9] = true;
+    out[10] = dateTime;
+    out[11] = time;
+    out[12] = date;
+
+    out[13] = 13.44;
+    Object[] set0 = new Object[2];
+    set0[0] = 11;
+    set0[1] = 12;
+    Object[] set1 = new Object[2];
+    set1[0] = 14;
+    set1[1] = 15;
+
+    Object[] set = new Object[2];
+    set[0] = set0;
+    set[1] = set1;
+    out[14] = set;
+    return out;
+  }
+
+  /**
+   * setObjectArrayGetData setObjectArrayGetCSV setObjectArrayGetObjectArray
+   */
+  @Test
+  public void testInputAsObjectArrayInAndDataOut() {
+
+    Object[] out = createObjectArray();
+    dataFormat.setObjectData(out);
+    GenericRecord avroObject = createAvroGenericRecord();
+    // SQOOP-1975: direct object comparison will fail unless we use the Avro complex types
+    assertEquals(avroObject.toString(), dataFormat.getData().toString());
+
+  }
+
+  @Test
+  public void testInputAsObjectArrayInAndCSVOut() {
+    Object[] out = createObjectArray();
+    dataFormat.setObjectData(out);
+    String csvText = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54 }) + ",'"
+        + String.valueOf(0x0A) + "','ENUM'," + csvArray + "," + map + ",true," + csvDateTime + "," + csvTime + "," + csvDate
+        + ",13.44," + csvSet;
+    assertEquals(csvText, dataFormat.getCSVTextData());
+  }
+
+  @Test
+  public void testInputAsObjectArrayInAndObjectArrayOut() {
+    Object[] out = createObjectArray();
+    dataFormat.setObjectData(out);
+    assertObjectArray();
+  }
+}
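
The new tests are plain JUnit 4, so they should run with the usual Surefire
invocation from the repository root (standard Maven flags; add -am to build the
module's dependencies first):

    mvn test -pl connector/connector-sdk -am -Dtest=TestAVROIntermediateDataFormat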

http://git-wip-us.apache.org/repos/asf/sqoop/blob/3bb9c595/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4dbc48f..d1f0a45 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@ limitations under the License.
     <jackson.core.version>2.2.2</jackson.core.version>
     <jackson.databind.version>2.2.2</jackson.databind.version>
     <jackson.annotations.version>2.2.2</jackson.annotations.version>
+    <avro.version>1.7.7</avro.version>
   </properties>
 
   <dependencies>
@@ -599,6 +600,11 @@ limitations under the License.
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.2.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro</artifactId>
+        <version>${avro.version}</version>
+      </dependency>
    </dependencies>
   </dependencyManagement>