You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/06/29 00:54:06 UTC

svn commit: r1497954 [2/3] - in /pig/trunk: ./ .eclipse.templates/ ivy/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/builtin/ src/org/apache/pig/impl/util/avro/ test/ test/org/apache/pig/builtin/ test/org/apache/pig/builtin/avro/ test/o...

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+/**
+ * Utility class for AvroStorage; contains static methods
+ * for converting Pig data objects (tuples and bags) into Avro objects
+ * (records and arrays).
+ */
+public class AvroStorageDataConversionUtilities {
+
+  /**
+   * Packs a Pig Tuple into an Avro record.
+   *
+   * <p>Fields are matched by position: Avro field {@code i} is filled from
+   * tuple element {@code i}. Nullable unions are unwrapped to their non-null
+   * branch, and a null value in such a field is stored as null.
+   *
+   * @param t the Pig tuple to pack into the avro object
+   * @param s the Avro record schema describing the output
+   * @return the avro record corresponding to the input tuple
+   * @throws IOException if a value cannot be converted to the Avro type the
+   *     schema requires
+   */
+  public static GenericData.Record packIntoAvro(final Tuple t, final Schema s)
+      throws IOException {
+
+    try {
+      GenericData.Record record = new GenericData.Record(s);
+      for (Field f : s.getFields()) {
+        Object o = t.get(f.pos());
+        Schema innerSchema = f.schema();
+        if (AvroStorageSchemaConversionUtilities.isNullableUnion(innerSchema)) {
+          if (o == null) {
+            record.put(f.pos(), null);
+            continue;
+          }
+          innerSchema = AvroStorageSchemaConversionUtilities
+              .removeSimpleUnion(innerSchema);
+        }
+        switch (innerSchema.getType()) {
+        case RECORD:
+          record.put(f.pos(), packIntoAvro((Tuple) o, innerSchema));
+          break;
+        case ARRAY:
+          record.put(f.pos(), packIntoAvro((DataBag) o, innerSchema));
+          break;
+        case BYTES:
+          record.put(f.pos(), ByteBuffer.wrap(((DataByteArray) o).get()));
+          break;
+        case FIXED:
+          record.put(f.pos(), new GenericData.Fixed(
+              innerSchema, ((DataByteArray) o).get()));
+          break;
+        default:
+          // Pig DateTimes are written as their millisecond value
+          if (t.getType(f.pos()) == DataType.DATETIME) {
+            record.put(f.pos(), ((DateTime) o).getMillis());
+          } else {
+            record.put(f.pos(), o);
+          }
+        }
+      }
+      return record;
+    } catch (IOException e) {
+      // already descriptive (including nested packing failures); don't re-wrap
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(
+          "exception in AvroStorageDataConversionUtilities.packIntoAvro", e);
+    }
+  }
+
+  /**
+   * Packs a Pig DataBag into an Avro array.
+   *
+   * <p>If the array's element type is a record, optionally wrapped in a
+   * nullable union, each tuple is packed as a record. Otherwise each tuple must
+   * hold exactly one value, which becomes one array element. Byte arrays and
+   * DateTimes in that case are converted the same way record fields are.
+   *
+   * @param db the Pig databag to pack into the avro array
+   * @param s the Avro array schema describing the output
+   * @return the avro array corresponding to the input bag
+   * @throws IOException if a tuple cannot be packed into the element schema
+   */
+  public static GenericData.Array<Object> packIntoAvro(
+      final DataBag db, final Schema s) throws IOException {
+
+    try {
+      Schema elementSchema = s.getElementType();
+      // element types are often declared as [null, T]; pack against T
+      if (elementSchema != null
+          && AvroStorageSchemaConversionUtilities.isNullableUnion(elementSchema)) {
+        elementSchema = AvroStorageSchemaConversionUtilities
+            .removeSimpleUnion(elementSchema);
+      }
+      GenericData.Array<Object> array =
+          new GenericData.Array<Object>((int) db.size(), s);
+      for (Tuple t : db) {
+        if (elementSchema != null && elementSchema.getType() == Type.RECORD) {
+          array.add(packIntoAvro(t, elementSchema));
+        } else if (t.size() == 1) {
+          array.add(packSingleValue(t, elementSchema));
+        } else {
+          throw new IOException(
+              "AvroStorageDataConversionUtilities.packIntoAvro: Can't pack "
+                  + t + " into schema " + s);
+        }
+      }
+      return array;
+    } catch (IOException e) {
+      // already descriptive (including the "Can't pack" error); don't re-wrap
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(
+          "exception in AvroStorageDataConversionUtilities.packIntoAvro", e);
+    }
+  }
+
+  /**
+   * Converts the value held in a one-field tuple into the object Avro expects
+   * for a non-record array element of schema {@code s}.
+   * @param t a tuple holding exactly one value
+   * @param s the (non-union-wrapped) element schema, possibly null
+   * @return the converted value
+   * @throws IOException if the tuple cannot be read
+   */
+  private static Object packSingleValue(final Tuple t, final Schema s)
+      throws IOException {
+    Object o = t.get(0);
+    if (o == null || s == null) {
+      return o;
+    }
+    switch (s.getType()) {
+    case BYTES:
+      return ByteBuffer.wrap(((DataByteArray) o).get());
+    case FIXED:
+      return new GenericData.Fixed(s, ((DataByteArray) o).get());
+    default:
+      if (t.getType(0) == DataType.DATETIME) {
+        return ((DateTime) o).getMillis();
+      }
+      return o;
+    }
+  }
+
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.mortbay.log.Log;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Static methods for converting from Avro Schema objects to Pig Schema objects,
+ * and vice versa.
+ */
+public class AvroStorageSchemaConversionUtilities {
+
+  /**
+   * Description Pig attaches to field schemas it generated itself. It carries
+   * no user information, so it is never copied into an Avro doc string.
+   */
+  private static final String PIG_AUTOGENERATED_DESCRIPTION =
+      "autogenerated from Pig Field Schema";
+
+  /**
+   * Determines the pig object type of the Avro schema.
+   *
+   * <p>A union maps to the Pig type of its non-null branch when it is a single
+   * type or a type paired with null. Any other union made only of simple
+   * types maps to bytearray.
+   *
+   * @param s The avro schema for which to determine the type
+   * @return the byte representing the schema type
+   * @throws ExecException if the schema is a union Pig cannot represent, or
+   *     of an unknown type
+   * @see org.apache.avro.Schema.Type
+   */
+  public static byte getPigType(final Schema s) throws ExecException {
+    switch (s.getType()) {
+    case ARRAY:
+      return DataType.BAG;
+    case BOOLEAN:
+      return DataType.BOOLEAN;
+    case BYTES:
+      return DataType.BYTEARRAY;
+    case DOUBLE:
+      return DataType.DOUBLE;
+    case ENUM:
+      return DataType.CHARARRAY;
+    case FIXED:
+      return DataType.BYTEARRAY;
+    case FLOAT:
+      return DataType.FLOAT;
+    case INT:
+      return DataType.INTEGER;
+    case LONG:
+      return DataType.LONG;
+    case MAP:
+      return DataType.MAP;
+    case NULL:
+      return DataType.NULL;
+    case RECORD:
+      return DataType.TUPLE;
+    case STRING:
+      return DataType.CHARARRAY;
+    case UNION:
+      List<Schema> types = s.getTypes();
+      if (types.size() == 1) {
+        return getPigType(types.get(0));
+      } else if (types.size() == 2 && types.get(0).getType() == Type.NULL) {
+        return getPigType(types.get(1));
+      } else if (types.size() == 2 && types.get(1).getType() == Type.NULL) {
+        return getPigType(types.get(0));
+      } else if (isUnionOfSimpleTypes(s)) {
+        return DataType.BYTEARRAY;
+      }
+      throw new ExecException(
+          "Currently only supports element unions of a type and null (" + s.toString() +")");
+    default:
+      throw new ExecException("Unknown type: " + s.getType().toString());
+    }
+  }
+
+  /**
+   * Checks whether a schema is a union whose branches are all simple Avro
+   * types (boolean, bytes, double, enum, fixed, float, int, long, null or
+   * string).
+   * @param s the schema to check
+   * @return true if {@code s} is a union of simple types only; false if it is
+   *     not a union or any branch is a record, array, map or union
+   */
+  public static boolean isUnionOfSimpleTypes(final Schema s) {
+    // Schema.getTypes() throws on non-union schemas, so check the type first
+    if (s.getType() != Type.UNION) {
+      return false;
+    }
+    for (Schema subSchema : s.getTypes()) {
+      switch (subSchema.getType()) {
+      case BOOLEAN:
+      case BYTES:
+      case DOUBLE:
+      case ENUM:
+      case FIXED:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case NULL:
+      case STRING:
+        continue;
+      default:
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Translates an Avro schema to a Resource Schema (for Pig).
+   * @param s The avro schema for which to determine the type
+   * @param allowRecursiveSchema Flag indicating whether to
+   * throw an error if a recursive schema definition is found
+   * @throws IOException if the schema cannot be represented in Pig
+   * @return the corresponding pig schema
+   */
+  public static ResourceSchema avroSchemaToResourceSchema(
+      final Schema s, final Boolean allowRecursiveSchema)
+      throws IOException {
+    return avroSchemaToResourceSchema(s, Sets.<Schema> newHashSet(),
+        Maps.<String, ResourceSchema> newHashMap(),
+        allowRecursiveSchema);
+  }
+
+  /**
+   * Translates a field schema (avro) to a resource field schema (pig).
+   * @param f
+   *          The avro schema field for which to determine the type
+   * @param schemasInStack
+   *          Record schemas currently being translated further up the call
+   *          stack; used to detect recursive definitions
+   * @param alreadyDefinedSchemas
+   *          Map of record full names to already translated ResourceSchemas
+   * @param allowRecursiveSchema
+   *          controls whether to throw an error if the schema is recursive
+   * @throws IOException if the field cannot be represented in Pig
+   * @return the corresponding pig resource schema field
+   */
+  private static ResourceSchema.ResourceFieldSchema fieldToResourceFieldSchema(
+      final Field f, final Set<Schema> schemasInStack,
+      final Map<String, ResourceSchema> alreadyDefinedSchemas,
+      final Boolean allowRecursiveSchema) throws IOException {
+
+    ResourceSchema.ResourceFieldSchema rf =
+        new ResourceSchema.ResourceFieldSchema();
+    rf.setName(f.name());
+    Schema fieldSchema = f.schema();
+    if (isNullableUnion(fieldSchema)) {
+      fieldSchema = removeSimpleUnion(fieldSchema);
+    }
+    // if this is a fixed schema, save the schema into the description to
+    // use later when writing out the field
+    if (fieldSchema.getType() == Type.FIXED) {
+      rf.setDescription(fieldSchema.toString());
+    } else {
+      rf.setDescription(f.doc());
+    }
+    byte pigType = getPigType(fieldSchema);
+    rf.setType(pigType);
+    switch (pigType) {
+    case DataType.BAG: {
+      // a Pig bag holds a single tuple field describing each element
+      ResourceSchema bagSchema = new ResourceSchema();
+      ResourceSchema.ResourceFieldSchema[] bagSchemaFields
+        = new ResourceSchema.ResourceFieldSchema[1];
+      bagSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+      bagSchemaFields[0].setType(DataType.TUPLE);
+      bagSchemaFields[0].setDescription(fieldSchema.getDoc());
+      ResourceSchema innerResourceSchema = null;
+      Schema elementSchema = fieldSchema.getElementType();
+      if (isNullableUnion(elementSchema)) {
+        elementSchema = removeSimpleUnion(elementSchema);
+      }
+      switch (elementSchema.getType()) {
+      case RECORD:
+      case MAP:
+      case ARRAY:
+        innerResourceSchema = avroSchemaToResourceSchema(elementSchema,
+            schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+        bagSchemaFields[0].setName(elementSchema.getName());
+        break;
+      case UNION:
+        throw new IOException(
+            "Pig cannot translate avro schemas for complex unions");
+      default:
+        // simple element: wrap it in a one-field tuple
+        innerResourceSchema = new ResourceSchema();
+        ResourceSchema.ResourceFieldSchema[] tupleSchemaFields =
+            new ResourceSchema.ResourceFieldSchema[1];
+        tupleSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+        tupleSchemaFields[0].setType(getPigType(elementSchema));
+        innerResourceSchema.setFields(tupleSchemaFields);
+      }
+
+      bagSchemaFields[0].setSchema(innerResourceSchema);
+      bagSchema.setFields(bagSchemaFields);
+      rf.setSchema(bagSchema);
+    }
+      break;
+    case DataType.MAP: {
+      Schema mapAvroSchema = fieldSchema.getValueType();
+      if (isNullableUnion(mapAvroSchema)) {
+        mapAvroSchema = removeSimpleUnion(mapAvroSchema);
+      }
+      ResourceSchema mapSchema = new ResourceSchema();
+      ResourceSchema.ResourceFieldSchema[] mapSchemaFields =
+          new ResourceSchema.ResourceFieldSchema[1];
+      if (mapAvroSchema.getType() == Type.RECORD) {
+        // translate the unwrapped record; passing the nullable union would
+        // wrap the record in an extra single-field tuple
+        ResourceSchema innerResourceSchema =
+            avroSchemaToResourceSchema(mapAvroSchema, schemasInStack,
+            alreadyDefinedSchemas, allowRecursiveSchema);
+        mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+        mapSchemaFields[0].setType(DataType.TUPLE);
+        mapSchemaFields[0].setName(mapAvroSchema.getName());
+        mapSchemaFields[0].setSchema(innerResourceSchema);
+        mapSchemaFields[0].setDescription(fieldSchema.getDoc());
+      } else {
+        mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+        mapSchemaFields[0].setType(getPigType(mapAvroSchema));
+      }
+      mapSchema.setFields(mapSchemaFields);
+      rf.setSchema(mapSchema);
+    }
+      break;
+    case DataType.TUPLE:
+      // reuse the translation of a record type that was already seen
+      if (alreadyDefinedSchemas.containsKey(fieldSchema.getFullName())) {
+        rf.setSchema(alreadyDefinedSchemas.get(fieldSchema.getFullName()));
+      } else {
+        ResourceSchema innerResourceSchema =
+            avroSchemaToResourceSchema(fieldSchema, schemasInStack,
+            alreadyDefinedSchemas, allowRecursiveSchema);
+        rf.setSchema(innerResourceSchema);
+        alreadyDefinedSchemas.put(
+            fieldSchema.getFullName(), innerResourceSchema);
+      }
+      break;
+    default:
+      // simple types need no nested schema
+      break;
+    }
+
+    return rf;
+  }
+
+  /**
+   * Translates an Avro schema to a Resource Schema (for Pig). Internal method.
+   *
+   * <p>Records become one Pig field per Avro field. Any other schema is
+   * wrapped as the single field of a tuple. When a recursive record reference
+   * is found and {@code allowRecursiveSchema} is true, the returned
+   * ResourceSchema has null fields.
+   *
+   * @param s The avro schema for which to determine the type
+   * @param schemasInStack Record schemas currently being translated further up
+   * the call stack; used to detect recursive definitions
+   * @param alreadyDefinedSchemas Map of record full names to already
+   * translated ResourceSchemas
+   * @param allowRecursiveSchema controls whether to throw an error
+   * if the schema is recursive
+   * @throws IOException if the schema is recursive and that is not allowed,
+   * or cannot otherwise be represented in Pig
+   * @return the corresponding pig schema
+   */
+  private static ResourceSchema avroSchemaToResourceSchema(final Schema s,
+      final Set<Schema> schemasInStack,
+      final Map<String, ResourceSchema> alreadyDefinedSchemas,
+      final Boolean allowRecursiveSchema) throws IOException {
+
+    ResourceSchema.ResourceFieldSchema[] resourceFields = null;
+    switch (s.getType()) {
+    case RECORD:
+      if (schemasInStack.contains(s)) {
+        if (allowRecursiveSchema) {
+          break; // don't define further fields in the schema
+        } else {
+          throw new IOException(
+              "Pig found recursive schema definition while processing"
+              + s.toString() + " encountered " + s.getFullName()
+              + " which was already seen in this stack: "
+              + schemasInStack.toString() + "\n");
+        }
+      }
+      schemasInStack.add(s);
+      resourceFields =
+            new ResourceSchema.ResourceFieldSchema[s.getFields().size()];
+      for (Field f : s.getFields()) {
+        resourceFields[f.pos()] = fieldToResourceFieldSchema(f,
+            schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+      }
+      schemasInStack.remove(s);
+      break;
+    default:
+      // wrap in a tuple
+      resourceFields = new ResourceSchema.ResourceFieldSchema[1];
+      Field f = new Schema.Field(s.getName(), s, s.getDoc(), null);
+      resourceFields[0] = fieldToResourceFieldSchema(f,
+          schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+    }
+    ResourceSchema rs = new ResourceSchema();
+    rs.setFields(resourceFields);
+
+    return rs;
+  }
+
+  /**
+   * Translates a ResourceSchema to an Avro record Schema.
+   * @param rs Input schema; null yields null.
+   * @param recordName Record name
+   * @param recordNameSpace Namespace
+   * @param definedRecordNames Map of already defined record names
+   * to schema objects
+   * @param doubleColonsToDoubleUnderscores whether to translate "::" in Pig
+   * names to "__" instead of rejecting them
+   * @return the translated schema
+   * @throws IOException if a name or type cannot be represented in Avro
+   */
+  public static Schema resourceSchemaToAvroSchema(final ResourceSchema rs,
+      final String recordName, final String recordNameSpace,
+      final Map<String, List<Schema>> definedRecordNames,
+      final Boolean doubleColonsToDoubleUnderscores) throws IOException {
+
+    if (rs == null) {
+      return null;
+    }
+
+    String avroRecordName =
+        toAvroName(recordName, doubleColonsToDoubleUnderscores);
+
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    Schema newSchema = Schema.createRecord(
+        avroRecordName, null, recordNameSpace, false);
+    if (rs.getFields() != null) {
+      int i = 0;
+      for (ResourceSchema.ResourceFieldSchema rfs : rs.getFields()) {
+        String rfsName = toAvroName(rfs.getName(),
+            doubleColonsToDoubleUnderscores);
+        // descriptions may be null (e.g. Avro fields without a doc)
+        String doc = toAvroDoc(rfs.getDescription());
+        Schema fieldSchema = resourceFieldSchemaToAvroSchema(
+            rfsName, recordNameSpace, rfs.getType(), doc,
+            rfs.getSchema(), definedRecordNames,
+            doubleColonsToDoubleUnderscores);
+        // unnamed Pig fields get a positional name derived from the record
+        String fieldName =
+            (rfsName != null) ? rfsName : avroRecordName + "_" + i;
+        fields.add(new Schema.Field(fieldName, fieldSchema, doc, null));
+        i++;
+      }
+      newSchema.setFields(fields);
+    }
+
+    return newSchema;
+  }
+
+  /**
+   * Maps a Pig field description to an Avro doc string. Returns null for a
+   * null description and for Pig's autogenerated placeholder description.
+   * @param description the Pig description, possibly null
+   * @return the doc string to use in Avro, or null
+   */
+  private static String toAvroDoc(final String description) {
+    return PIG_AUTOGENERATED_DESCRIPTION.equals(description)
+        ? null : description;
+  }
+
+  /**
+   * Translates a name in a pig schema to an acceptable Avro name, or
+   * throws an error if the name can't be translated.
+   * @param name The variable name to translate; null yields null.
+   * @param doubleColonsToDoubleUnderscores Indicates whether to translate
+   * double colons to underscores or throw an error if they are encountered.
+   * @return A name usable by Avro.
+   * @throws IOException If the name is not compatible with Avro.
+   */
+  private static String toAvroName(String name,
+      final Boolean doubleColonsToDoubleUnderscores) throws IOException {
+
+    if (name == null) {
+      return null;
+    }
+
+    if (doubleColonsToDoubleUnderscores) {
+      name = name.replace("::", "__");
+    }
+
+    // Avro names: a letter or underscore, then letters, digits or underscores
+    if (name.matches("[A-Za-z_][A-Za-z0-9_]*")) {
+      return name;
+    } else {
+      throw new IOException(
+          "Pig Schema contains a name that is not allowed in Avro");
+    }
+  }
+
+  /**
+   * Returns a Union Schema composed of null and {@code in}, with null first.
+   * @param in the schema to combine with null
+   * @return the new union schema
+   */
+  private static Schema createNullableUnion(final Schema in) {
+    return Schema.createUnion(Lists.newArrayList(Schema.create(Type.NULL), in));
+  }
+
+  /**
+   * Returns a Union Schema composed of null and the primitive type {@code t}.
+   * @param t the avro raw type to combine with null
+   * @return the new union schema
+   */
+  private static Schema createNullableUnion(final Type t) {
+    return createNullableUnion(Schema.create(t));
+  }
+
+  /**
+   * Returns the single inner field of a bag or map schema.
+   * @param type the Pig type being translated (for the error message)
+   * @param schema the Pig schema of the bag or map, possibly null
+   * @return the first inner field schema
+   * @throws IOException if the schema does not describe its contents
+   */
+  private static ResourceSchema.ResourceFieldSchema firstInnerField(
+      final byte type, final ResourceSchema schema) throws IOException {
+    if (schema == null || schema.getFields() == null
+        || schema.getFields().length == 0) {
+      throw new IOException("Pig schema for " + DataType.findTypeName(type)
+          + " does not describe its contents; cannot translate it to Avro");
+    }
+    return schema.getFields()[0];
+  }
+
+  /**
+   * Creates a new Avro schema object from the input ResouceSchema.
+   * Every non-null type is emitted as a union with null.
+   * @param name object name
+   * @param nameSpace namespace for avro object
+   * @param type type of avro object
+   * @param description description for new avro object
+   * @param schema pig schema for the object
+   * @param definedRecordNames already defined record names
+   * @param doubleColonsToDoubleUnderscores whether to translate double
+   *   colons in Pig  names to underscores in avro names.
+   * @return the avro schema object
+   * @throws IOException If there is a translation error.
+   */
+  private static Schema resourceFieldSchemaToAvroSchema(
+      final String name, final String nameSpace,
+      final byte type, final String description,
+      final ResourceSchema schema,
+      final Map<String, List<Schema>> definedRecordNames,
+      final Boolean doubleColonsToDoubleUnderscores)
+          throws IOException {
+
+    switch (type) {
+    case DataType.BAG: {
+      // the bag's single inner field describes the tuples it holds
+      ResourceSchema.ResourceFieldSchema tupleField =
+          firstInnerField(type, schema);
+      Schema innerBagSchema = resourceSchemaToAvroSchema(
+          tupleField.getSchema(), name, null,
+          definedRecordNames,
+          doubleColonsToDoubleUnderscores);
+      return createNullableUnion(Schema.createArray(innerBagSchema));
+    }
+    case DataType.BIGCHARARRAY:
+      return createNullableUnion(Type.STRING);
+    case DataType.BOOLEAN:
+      return createNullableUnion(Type.BOOLEAN);
+    case DataType.BYTEARRAY: {
+      // fieldToResourceFieldSchema stores a FIXED schema's JSON in the
+      // description; if present, write the field back out as that FIXED type
+      Schema fixedSchema;
+      try {
+        fixedSchema = (new Schema.Parser()).parse(description);
+      } catch (Exception e) {
+        fixedSchema = null; // description is not a schema (or is null)
+      }
+      if (fixedSchema == null || fixedSchema.getType() != Type.FIXED) {
+        return createNullableUnion(Type.BYTES);
+      } else {
+        return createNullableUnion(fixedSchema);
+      }
+    }
+    case DataType.CHARARRAY:
+      return createNullableUnion(Type.STRING);
+    case DataType.DATETIME:
+      // DateTime values are stored as Avro longs
+      return createNullableUnion(Type.LONG);
+    case DataType.DOUBLE:
+      return createNullableUnion(Type.DOUBLE);
+    case DataType.FLOAT:
+      return createNullableUnion(Type.FLOAT);
+    case DataType.INTEGER:
+      return createNullableUnion(Type.INT);
+    case DataType.LONG:
+      return createNullableUnion(Type.LONG);
+    case DataType.MAP: {
+      // the map's single inner field describes its values
+      ResourceSchema.ResourceFieldSchema valueField =
+          firstInnerField(type, schema);
+      byte innerType = valueField.getType();
+      Schema valueSchema;
+      if (DataType.isComplex(innerType)) {
+        valueSchema = resourceSchemaToAvroSchema(
+            valueField.getSchema(),
+            name, nameSpace, definedRecordNames,
+            doubleColonsToDoubleUnderscores);
+      } else {
+        valueSchema = resourceFieldSchemaToAvroSchema(
+            name, nameSpace, innerType,
+            toAvroDoc(valueField.getDescription()), null, definedRecordNames,
+            doubleColonsToDoubleUnderscores);
+      }
+      return createNullableUnion(Schema.createMap(valueSchema));
+    }
+    case DataType.NULL:
+      return Schema.create(Type.NULL);
+    case DataType.TUPLE: {
+      if (schema == null) {
+        throw new IOException("Pig schema for tuple " + name
+            + " does not describe its contents; cannot translate it to Avro");
+      }
+      Schema returnSchema = createNullableUnion(
+          resourceSchemaToAvroSchema(schema, name, null,
+              definedRecordNames, doubleColonsToDoubleUnderscores));
+      List<Schema> schemaList = definedRecordNames.get(name);
+      if (schemaList == null) {
+        definedRecordNames.put(name, Lists.newArrayList(returnSchema));
+      } else if (!schemaList.contains(returnSchema)) {
+        // a different record already uses this name: pick a numbered name
+        returnSchema = createNullableUnion(resourceSchemaToAvroSchema(
+            schema, name + "_" + schemaList.size(),
+            null, definedRecordNames, doubleColonsToDoubleUnderscores));
+        schemaList.add(returnSchema);
+      }
+      return returnSchema;
+    }
+    case DataType.BYTE:
+    case DataType.ERROR:
+    case DataType.GENERIC_WRITABLECOMPARABLE:
+    case DataType.INTERNALMAP:
+    case DataType.UNKNOWN:
+    default:
+      throw new IOException(
+          "Don't know how to encode type "
+              + DataType.findTypeName(type) + " in schema "
+              + ((schema == null) ? "" : schema.toString())
+              + "\n");
+    }
+  }
+
+  /**
+   * Checks to see if an avro schema is a combination of
+   * null and another object (or a union with a single branch).
+   * @param s The object to check
+   * @return whether it's a nullable union
+   */
+  public static boolean isNullableUnion(final Schema s) {
+    return (
+        s.getType() == Type.UNION
+        && ((s.getTypes().size() == 1)
+            || (s.getTypes().size() == 2
+             && (s.getTypes().get(0).getType() == Type.NULL
+                || s.getTypes().get(1).getType() == Type.NULL))));
+  }
+
+  /**
+   * Given an input schema that is a union of an avro schema
+   * and null (or just a union with one type), return the avro schema.
+   * Non-union schemas are returned unchanged.
+   * @param s The input schema object
+   * @return The first non-null branch of the union
+   */
+  public static Schema removeSimpleUnion(final Schema s) {
+    if (s.getType() == Type.UNION) {
+      List<Schema> types = s.getTypes();
+      for (Schema t : types) {
+        if (t.getType() != Type.NULL) {
+          return t;
+        }
+      }
+    }
+    return s;
+  }
+
+  /**
+   * Returns a copy of a nullable union in which the non-null branch is
+   * replaced by {@code replacement}, keeping the original branch order.
+   * @param union a schema for which {@link #isNullableUnion} is true
+   * @param replacement the schema to use in place of the non-null branch
+   * @return the new union schema
+   */
+  private static Schema replaceNonNullBranch(final Schema union,
+      final Schema replacement) {
+    List<Schema> branches = Lists.newArrayList();
+    for (Schema branch : union.getTypes()) {
+      branches.add(branch.getType() == Type.NULL ? branch : replacement);
+    }
+    return Schema.createUnion(branches);
+  }
+
+  /**
+   * Takes an Avro Schema and a Pig RequiredFieldList and returns a new schema
+   * with only the required fields, or null if the function can't extract only
+   * those fields. Useful for push down projections.
+   * @param oldSchema The avro schema from which to extract the schema
+   * @param rfl the Pig required field list
+   * @return the new schema, or null
+   */
+  public static Schema newSchemaFromRequiredFieldList(
+      final Schema oldSchema, final RequiredFieldList rfl) {
+    return newSchemaFromRequiredFieldList(oldSchema, rfl.getFields());
+  }
+
+  /**
+   * Takes an Avro Schema and a list of Pig required fields and returns a new
+   * record schema with only the required fields, or null if the function
+   * can't extract only those fields. Useful for push down projections.
+   * @param oldSchema The avro record schema (optionally wrapped in a nullable
+   * union) from which to extract the schema
+   * @param rfl List of required fields
+   * @return the new record schema, or null
+   */
+  public static Schema newSchemaFromRequiredFieldList(
+      final Schema oldSchema, final List<RequiredField> rfl) {
+    // nested records may be declared as nullable unions; project the record
+    Schema recordSchema = isNullableUnion(oldSchema)
+        ? removeSimpleUnion(oldSchema) : oldSchema;
+    if (recordSchema.getType() != Type.RECORD) {
+      return null; // only records can be projected
+    }
+
+    List<Schema.Field> fields = Lists.newArrayList();
+    for (RequiredField rf : rfl) {
+      Schema.Field f = recordSchema.getField(rf.getAlias());
+      if (f == null) {
+        return null;
+      }
+      try {
+        if (getPigType(f.schema()) != rf.getType()) {
+          return null;
+        }
+      } catch (ExecException e) {
+        Log.warn("ExecException caught in newSchemaFromRequiredFieldList", e);
+        return null;
+      }
+      Schema fieldSchema = f.schema();
+      if (rf.getSubFields() != null) {
+        Schema innerSchema =
+            newSchemaFromRequiredFieldList(fieldSchema, rf.getSubFields());
+        if (innerSchema == null) {
+          return null;
+        }
+        // keep the field nullable if it was declared as a nullable union
+        fieldSchema = (fieldSchema.getType() == Type.UNION)
+            ? replaceNonNullBranch(fieldSchema, innerSchema) : innerSchema;
+      }
+      fields.add(
+          new Schema.Field(f.name(), fieldSchema, f.doc(), f.defaultValue()));
+    }
+
+    Schema newSchema = Schema.createRecord(
+        recordSchema.getName(),
+        "subset of fields from " + recordSchema.getName()
+            + "; " + recordSchema.getDoc(),
+        recordSchema.getNamespace(), false);
+
+    newSchema.setFields(fields);
+    return newSchema;
+  }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ * Object that wraps an Avro object in a tuple.
+ * @param <T> The type of the Avro object
+ */
+public final class AvroTupleWrapper <T extends IndexedRecord>
+    implements Tuple {
+
+  /**
+   * The Avro object wrapped in the pig Tuple.
+   */
+  private T avroObject;
+
+  /**
+   * Creates a new AvroTupleWrapper object.
+   * @param o The object to wrap
+   */
+  public AvroTupleWrapper(final T o) {
+    avroObject = o;
+  }
+
+  @Override
+  public void write(final DataOutput o) throws IOException {
+    throw new IOException(
+        this.getClass().toString() + ".write called, but not implemented yet");
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public int compareTo(final Object o) {
+    if (o instanceof AvroTupleWrapper) {
+      return GenericData.get().compare(avroObject,
+          ((AvroTupleWrapper) o).avroObject,
+          avroObject.getSchema());
+    }
+    return -1;
+  }
+
+  @Override
+  public void append(final Object o) {
+    List<Field> fields = avroObject.getSchema().getFields();
+    avroObject.put(fields.size(), o);
+    Schema fieldSchema = null;
+    if (o instanceof String) {
+      fieldSchema = Schema.create(Type.STRING);
+    } else if (o instanceof Integer) {
+      fieldSchema = Schema.create(Type.INT);
+    } else if (o instanceof Long) {
+      fieldSchema = Schema.create(Type.LONG);
+    } else if (o instanceof Double) {
+      fieldSchema = Schema.create(Type.DOUBLE);
+    } else if (o instanceof Float) {
+      fieldSchema = Schema.create(Type.FLOAT);
+    } else if (o == null) {
+      fieldSchema = Schema.create(Type.NULL);
+    } else if (o instanceof Boolean) {
+      fieldSchema = Schema.create(Type.BOOLEAN);
+    } else if (o instanceof Map) {
+      fieldSchema = Schema.create(Type.MAP);
+    }
+    Field newField = new Field("FIELD_" + fields.size(), fieldSchema, "", null);
+    fields.add(newField);
+    avroObject.getSchema().setFields(fields);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object get(final int pos) throws ExecException {
+
+    Schema s = avroObject.getSchema().getFields().get(pos).schema();
+    Object o = avroObject.get(pos);
+
+    switch(s.getType()) {
+      case STRING:
+        // unwrap avro UTF8 encoding
+        return o.toString();
+      case MAP:
+        return new AvroMapWrapper((Map<CharSequence, Object>) o);
+      case RECORD:
+        return new AvroTupleWrapper<T>((T) o);
+      case ENUM:
+        return o.toString();
+      case ARRAY:
+        return new AvroBagWrapper<GenericData.Record>(
+            (GenericArray<GenericData.Record>) o);
+      case FIXED:
+        return new DataByteArray(((GenericData.Fixed) o).bytes());
+      case BYTES:
+        return new DataByteArray(((ByteBuffer) o).array());
+      case UNION:
+        if (o instanceof org.apache.avro.util.Utf8) {
+          return o.toString();
+        } else if (o instanceof IndexedRecord) {
+          return new AvroTupleWrapper<T>((T) o);
+        } else if (o instanceof GenericArray) {
+          return new AvroBagWrapper<GenericData.Record>(
+              (GenericArray<GenericData.Record>) o);
+        } else if (o instanceof Map) {
+          return new AvroMapWrapper((Map<CharSequence, Object>) o);
+        } else if (o instanceof GenericData.Fixed) {
+          return new DataByteArray(((GenericData.Fixed) o).bytes());
+        } else if (o instanceof ByteBuffer) {
+          return new DataByteArray(((ByteBuffer) o).array());
+        }
+      default:
+        return o;
+    }
+
+  }
+
+  @Override
+  public List<Object> getAll() {
+
+    List<Object> all = Lists.newArrayList();
+    for (Schema.Field f : avroObject.getSchema().getFields()) {
+      try {
+        all.add(get(f.pos()));
+      } catch (ExecException e) {
+        LogFactory.getLog(getClass()).error(
+            "could not process tuple with contents " + avroObject, e);
+        return null;
+      }
+    }
+    return all;
+  }
+
+  @Override
+  public long getMemorySize() {
+    return getMemorySize(avroObject);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private long getMemorySize(final IndexedRecord r) {
+    int total = 0;
+    final int bitsPerByte = 8;
+    for (Field f : r.getSchema().getFields()) {
+      switch (f.schema().getType()) {
+      case BOOLEAN:
+      case ENUM:
+      case INT:
+        total += Integer.SIZE << bitsPerByte;
+        break;
+      case DOUBLE:
+        total += Double.SIZE << bitsPerByte;
+        break;
+      case FLOAT:
+        total += Float.SIZE << bitsPerByte;
+        break;
+      case NULL:
+        break;
+      case STRING:
+        total += ((String) r.get(f.pos())).length()
+           * (Character.SIZE << bitsPerByte);
+        break;
+      case BYTES:
+        total += ((Byte[]) r.get(f.pos())).length;
+        break;
+      case RECORD:
+        total += new AvroTupleWrapper(
+            (IndexedRecord) r.get(f.pos())).getMemorySize();
+        break;
+      case ARRAY:
+        total += new AvroBagWrapper(
+            (GenericArray) r.get(f.pos())).getMemorySize();
+        break;
+      }
+    }
+    return total;
+  }
+
+
+
+  @Override
+  public byte getType(final int arg0) throws ExecException {
+    Schema s = avroObject.getSchema().getFields().get(arg0).schema();
+    return AvroStorageSchemaConversionUtilities.getPigType(s);
+  }
+
+  @Override
+  public boolean isNull(final int arg0) throws ExecException {
+    return avroObject == null || avroObject.get(arg0) == null;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void reference(final Tuple arg0) {
+    avroObject = (T) ((AvroTupleWrapper<T>) arg0).avroObject;
+  }
+
+  @Override
+  public void set(final int arg0, final Object arg1) throws ExecException {
+    avroObject.put(arg0, arg1);
+  }
+
+  @Override
+  public int size() {
+    return avroObject.getSchema().getFields().size();
+  }
+
+  @Override
+  public String toDelimitedString(final String arg0) throws ExecException {
+    StringBuffer delimitedString = new StringBuffer();
+    boolean notfirst = false;
+    for (Field f : avroObject.getSchema().getFields()) {
+      if (notfirst) {
+        delimitedString.append(arg0);
+        notfirst = true;
+      }
+      Object val = avroObject.get(f.pos());
+      if (val == null) {
+          delimitedString.append("");
+      } else {
+          delimitedString.append(val.toString());
+      }
+    }
+    return delimitedString.toString();
+  }
+
+  @Override
+  public void readFields(final DataInput d) throws IOException {
+    throw new IOException(
+        this.getClass().toString()
+        +  ".readFields called but not implemented yet");
+  }
+
+  @Override
+  public Iterator<Object> iterator() {
+    return Iterators.transform(avroObject.getSchema().getFields().iterator(),
+        new Function<Schema.Field, Object>() {
+            @Override
+            public Object apply(final Field f) {
+              return avroObject.get(f.pos());
+            }
+          }
+        );
+  }
+
+}

Added: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (added)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.builtin;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.tool.DataFileWriteTool;
+import org.apache.avro.tool.Tool;
+import org.apache.avro.tool.TrevniCreateRandomTool;
+import org.apache.avro.tool.TrevniToJsonTool;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAvroStorage {
+
+    final protected static Log LOG = LogFactory.getLog(TestAvroStorage.class);
+
+    // Root of the test fixtures; schema/, code/pig/ and data/ are resolved under it.
+    final private static String basedir = "test/org/apache/pig/builtin/avro/";
+    // Root for test output; each test writes to outbasedir + <test method name>
+    // (see createOutputName) and the whole tree is wiped in setup().
+    final private static String outbasedir = System.getProperty("user.dir") + "/build/test/TestAvroStorage/";
+    // Data directories (relative to basedir) created by generateInputFiles()
+    // and deleted again by deleteInputFiles().
+    final private static String[] datadir = {
+        "data/avro",
+        "data/avro/compressed",
+        "data/avro/compressed/snappy",
+        "data/avro/compressed/deflate",
+        "data/avro/uncompressed",
+        "data/avro/uncompressed/testdirectory",
+        "data/json/testdirectory",
+        "data/trevni",
+        "data/trevni/uncompressed",
+    };
+    // Fixture names: schema/<name>.avsc + data/json/<name>.json are converted
+    // to data/avro/uncompressed/<name>.avro by generateInputFiles().
+    final private static String[] avroSchemas = {
+        "arraysAsOutputByPig",
+        "arrays",
+        "recordsAsOutputByPig",
+        "recordsAsOutputByPigWithDates",
+        "records",
+        "recordsOfArrays",
+        "recordsOfArraysOfRecords",
+        "recordsSubSchema",
+        "recordsSubSchemaNullable",
+        "recordsWithDoubleUnderscores",
+        "recordsWithEnums",
+        "recordsWithFixed",
+        "recordsWithMaps",
+        "recordsWithMapsOfRecords",
+        "recordsWithNullableUnions",
+        "recordWithRepeatedSubRecords",
+        "recursiveRecord",
+        "projectionTest",
+        "recordsWithSimpleUnion",
+        "recordsWithSimpleUnionOutput",
+    };
+    // Schemas for which a trevni file is generated (then converted to json and
+    // avro); the generated json files are removed in deleteInputFiles().
+    final private static String[] trevniSchemas = {
+        "simpleRecordsTrevni",
+    };
+
+    // Local-mode Pig server, created in setup() and shut down in teardown().
+    private static PigServer pigServerLocal = null;
+
+    // Accepts only paths whose name starts with neither '_' nor '.';
+    // presumably used to skip marker/hidden files in job output directories.
+    public static final PathFilter hiddenPathFilter = new PathFilter() {
+        public boolean accept(Path p) {
+          String name = p.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      };
+
+    private static String loadFileIntoString(String file) throws IOException {
+      return FileUtils.readFileToString(new File(file)).replace("\n", " ");
+    }
+
+    @BeforeClass
+    public static void setup() throws ExecException, IOException {
+        // Local-mode Pig server shared by every test in this class.
+        pigServerLocal = new PigServer(ExecType.LOCAL);
+        // Start from an empty output tree, then build the input fixtures.
+        final File outputRoot = new File(outbasedir);
+        deleteDirectory(outputRoot);
+        generateInputFiles();
+    }
+
+    @AfterClass
+    public static void teardown() throws IOException {
+        // Stop the Pig server if setup() got as far as creating it,
+        // then remove the generated input fixtures.
+        final PigServer server = pigServerLocal;
+        if (server != null) {
+            server.shutdown();
+        }
+        deleteInputFiles();
+    }
+
+    /**
+     * Generate input files for test cases
+     */
+    private static void generateInputFiles() throws IOException {
+        String data;
+        String json;
+        String avro;
+        String trevni;
+        String schema;
+
+        for (String fn : datadir) {
+            FileUtils.forceMkdir(new File(basedir + fn));
+        }
+
+        for (String fn : avroSchemas) {
+            schema = basedir + "schema/" + fn + ".avsc";
+            json = basedir + "data/json/" + fn + ".json";
+            avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+            LOG.info("creating " + avro);
+            generateAvroFile(schema, json, avro, null);
+        }
+
+        for (String fn : trevniSchemas) {
+            schema = basedir + "schema/" + fn + ".avsc";
+            trevni = basedir + "data/trevni/uncompressed/" + fn + ".trevni";
+            json = basedir + "data/json/" + fn + ".json";
+            LOG.info("creating " + trevni);
+            generateRandomTrevniFile(schema, "1000", trevni);
+            LOG.info("creating " + json);
+            convertTrevniToJsonFile(schema, trevni, json);
+
+            schema = basedir + "schema/" + fn + ".avsc";
+            json = basedir + "data/json/" + fn + ".json";
+            avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+            LOG.info("creating " + avro);
+            generateAvroFile(schema, json, avro, null);
+        }
+
+        PrintWriter w;
+        int itemSum = 0;
+        int recordCount = 0;
+        int evenFileNameItemSum = 0;
+        int evenFileNameRecordCount = 0;
+
+        for (int i = 0; i < 8; i++) {
+            schema = basedir + "schema/testDirectory.avsc";
+            json = basedir + "data/json/testdirectory/part-m-0000" + i;
+            avro = basedir + "data/avro/uncompressed/testdirectory/part-m-0000" + i + ".avro";
+            LOG.info("creating " + json);
+            w = new PrintWriter(new FileWriter(json));
+            for (int j = i*1000; j < (i+1)*1000; j++) {
+                itemSum += j;
+                recordCount += 1;
+                evenFileNameItemSum += ((i+1) % 2) * j;
+                evenFileNameRecordCount += (i+1) % 2;
+                w.println("{\"item\" : " + j + ", \"timestamp\" : " + System.currentTimeMillis() +  " }\n");
+            }
+            w.close();
+            generateAvroFile(schema, json, avro, null);
+        }
+
+        schema = basedir + "schema/testDirectoryCounts.avsc";
+        json = basedir + "data/json/testDirectoryCounts.json";
+        avro = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
+        data = "{\"itemSum\" : {\"int\" : " + itemSum + "}, \"n\" : {\"int\" : " + recordCount + "} }";
+        LOG.info("creating " + json);
+        FileUtils.writeStringToFile(new File(json), data);
+        LOG.info("creating " + avro);
+        generateAvroFile(schema, json, avro, null);
+
+        schema = basedir + "schema/testDirectoryCounts.avsc";
+        json = basedir + "data/json/evenFileNameTestDirectoryCounts.json";
+        avro = basedir + "data/avro/uncompressed/evenFileNameTestDirectoryCounts.avro";
+        data = "{\"itemSum\" : {\"int\" : " + evenFileNameItemSum + "}, \"n\" : {\"int\" : " + evenFileNameRecordCount + "} }";
+        LOG.info("creating " + json);
+        FileUtils.writeStringToFile(new File(json), data);
+        LOG.info("creating " + avro);
+        generateAvroFile(schema, json, avro, null);
+
+        for (String codec : new String[] {"deflate", "snappy"}) {
+            for (String fn : new String[] {"records",  "recordsAsOutputByPig"}) {
+                schema = basedir + "schema/" + fn + ".avsc";
+                json = basedir + "data/json/" + fn + ".json";
+                avro = basedir + "data/avro/compressed/" + codec + "/" + fn + ".avro";
+                LOG.info("creating " + avro);
+                generateAvroFile(schema, json, avro, codec);
+            }
+        }
+    }
+
+    /**
+     * Clean up generated input files
+     */
+    private static void deleteInputFiles() throws IOException {
+        // Delete auto-generated directories
+        for (String fn : datadir) {
+            LOG.info("Deleting " + basedir + fn);
+            FileUtils.deleteQuietly(new File(basedir + fn));
+        }
+        // Delete auto-generated json files
+        String json;
+        for (String fn : trevniSchemas) {
+            json = basedir + "data/json/" + fn + ".json";
+            LOG.info("Deleting " + json);
+            FileUtils.deleteQuietly(new File(json));
+        }
+        json = basedir + "data/json/testDirectoryCounts.json";
+        LOG.info("Deleting " + json);
+        FileUtils.deleteQuietly(new File(json));
+        json = basedir + "data/json/evenFileNameTestDirectoryCounts.json";
+        LOG.info("Deleting " + json);
+        FileUtils.deleteQuietly(new File(json));
+    }
+
+    /**
+     * Converts a json file into an avro data file by running avro-tools'
+     * DataFileWriteTool.
+     * @param schema path of the .avsc schema file
+     * @param json path of the json input
+     * @param avro path of the avro file to create
+     * @param codec codec name passed as --codec, or null for none
+     * @throws IOException if the conversion fails (the cause is attached)
+     */
+    private static void generateAvroFile(String schema, String json, String avro, String codec) throws IOException {
+        Tool tool = new DataFileWriteTool();
+        List<String> args = new ArrayList<String>();
+        args.add("--schema-file");
+        args.add(schema);
+        args.add(json);
+        if (codec != null) {
+            args.add("--codec");
+            args.add(codec);
+        }
+        PrintStream out = null;
+        try {
+            out = new PrintStream(avro);
+            tool.run(System.in, out, System.err, args);
+        } catch (Exception e) {
+            LOG.info("Could not generate avro file: " + avro, e);
+            throw new IOException("Could not generate avro file: " + avro, e);
+        } finally {
+            // closing a PrintStream twice is harmless, so this is safe even if
+            // the tool already closed it
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+
+    /**
+     * Runs avro-tools' TrevniCreateRandomTool with the arguments
+     * (schema, count, trevni) to create a trevni fixture file.
+     * @throws IOException if the tool fails (the cause is attached)
+     */
+    private static void generateRandomTrevniFile(String schema, String count, String trevni) throws IOException {
+        Tool tool = new TrevniCreateRandomTool();
+        List<String> args = new ArrayList<String>();
+        args.add(schema);
+        args.add(count);
+        args.add(trevni);
+        try {
+            tool.run(System.in, System.out, System.err, args);
+        } catch (Exception e) {
+            LOG.info("Could not generate trevni file: " + trevni, e);
+            throw new IOException("Could not generate trevni file: " + trevni, e);
+        }
+    }
+
+    /**
+     * Writes the contents of a trevni file as json by running avro-tools'
+     * TrevniToJsonTool. The {@code schema} argument is currently unused.
+     * @throws IOException if the conversion fails (the cause is attached)
+     */
+    private static void convertTrevniToJsonFile(String schema, String trevni, String json) throws IOException {
+        Tool tool = new TrevniToJsonTool();
+        List<String> args = new ArrayList<String>();
+        args.add(trevni);
+        PrintStream out = null;
+        try {
+            out = new PrintStream(json);
+            tool.run(System.in, out, System.err, args);
+        } catch (Exception e) {
+            LOG.info("Could not generate json file: " + json, e);
+            throw new IOException("Could not generate json file: " + json, e);
+        } finally {
+            // the stream was previously never closed
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+
+    /**
+     * Returns outbasedir followed by the name of the method that called this
+     * one, giving each test its own output path.
+     * NOTE: relies on a fixed stack depth (on typical JVMs index 0 is
+     * Thread.getStackTrace, 1 is this method, 2 is the caller), so it must be
+     * called directly from the test method, not through a helper.
+     */
+    private String createOutputName() {
+        final StackTraceElement[] st = Thread.currentThread().getStackTrace();
+        return outbasedir + st[2].getMethodName();
+    }
+
+    @Test
+    public void testLoadRecords() throws Exception {
+      // Run identity_ao2.pig over records.avro and verify the output
+      // against recordsAsOutputByPig.avro.
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    @Test
+    public void testLoadRecordsWithSimpleUnion() throws Exception {
+      // Store recordsWithSimpleUnion.avro with an explicit output schema file
+      // and verify against recordsWithSimpleUnionOutput.avro.
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsWithSimpleUnionOutput.avro";
+      final String storeArgs = "-n org.apache.pig.test.builtin -f " + basedir + "schema/recordsSubSchema.avsc";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/recordsWithSimpleUnion.avro",
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", storeArgs,
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    
+    @Test
+    public void testProjection() throws Exception {
+      // Run projection_test.pig over records.avro and verify the output
+      // against projectionTest.avro.
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/projectionTest.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "AVROSTORAGE_OUT_1", "projectionTest",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/projection_test.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    
+    @Test
+    public void testDates() throws Exception {
+      // Run with_dates.pig over records.avro and verify the output against
+      // recordsAsOutputByPigWithDates.avro.
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsAsOutputByPigWithDates.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/with_dates.pig", params);
+      verifyResults(outputName, expected);
+    }
+    
+    @Test
+    public void testLoadRecordsSpecifyFullSchema() throws Exception {
+      // The full records.avsc text is passed inline as the load schema.
+      final String inlineSchema = loadFileIntoString(basedir + "schema/records.avsc");
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "OUTFILE",           outputName,
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "AVROSTORAGE_IN_1",  inlineSchema);
+      testAvroStorage(true, basedir + "code/pig/identity_ai1_ao2.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    @Test
+    public void testLoadRecordsSpecifyFullSchemaFromFile() throws Exception {
+      // The load schema is given as a file via "-f".
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "OUTFILE",           outputName,
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "AVROSTORAGE_IN_2",  "-f " + basedir + "schema/records.avsc");
+      testAvroStorage(true, basedir + "code/pig/identity.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    @Test
+    public void testLoadRecordsSpecifySubSchema() throws Exception {
+      // Load with the recordsSubSchema.avsc text inline and store with the
+      // same schema file; compare against recordsSubSchema.avro.
+      final String subSchemaFile = basedir + "schema/recordsSubSchema.avsc";
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "OUTFILE",           outputName,
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin -f " + subSchemaFile,
+          "AVROSTORAGE_IN_1",  loadFileIntoString(subSchemaFile));
+      testAvroStorage(true, basedir + "code/pig/identity_ai1_ao2.pig", params);
+      verifyResults(outputName, expected);
+    }
+    
+    @Test
+    public void testLoadRecordsSpecifySubSchemaFromFile() throws Exception {
+      // Both load and store use recordsSubSchema.avsc via "-f".
+      final String schemaArg = "-f " + basedir + "schema/recordsSubSchema.avsc";
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "OUTFILE",           outputName,
+          "AVROSTORAGE_OUT_2", schemaArg,
+          "AVROSTORAGE_IN_2",  schemaArg);
+      testAvroStorage(true, basedir + "code/pig/identity_blank_first_args.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    @Test
+    public void testLoadRecordsSpecifySubSchemaFromExampleFile() throws Exception {
+      // The load schema is taken from an example data file via "-e".
+      final String expected = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/records.avro",
+          "OUTFILE",           outputName,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsSubSchema.avsc",
+          "AVROSTORAGE_IN_2",  "-e " + expected);
+      testAvroStorage(true, basedir + "code/pig/identity_blank_first_args.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    @Test
+    public void testLoadRecordsOfArrays() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsOfArrays.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfArrays.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadRecordsOfArraysOfRecords() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsOfArraysOfRecords.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfArraysOfRecords.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadRecordsWithEnums() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsWithEnums.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithEnums.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadRecordsWithFixed() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsWithFixed.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithFixed.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadRecordsWithMaps() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsWithMaps.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMaps.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadRecordsWithMapsOfRecords() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsWithMapsOfRecords.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMapsOfRecords.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadRecordsWithNullableUnions() throws Exception {
+      // The output is verified against the input file itself.
+      final String inputAndExpected = basedir + "data/avro/uncompressed/recordsWithNullableUnions.avro";
+      final String outputName = createOutputName();
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            inputAndExpected,
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithNullableUnions.avsc",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+      verifyResults(outputName, inputAndExpected);
+    }
+
+    @Test
+    public void testLoadDeflateCompressedRecords() throws Exception {
+      // Deflate-compressed input must produce the same output as the
+      // uncompressed reference.
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/compressed/deflate/records.avro",
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    @Test
+    public void testLoadSnappyCompressedRecords() throws Exception {
+      // Snappy-compressed input must produce the same output as the
+      // uncompressed reference.
+      final String outputName = createOutputName();
+      final String expected = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
+      final ImmutableMap<String, String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/compressed/snappy/records.avro",
+          "AVROSTORAGE_OUT_1", "records",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "OUTFILE",           outputName);
+      testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+      verifyResults(outputName, expected);
+    }
+
+    /**
+     * Stores {@code records.avro} through {@code identity_codec.pig} with
+     * {@code CODEC=deflate} and compares the stored records with the deflate fixture.
+     */
+    @Test public void testStoreDeflateCompressedRecords() throws Exception {
+      verifyCompressedRecordsStore("deflate", createOutputName());
+    }
+
+    /**
+     * Stores {@code records.avro} through {@code identity_codec.pig} with
+     * {@code CODEC=snappy} and compares the stored records with the snappy fixture.
+     */
+    @Test public void testStoreSnappyCompressedRecords() throws Exception {
+      verifyCompressedRecordsStore("snappy", createOutputName());
+    }
+
+    /**
+     * Shared body of the compressed-store tests: runs {@code identity_codec.pig} on
+     * {@code data/avro/uncompressed/records.avro} with {@code CODEC} set to
+     * {@code codec} and {@code LEVEL} set to 6, then compares the output with
+     * {@code data/avro/compressed/<codec>/recordsAsOutputByPig.avro}.
+     *
+     * @param codec      value substituted for {@code CODEC}; also names the directory
+     *                   holding the expected records
+     * @param outputName output location; obtained in the calling test method so that
+     *                   moving the code into this helper cannot change which name
+     *                   {@code createOutputName()} produces (NOTE(review): confirm
+     *                   whether that name depends on the calling method)
+     */
+    private void verifyCompressedRecordsStore(String codec, String outputName) throws Exception {
+      final String input = basedir + "data/avro/uncompressed/records.avro";
+      final String check = basedir + "data/avro/compressed/" + codec + "/recordsAsOutputByPig.avro";
+      testAvroStorage(true, basedir + "code/pig/identity_codec.pig",
+          ImmutableMap.<String,String>builder()
+            .put("INFILE", input)
+            .put("OUTFILE", outputName)
+            .put("AVROSTORAGE_OUT_1", "records")
+            .put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin")
+            .put("CODEC", codec)
+            .put("LEVEL", "6")
+            .build()
+          );
+      // NOTE(review): the two-argument verifyResults passes a null expected codec, i.e.
+      // it requires that the output files carry no "avro.codec" metadata entry. This
+      // therefore checks record contents only, not that the requested codec was used.
+      // TODO: confirm whether the output is meant to be compressed and, if so, pass
+      // the codec as the expected value.
+      verifyResults(outputName, check);
+    }
+
+    /**
+     * Loading {@code recursiveRecord.avro} without the {@code -r} load option must
+     * make the script fail; no output is verified.
+     */
+    @Test public void testLoadRecursiveRecords() throws Exception {
+      final boolean expectSuccess = false;
+      final String script = basedir + "code/pig/recursive_tests.pig";
+      final Map<String,String> params = ImmutableMap.<String,String>builder()
+          .put("INFILE",            basedir + "data/avro/uncompressed/recursiveRecord.avro")
+          .put("AVROSTORAGE_IN_2",  "-n this.is.ignored.in.a.loadFunc")
+          .put("AVROSTORAGE_OUT_1", "recordsSubSchemaNullable")
+          .put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin")
+          .put("OUTFILE",           createOutputName())
+          .build();
+      testAvroStorage(expectSuccess, script, params);
+    }
+
+    /**
+     * Loads {@code recursiveRecord.avro} with the {@code -r} load option; the run must
+     * succeed and its output must match {@code recordsSubSchemaNullable.avro}.
+     */
+    @Test public void testLoadRecursiveRecordsOptionOn() throws Exception {
+      final String expectedFile = basedir + "data/avro/uncompressed/recordsSubSchemaNullable.avro";
+      final Map<String,String> params = ImmutableMap.<String,String>builder()
+          .put("INFILE",            basedir + "data/avro/uncompressed/recursiveRecord.avro")
+          .put("AVROSTORAGE_IN_2",  "-r")
+          .put("AVROSTORAGE_OUT_1", "recordsSubSchemaNullable")
+          .put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin")
+          .put("OUTFILE",           createOutputName())
+          .build();
+      testAvroStorage(true, basedir + "code/pig/recursive_tests.pig", params);
+      verifyResults(createOutputName(), expectedFile);
+    }
+
+    /**
+     * Loads {@code recordWithRepeatedSubRecords.avro}, stores it with the matching
+     * schema file, and compares the stored records with the input file itself.
+     */
+    @Test public void testLoadRecordsWithRepeatedSubRecords() throws Exception {
+      final String dataFile = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      final String schemaOption = "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc";
+      testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig",
+          ImmutableMap.of(
+              "INFILE",            dataFile,
+              "AVROSTORAGE_OUT_2", schemaOption,
+              "OUTFILE",           createOutputName()));
+      verifyResults(createOutputName(), dataFile);
+    }
+
+    /** Loads the whole {@code testdirectory} and checks the item sum / row count summary. */
+    @Test public void testLoadDirectory() throws Exception {
+      verifyDirectoryCounts(basedir + "data/avro/uncompressed/testdirectory",
+          basedir + "data/avro/uncompressed/testDirectoryCounts.avro",
+          createOutputName());
+    }
+
+    /** Loads {@code testdirectory} through a glob matching all part files. */
+    @Test public void testLoadGlob() throws Exception {
+      verifyDirectoryCounts(basedir + "data/avro/uncompressed/testdirectory/part-m-0000*",
+          basedir + "data/avro/uncompressed/testDirectoryCounts.avro",
+          createOutputName());
+    }
+
+    /** Loads only the even-numbered part files of {@code testdirectory} through a brace glob. */
+    @Test public void testPartialLoadGlob() throws Exception {
+      verifyDirectoryCounts(basedir + "data/avro/uncompressed/testdirectory/part-m-0000{0,2,4,6}.avro",
+          basedir + "data/avro/uncompressed/evenFileNameTestDirectoryCounts.avro",
+          createOutputName());
+    }
+
+    /**
+     * Shared body of the directory/glob tests: runs {@code directory_test.pig} (which
+     * stores the sum of {@code item} and the row count) on {@code input}, storing with
+     * {@code stats} as the first AvroStorage argument, and compares with {@code check}.
+     *
+     * @param input      file, directory or glob passed as {@code INFILE}
+     * @param check      Avro file holding the expected summary record
+     * @param outputName output location; obtained in the calling test method so that
+     *                   moving the code into this helper cannot change which name
+     *                   {@code createOutputName()} produces (NOTE(review): confirm
+     *                   whether that name depends on the calling method)
+     */
+    private void verifyDirectoryCounts(String input, String check, String outputName) throws Exception {
+      testAvroStorage(true, basedir + "code/pig/directory_test.pig",
+          ImmutableMap.of(
+              "INFILE",            input,
+              "AVROSTORAGE_OUT_1", "stats",
+              "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+              "OUTFILE",           outputName)
+        );
+      verifyResults(outputName, check);
+    }
+
+    /**
+     * Runs {@code namesWithDoubleColons.pig}, whose projected field names contain
+     * {@code ::}, with the {@code -d} store option and compares the output with
+     * {@code recordsWithDoubleUnderscores.avro}.
+     */
+    @Test public void testDoubleUnderscore() throws Exception {
+      final String script = basedir + "code/pig/namesWithDoubleColons.pig";
+      final String expectedFile = basedir + "data/avro/uncompressed/recordsWithDoubleUnderscores.avro";
+      final Map<String,String> params = ImmutableMap.<String,String>builder()
+          .put("INFILE",            basedir + "data/avro/uncompressed/records.avro")
+          .put("AVROSTORAGE_OUT_1", "recordsWithDoubleUnderscores")
+          .put("AVROSTORAGE_OUT_2", "-d -n org.apache.pig.test.builtin")
+          .put("OUTFILE",           createOutputName())
+          .build();
+      testAvroStorage(true, script, params);
+      verifyResults(createOutputName(), expectedFile);
+    }
+
+    /**
+     * Same script as the {@code -d} variant but without that store option; the run
+     * is expected to fail, so no output is verified.
+     */
+    @Test public void testDoubleUnderscoreNoFlag() throws Exception {
+      final String script = basedir + "code/pig/namesWithDoubleColons.pig";
+      final Map<String,String> params = ImmutableMap.<String,String>builder()
+          .put("INFILE",            basedir + "data/avro/uncompressed/records.avro")
+          .put("AVROSTORAGE_OUT_1", "recordsWithDoubleUnderscores")
+          .put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin")
+          .put("OUTFILE",           createOutputName())
+          .build();
+      final boolean expectSuccess = false;
+      testAvroStorage(expectSuccess, script, params);
+    }
+
+    /**
+     * Loads {@code arrays.avro}, stores it with {@code arrays} as the first
+     * AvroStorage argument, and compares the output with {@code arraysAsOutputByPig.avro}.
+     */
+    @Test public void testLoadArrays() throws Exception {
+      final String script = basedir + "code/pig/identity_ao2.pig";
+      final String expectedFile = basedir + "data/avro/uncompressed/arraysAsOutputByPig.avro";
+      final Map<String,String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/avro/uncompressed/arrays.avro",
+          "AVROSTORAGE_OUT_1", "arrays",
+          "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+          "OUTFILE",           createOutputName());
+      testAvroStorage(true, script, params);
+      verifyResults(createOutputName(), expectedFile);
+    }
+
+    /**
+     * Reads {@code simpleRecordsTrevni.trevni} with TrevniStorage, writes it as Avro
+     * using {@code schema/simpleRecordsTrevni.avsc}, and compares with
+     * {@code simpleRecordsTrevni.avro}.
+     */
+    @Test public void testLoadTrevniRecords() throws Exception {
+      final String script = basedir + "code/pig/trevni_to_avro.pig";
+      final String expectedFile = basedir + "data/avro/uncompressed/simpleRecordsTrevni.avro";
+      final Map<String,String> params = ImmutableMap.of(
+          "INFILE",            basedir + "data/trevni/uncompressed/simpleRecordsTrevni.trevni",
+          "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/simpleRecordsTrevni.avsc",
+          "OUTFILE",           createOutputName());
+      testAvroStorage(true, script, params);
+      verifyResults(createOutputName(), expectedFile);
+    }
+
+    /**
+     * Copies the Trevni input to a Trevni intermediate output, converts that
+     * intermediate output to Avro, and compares the result with
+     * {@code simpleRecordsTrevni.avro}.
+     */
+    @Test public void testLoadAndSaveTrevniRecords() throws Exception {
+      final String schemaOption = "-f " + basedir + "schema/simpleRecordsTrevni.avsc";
+      final String check = basedir + "data/avro/uncompressed/simpleRecordsTrevni.avro";
+
+      // step 1: Trevni to Trevni, written to the output name with a "Trevni" suffix
+      final Map<String,String> toTrevni = ImmutableMap.of(
+          "INFILE",            basedir + "data/trevni/uncompressed/simpleRecordsTrevni.trevni",
+          "AVROSTORAGE_OUT_2", schemaOption,
+          "OUTFILE",           createOutputName() + "Trevni");
+      testAvroStorage(true, basedir + "code/pig/trevni_to_trevni.pig", toTrevni);
+
+      // step 2: the intermediate Trevni output to Avro
+      final Map<String,String> toAvro = ImmutableMap.of(
+          "INFILE",            createOutputName() + "Trevni",
+          "AVROSTORAGE_OUT_2", schemaOption,
+          "OUTFILE",           createOutputName());
+      testAvroStorage(true, basedir + "code/pig/trevni_to_avro.pig", toAvro);
+
+      verifyResults(createOutputName(), check);
+    }
+
+    private static void deleteDirectory (File path) {
+        if ( path.exists()) {
+            File [] files = path.listFiles();
+            for (File file: files) {
+                if (file.isDirectory())
+                    deleteDirectory(file);
+                file.delete();
+            }
+        }
+    }
+
+    /**
+     * Runs {@code scriptFile} as a batch on {@code pigServerLocal} and checks the
+     * outcome against {@code expectedToSucceed}.
+     *
+     * <p>An exception thrown while registering or executing the script counts as one
+     * failed job. When success was expected, that exception is attached as the cause
+     * of the assertion error so the real problem appears in the test report.
+     *
+     * @param expectedToSucceed {@code true} if no job may fail, {@code false} if at
+     *                          least one job must fail
+     * @param scriptFile        path of the Pig script to run
+     * @param parameterMap      values substituted for the script's parameters
+     */
+    private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {
+        pigServerLocal.setBatchOn();
+
+        int numOfFailedJobs = 0;
+        Exception scriptError = null;
+
+        try {
+          pigServerLocal.registerScript(scriptFile, parameterMap);
+          for (ExecJob job : pigServerLocal.executeBatch()) {
+            if (job.getStatus() == JOB_STATUS.FAILED) {
+              numOfFailedJobs++;
+            }
+          }
+        } catch (Exception e) {
+          // Counted as a failure rather than propagated, so tests that expect the
+          // script to fail still pass when it throws.
+          System.err.printf("Exception caught in testAvroStorage: %s%n", e);
+          scriptError = e;
+          numOfFailedJobs++;
+        }
+
+        if (expectedToSucceed) {
+          if (numOfFailedJobs != 0) {
+            AssertionError failure = new AssertionError("There was a failed job!");
+            if (scriptError != null) {
+              failure.initCause(scriptError);
+            }
+            throw failure;
+          }
+        } else {
+          assertTrue("There was no failed job!", numOfFailedJobs > 0);
+        }
+    }
+
+    private void verifyResults(String outPath, String expectedOutpath) throws IOException {
+        verifyResults(outPath, expectedOutpath, null);
+    }
+
+    /**
+     * Checks the Avro data files written under {@code outPath} against the records
+     * read from {@code expectedOutpath}.
+     *
+     * <p>Each non-hidden entry directly under {@code outPath} is listed, descending one
+     * level into sub-directories. Every file must be an Avro data file whose
+     * {@code avro.codec} metadata equals {@code expectedCodec}, and every record in it
+     * must be one of the expected records. The total number of records over all files
+     * must equal the size of the expected set.
+     *
+     * @param outPath         directory produced by the script under test
+     * @param expectedOutpath file or directory holding the expected records
+     * @param expectedCodec   expected {@code avro.codec} metadata value, or {@code null}
+     *                        when the entry must be absent
+     */
+    private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+
+        /* read in expected results */
+        Set<GenericData.Record> expected = getExpected(expectedOutpath);
+
+        /* read in output results and compare */
+        Path output = new Path(outPath);
+        assertTrue("Output dir does not exists!", fs.exists(output)
+                && fs.getFileStatus(output).isDir());
+
+        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+        assertTrue("Split field dirs not found!", paths != null);
+
+        // Counted over all output files: the records may be spread over several part
+        // files, and an output directory without any data file must not pass.
+        int count = 0;
+        for (Path path : paths) {
+          Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+          assertTrue("No files found for path: " + path.toUri().getPath(),
+                  files != null);
+          for (Path filePath : files) {
+            assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+
+            GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();
+            DataFileStream<GenericData.Record> in = new DataFileStream<GenericData.Record>(
+                                            fs.open(filePath), reader);
+            // close the stream even when an assertion below fails
+            try {
+              assertEquals("codec", expectedCodec, in.getMetaString("avro.codec"));
+              while (in.hasNext()) {
+                GenericData.Record obj = in.next();
+                // the diagnostic embeds the whole expected set, so build it only on failure
+                if (!expected.contains(obj)) {
+                  throw new AssertionError("Avro result object found that's not expected: Found "
+                          + (obj != null ? obj.getSchema() : "null") + ", " + obj
+                          + "\nExpected " + expected + "\n");
+                }
+                count++;
+              }
+            } finally {
+              in.close();
+            }
+          }
+        }
+        assertEquals(expected.size(), count);
+    }
+
+    /**
+     * Reads every record from the Avro data file(s) at {@code pathstr}.
+     *
+     * <p>The result is a set ordered by each record's {@code toString()} form, so two
+     * records that print identically are kept only once.
+     *
+     * @param pathstr a data file, or a directory whose non-hidden entries (descending
+     *                one level into sub-directories) are Avro data files
+     * @return the expected records
+     * @throws IOException if a file cannot be opened or read
+     */
+    private Set<GenericData.Record> getExpected(String pathstr) throws IOException {
+
+        Set<GenericData.Record> ret = new TreeSet<GenericData.Record>(
+                new Comparator<GenericData.Record>() {
+                    @Override
+                    public int compare(Record o1, Record o2) {
+                        return o1.toString().compareTo(o2.toString());
+                    }}
+                );
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+
+        /* locate the expected data files */
+        Path output = new Path(pathstr);
+        assertTrue("Expected output does not exists!", fs.exists(output));
+
+        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+        assertTrue("Split field dirs not found!", paths != null);
+
+        for (Path path : paths) {
+            Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+            assertTrue("No files found for path: " + path.toUri().getPath(), files != null);
+            for (Path filePath : files) {
+                assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+
+                GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();
+                DataFileStream<GenericData.Record> in = new DataFileStream<GenericData.Record>(fs.open(filePath), reader);
+                // close the stream even when reading a record fails
+                try {
+                    while (in.hasNext()) {
+                        ret.add(in.next());
+                    }
+                } finally {
+                    in.close();
+                }
+            }
+        }
+        return ret;
+    }
+
+}
+

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,6 @@
+in = LOAD '$INFILE' USING AvroStorage; -- INFILE is a file, directory or glob chosen by the calling test
+out = FOREACH (GROUP in ALL) GENERATE -- one group holding every input row
+    (int) SUM(in.item) as itemSum:int, -- total of the item field
+    (int) COUNT_STAR(in) as n:int; -- number of input rows
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('','$AVROSTORAGE_IN_2'); -- blank first argument, load options supplied by the test
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('$AVROSTORAGE_IN_1'); -- single load argument supplied by the test
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- no load arguments
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('','$AVROSTORAGE_IN_2'); -- blank first argument on both LOAD and STORE
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,6 @@
+SET avro.output.codec $CODEC
+SET avro.mapred.deflate.level $LEVEL
+in = LOAD '$INFILE' USING AvroStorage(); -- the two SET lines above are presumably meant to pick the output codec and deflate level
+out = FOREACH in GENERATE *; -- NOTE(review) nothing here sets mapred.output.compress, confirm the codec settings actually take effect
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- no load arguments
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,7 @@
+in = LOAD '$INFILE' USING AvroStorage('',''); -- both load arguments blank
+step1 = FOREACH in GENERATE TOTUPLE(key) as A, TOTUPLE(intValue) as C; -- wrap key and intValue in one-field tuples
+step2 = FOREACH step1 GENERATE A, TOTUPLE(C) as B; -- nest tuple C inside tuple B
+step3 = FOREACH step2 GENERATE FLATTEN(A), FLATTEN(B); -- flattened fields are referenced as A::key and B::C
+out = FOREACH step3 GENERATE A::key, FLATTEN(B::C); -- A::key plus the flattened contents of B::C
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- no load arguments
+out = FOREACH in GENERATE $0, $1, $2; -- first three fields, selected by position
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('', '$AVROSTORAGE_IN_2'); -- second load argument supplied by the test, for example -r
+out = FOREACH in GENERATE $0, $1; -- first two fields, selected by position
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1', '$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING TrevniStorage; -- read Trevni input
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING TrevniStorage; -- read Trevni input, the STORE below writes Trevni again
+out = FOREACH in GENERATE *; -- every field passed through unchanged
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING TrevniStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- no load arguments
+out = FOREACH in GENERATE *, ToDate('2013-05-01Z', 'yyyy-MM-ddZ') AS date; -- append a constant datetime column named date
+RMF $OUTFILE;
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,3 @@
+[1, 2, 3, 4, 5]
+[6]
+[]
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,23 @@
+{"array" : 
+    {"array" :
+        [
+            {"array_0" : {"int" : 1}},
+            {"array_0" : {"int" : 2}},
+            {"array_0" : {"int" : 3}},
+            {"array_0" : {"int" : 4}},
+            {"array_0" : {"int" : 5}}
+        ]
+    }
+}
+{"array" : {
+    "array" :
+        [
+            {"array_0" : {"int" : 6}}
+        ]
+    }
+}
+{"array" : {
+    "array" :
+        []
+    }
+}
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,15 @@
+{
+ "key" : "A",
+ "intValue" : 1,
+ "longValue" : 1
+}
+{
+ "key" : "B",
+ "intValue" : 2,
+ "longValue" : 2
+}
+{
+ "key" : "C",
+ "intValue" : 3,
+ "longValue" : 3
+}
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,12 @@
+{"key" : "stuff in closet",
+ "value1" : {"thing" : "hat", "count" : 7},
+ "value2" : {"thing" : "coat", "count" : 2}
+ }
+{"key" : "stuff on desk",
+ "value1" : {"thing" : "stapler", "count" : 1},
+ "value2" : {"thing" : "PC", "count" : 0}
+ }
+{"key" : "examples",
+ "value1" : {"thing" : "interesting", "count" : 0},
+ "value2" : {"thing" : "dull", "count" : 10}
+ }
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,30 @@
+{
+ "key" : "A",
+ "intValue" : 1,
+ "longValue" : 1,
+ "booleanValue" : true,
+ "floatValue" : 1.0,
+ "doubleValue" : 1.0,
+ "bytesValue" : "\u00FF",
+ "nullValue" : null
+}
+{
+ "key" : "B",
+ "intValue" : 2,
+ "longValue" : 2,
+ "booleanValue" : true,
+ "floatValue" : 2.0,
+ "doubleValue" : 2.0,
+ "bytesValue" : "\u00FE",
+ "nullValue" : null
+}
+{
+ "key" : "C",
+ "intValue" : 3,
+ "longValue" : 3,
+ "booleanValue" : false,
+ "floatValue" : 3.0,
+ "doubleValue" : 3.0,
+ "bytesValue" : "\u00FD",
+ "nullValue" : null
+}
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,30 @@
+{
+ "key" : {"string": "A"},
+ "intValue" : {"int" : 1},
+ "longValue" : {"long" : 1},
+ "booleanValue" : {"boolean" : true},
+ "floatValue" : {"float" : 1.0},
+ "doubleValue" : {"double" : 1.0},
+ "bytesValue" : {"bytes" : "\u00FF"},
+ "nullValue" : null
+}
+{
+ "key" : {"string": "B"},
+ "intValue" : {"int" : 2},
+ "longValue" : {"long" :2},
+ "booleanValue" : {"boolean" : true},
+ "floatValue" : {"float" : 2.0},
+ "doubleValue" : {"double" : 2.0},
+ "bytesValue" : {"bytes" : "\u00FE"},
+ "nullValue" : null
+}
+{
+ "key" : {"string": "C"},
+ "intValue" : {"int" : 3},
+ "longValue" : {"long" :3},
+ "booleanValue" : {"boolean" : false},
+ "floatValue" : {"float" : 3.0},
+ "doubleValue" : {"double" : 3.0},
+ "bytesValue" : {"bytes" : "\u00FD"},
+ "nullValue" : null
+}
\ No newline at end of file