You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/06/29 00:54:06 UTC
svn commit: r1497954 [2/3] - in /pig/trunk: ./ .eclipse.templates/ ivy/
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/builtin/
src/org/apache/pig/impl/util/avro/ test/ test/org/apache/pig/builtin/
test/org/apache/pig/builtin/avro/ test/o...
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.joda.time.DateTime;
+
+/**
+ * Utility class for AvroStorage; contains static methods
+ * for converting Pig objects (tuples and bags) into Avro objects.
+ *
+ */
+public class AvroStorageDataConversionUtilities {
+
+  /**
+   * Converts a Pig tuple into an Avro generic record laid out by the given
+   * record schema. The value for each Avro field is read from the tuple at
+   * the field's position.
+   * @param t the Pig tuple supplying the field values
+   * @param s the Avro record schema describing the record to build
+   * @return the Avro record populated from the tuple
+   * @throws IOException wrapping any exception raised during the conversion
+   */
+  public static GenericData.Record packIntoAvro(final Tuple t, final Schema s)
+      throws IOException {
+
+    try {
+      final GenericData.Record result = new GenericData.Record(s);
+      for (Field field : s.getFields()) {
+        final int pos = field.pos();
+        final Object value = t.get(pos);
+        Schema valueSchema = field.schema();
+        if (AvroStorageSchemaConversionUtilities.isNullableUnion(valueSchema)) {
+          if (value == null) {
+            result.put(pos, null);
+            continue;
+          }
+          // non-null value: convert it against the non-null union branch
+          valueSchema = AvroStorageSchemaConversionUtilities
+              .removeSimpleUnion(valueSchema);
+        }
+        final Type avroType = valueSchema.getType();
+        if (avroType == Type.RECORD) {
+          result.put(pos, packIntoAvro((Tuple) value, valueSchema));
+        } else if (avroType == Type.ARRAY) {
+          result.put(pos, packIntoAvro((DataBag) value, valueSchema));
+        } else if (avroType == Type.BYTES) {
+          final byte[] raw = ((DataByteArray) value).get();
+          result.put(pos, ByteBuffer.wrap(raw));
+        } else if (avroType == Type.FIXED) {
+          final byte[] raw = ((DataByteArray) value).get();
+          result.put(pos, new GenericData.Fixed(valueSchema, raw));
+        } else if (t.getType(pos) == DataType.DATETIME) {
+          // Pig datetimes are stored as their millisecond value
+          result.put(pos, ((DateTime) value).getMillis());
+        } else {
+          result.put(pos, value);
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new IOException(
+          "exception in AvroStorageDataConversionUtilities.packIntoAvro", e);
+    }
+  }
+
+ /**
+ * Packs a Pig DataBag into an Avro array.
+ * @param db the Pig databad to pack into the avro array
+ * @param s The avro schema for which to determine the type
+ * @return the avro array corresponding to the input bag
+ * @throws IOException
+ */
+ public static GenericData.Array<Object> packIntoAvro(
+ final DataBag db, final Schema s) throws IOException {
+
+ try {
+ GenericData.Array<Object> array
+ = new GenericData.Array<Object>(new Long(db.size()).intValue(), s);
+ for (Tuple t : db) {
+ if (s.getElementType() != null
+ && s.getElementType().getType() == Type.RECORD) {
+ array.add(packIntoAvro(t, s.getElementType()));
+ } else if (t.size() == 1) {
+ array.add(t.get(0));
+ } else {
+ throw new IOException(
+ "AvroStorageDataConversionUtilities.packIntoAvro: Can't pack "
+ + t + " into schema " + s);
+ }
+ }
+ return array;
+ } catch (Exception e) {
+ throw new IOException(
+ "exception in AvroStorageDataConversionUtilities.packIntoAvro", e);
+ }
+ }
+
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.mortbay.log.Log;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Static methods for converting from Avro Schema object to Pig Schema objects,
+ * and vice versa.
+ */
+public class AvroStorageSchemaConversionUtilities {
+
+  /**
+   * Maps an Avro schema to the Pig {@link DataType} byte that represents it.
+   * A union with a single branch, or a two-branch union in which one branch
+   * is null, maps to the type of its non-null branch; any other union made up
+   * only of simple types maps to bytearray.
+   * @param s the Avro schema to classify
+   * @return the Pig data type byte for the schema
+   * @throws ExecException if the schema is a union that is not supported or
+   *     has a type this method does not know
+   * @see org.apache.avro.Schema.Type
+   */
+  public static byte getPigType(final Schema s) throws ExecException {
+    switch (s.getType()) {
+    case NULL:
+      return DataType.NULL;
+    case BOOLEAN:
+      return DataType.BOOLEAN;
+    case INT:
+      return DataType.INTEGER;
+    case LONG:
+      return DataType.LONG;
+    case FLOAT:
+      return DataType.FLOAT;
+    case DOUBLE:
+      return DataType.DOUBLE;
+    case ENUM:
+    case STRING:
+      return DataType.CHARARRAY;
+    case BYTES:
+    case FIXED:
+      return DataType.BYTEARRAY;
+    case ARRAY:
+      return DataType.BAG;
+    case MAP:
+      return DataType.MAP;
+    case RECORD:
+      return DataType.TUPLE;
+    case UNION: {
+      final List<Schema> branches = s.getTypes();
+      final int branchCount = branches.size();
+      if (branchCount == 1) {
+        return getPigType(branches.get(0));
+      }
+      if (branchCount == 2) {
+        if (branches.get(0).getType() == Type.NULL) {
+          return getPigType(branches.get(1));
+        }
+        if (branches.get(1).getType() == Type.NULL) {
+          return getPigType(branches.get(0));
+        }
+      }
+      if (isUnionOfSimpleTypes(s)) {
+        return DataType.BYTEARRAY;
+      }
+      throw new ExecException(
+          "Currently only supports element unions of a type and null (" + s.toString() +")");
+    }
+    default:
+      throw new ExecException("Unknown type: " + s.getType().toString());
+    }
+  }
+
+  /**
+   * Checks whether a schema is a union whose branches are all simple
+   * (non-container) types: boolean, bytes, double, enum, fixed, float, int,
+   * long, null or string.
+   * @param s the schema to check
+   * @return true if {@code s} is a union and every branch is a simple type;
+   *     false if {@code s} is null, is not a union, or has a record, array,
+   *     map or nested union branch
+   */
+  public static boolean isUnionOfSimpleTypes(Schema s) {
+    // Schema.getTypes() is only defined for unions, so test the schema type
+    // up front instead of relying on a null return value.
+    if (s == null || s.getType() != Type.UNION) {
+      return false;
+    }
+    for (Schema subSchema : s.getTypes()) {
+      switch (subSchema.getType()) {
+      case BOOLEAN:
+      case BYTES:
+      case DOUBLE:
+      case ENUM:
+      case FIXED:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case NULL:
+      case STRING:
+        continue;
+      default:
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Converts an Avro schema into the equivalent Pig resource schema. This is
+   * the public entry point; it starts the recursive translation with an
+   * empty set of in-progress schemas and an empty cache of translated ones.
+   * @param s the Avro schema to translate
+   * @param allowRecursiveSchema whether a recursive schema definition is
+   *     tolerated (true) or reported as an error (false)
+   * @return the corresponding Pig schema
+   * @throws IOException if the schema cannot be translated
+   */
+  public static ResourceSchema avroSchemaToResourceSchema(
+      final Schema s, final Boolean allowRecursiveSchema)
+      throws IOException {
+    final Set<Schema> schemasInStack = Sets.<Schema> newHashSet();
+    final Map<String, ResourceSchema> alreadyDefinedSchemas =
+        Maps.<String, ResourceSchema> newHashMap();
+    return avroSchemaToResourceSchema(
+        s, schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+  }
+
+  /**
+   * Translates a field schema (avro) to a resource field schema (pig).
+   * A nullable union is first reduced to its non-null branch. Bags, maps and
+   * tuples additionally get a nested resource schema describing their
+   * contents.
+   * @param f
+   *          The avro schema field for which to determine the type
+   * @param schemasInStack
+   *          Set of record schemas currently being translated, used to
+   *          detect recursion
+   * @param alreadyDefinedSchemas
+   *          Map of record full names to already translated resource schemas
+   * @param allowRecursiveSchema
+   *          controls whether to throw an error if the schema is recursive
+   * @throws IOException if the field uses a complex union or another
+   *          construct that cannot be translated
+   * @return the corresponding pig resource schema field
+   */
+  private static ResourceSchema.ResourceFieldSchema fieldToResourceFieldSchema(
+      final Field f, final Set<Schema> schemasInStack,
+      final Map<String, ResourceSchema> alreadyDefinedSchemas,
+      final Boolean allowRecursiveSchema) throws IOException {
+
+    ResourceSchema.ResourceFieldSchema rf =
+        new ResourceSchema.ResourceFieldSchema();
+    rf.setName(f.name());
+    Schema fieldSchema = f.schema();
+    if (isNullableUnion(fieldSchema)) {
+      fieldSchema = removeSimpleUnion(fieldSchema);
+    }
+    // if this is a fixed schema, save the schema into the description to
+    // use later when writing out the field
+    if (fieldSchema.getType() == Type.FIXED) {
+      rf.setDescription(fieldSchema.toString());
+    } else {
+      rf.setDescription(f.doc());
+    }
+    byte pigType = getPigType(fieldSchema);
+    rf.setType(pigType);
+    switch (pigType) {
+    case DataType.BAG: {
+      // a bag always holds tuples, so wrap the element schema in one
+      ResourceSchema bagSchema = new ResourceSchema();
+      ResourceSchema.ResourceFieldSchema[] bagSchemaFields
+          = new ResourceSchema.ResourceFieldSchema[1];
+      bagSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+      bagSchemaFields[0].setType(DataType.TUPLE);
+      bagSchemaFields[0].setDescription(fieldSchema.getDoc());
+      ResourceSchema innerResourceSchema = null;
+      Schema elementSchema = fieldSchema.getElementType();
+      if (isNullableUnion(elementSchema)) {
+        elementSchema = removeSimpleUnion(elementSchema);
+      }
+      switch (elementSchema.getType()) {
+      case RECORD:
+      case MAP:
+      case ARRAY:
+        innerResourceSchema = avroSchemaToResourceSchema(elementSchema,
+            schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+        bagSchemaFields[0].setName(elementSchema.getName());
+        break;
+      case UNION:
+        throw new IOException(
+            "Pig cannot translate avro schemas for complex unions");
+      default:
+        // simple element type: a one-field tuple carrying just the type
+        innerResourceSchema = new ResourceSchema();
+        ResourceSchema.ResourceFieldSchema[] tupleSchemaFields =
+            new ResourceSchema.ResourceFieldSchema[1];
+        tupleSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+        tupleSchemaFields[0].setType(getPigType(elementSchema));
+        innerResourceSchema.setFields(tupleSchemaFields);
+      }
+
+      bagSchemaFields[0].setSchema(innerResourceSchema);
+      bagSchema.setFields(bagSchemaFields);
+      rf.setSchema(bagSchema);
+    }
+      break;
+    case DataType.MAP: {
+      Schema mapAvroSchema = fieldSchema.getValueType();
+      if (isNullableUnion(mapAvroSchema)) {
+        mapAvroSchema = removeSimpleUnion(mapAvroSchema);
+      }
+      ResourceSchema mapSchema = new ResourceSchema();
+      ResourceSchema.ResourceFieldSchema[] mapSchemaFields =
+          new ResourceSchema.ResourceFieldSchema[1];
+      if (mapAvroSchema.getType() == Type.RECORD) {
+        // Translate the unwrapped record schema (as the bag branch does).
+        // Passing the original nullable union would send it through the
+        // non-record path of avroSchemaToResourceSchema and add an extra
+        // one-field tuple level around the record.
+        ResourceSchema innerResourceSchema =
+            avroSchemaToResourceSchema(mapAvroSchema, schemasInStack,
+                alreadyDefinedSchemas, allowRecursiveSchema);
+        mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+        mapSchemaFields[0].setType(DataType.TUPLE);
+        mapSchemaFields[0].setName(mapAvroSchema.getName());
+        mapSchemaFields[0].setSchema(innerResourceSchema);
+        mapSchemaFields[0].setDescription(fieldSchema.getDoc());
+      } else {
+        mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
+        mapSchemaFields[0].setType(getPigType(mapAvroSchema));
+      }
+      mapSchema.setFields(mapSchemaFields);
+      rf.setSchema(mapSchema);
+    }
+      break;
+    case DataType.TUPLE:
+      // reuse the translation of a record that was already seen by name
+      if (alreadyDefinedSchemas.containsKey(fieldSchema.getFullName())) {
+        rf.setSchema(alreadyDefinedSchemas.get(fieldSchema.getFullName()));
+      } else {
+        ResourceSchema innerResourceSchema =
+            avroSchemaToResourceSchema(fieldSchema, schemasInStack,
+                alreadyDefinedSchemas, allowRecursiveSchema);
+        rf.setSchema(innerResourceSchema);
+        alreadyDefinedSchemas.put(
+            fieldSchema.getFullName(), innerResourceSchema);
+      }
+      break;
+    }
+
+    return rf;
+  }
+
+ /**
+ * Translates an Avro schema to a Resource Schema (for Pig). Internal method.
+ * @param s The avro schema for which to determine the type
+ * @param namesInStack Set of already defined object names
+ * @param alreadyDefinedSchemas Map of schema names to resourceschema objects
+ * @param allowRecursiveSchema controls whether to throw an error
+ * if the schema is recursive
+ * @throws IOException
+ * @return the corresponding pig schema
+ */
+ private static ResourceSchema avroSchemaToResourceSchema(final Schema s,
+ final Set<Schema> schemasInStack,
+ final Map<String, ResourceSchema> alreadyDefinedSchemas,
+ final Boolean allowRecursiveSchema) throws IOException {
+
+ ResourceSchema.ResourceFieldSchema[] resourceFields = null;
+ switch (s.getType()) {
+ case RECORD:
+ if (schemasInStack.contains(s)) {
+ if (allowRecursiveSchema) {
+ break; // don't define further fields in the schema
+ } else {
+ throw new IOException(
+ "Pig found recursive schema definition while processing"
+ + s.toString() + " encountered " + s.getFullName()
+ + " which was already seen in this stack: "
+ + schemasInStack.toString() + "\n");
+ }
+ }
+ schemasInStack.add(s);
+ resourceFields =
+ new ResourceSchema.ResourceFieldSchema[s.getFields().size()];
+ for (Field f : s.getFields()) {
+ resourceFields[f.pos()] = fieldToResourceFieldSchema(f,
+ schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+ }
+ schemasInStack.remove(s);
+ break;
+ default:
+ // wrap in a tuple
+ resourceFields = new ResourceSchema.ResourceFieldSchema[1];
+ Field f = new Schema.Field(s.getName(), s, s.getDoc(), null);
+ resourceFields[0] = fieldToResourceFieldSchema(f,
+ schemasInStack, alreadyDefinedSchemas, allowRecursiveSchema);
+ }
+ ResourceSchema rs = new ResourceSchema();
+ rs.setFields(resourceFields);
+
+ return rs;
+ }
+
+  /**
+   * Translates a ResourceSchema to an Avro record schema.
+   * @param rs Input schema; when null, null is returned.
+   * @param recordName Record name
+   * @param recordNameSpace Namespace
+   * @param definedRecordNames Map of already defined record names
+   *     to schema objects
+   * @param doubleColonsToDoubleUnderscores whether double colons in Pig
+   *     names are rewritten to double underscores (true) or rejected (false)
+   * @return the translated schema, or null if {@code rs} is null
+   * @throws IOException if a name is not usable in Avro or a field type
+   *     cannot be encoded
+   */
+  public static Schema resourceSchemaToAvroSchema(final ResourceSchema rs,
+      String recordName, final String recordNameSpace,
+      final Map<String, List<Schema>> definedRecordNames,
+      final Boolean doubleColonsToDoubleUnderscores) throws IOException {
+
+    if (rs == null) {
+      return null;
+    }
+
+    recordName = toAvroName(recordName, doubleColonsToDoubleUnderscores);
+
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    Schema newSchema = Schema.createRecord(
+        recordName, null, recordNameSpace, false);
+    if (rs.getFields() != null) {
+      int i = 0;
+      for (ResourceSchema.ResourceFieldSchema rfs : rs.getFields()) {
+        String rfsName = toAvroName(rfs.getName(),
+            doubleColonsToDoubleUnderscores);
+        // The placeholder description carries no information, so drop it.
+        // Comparing literal-first keeps this safe when the field has no
+        // description at all.
+        String doc =
+            "autogenerated from Pig Field Schema".equals(rfs.getDescription())
+                ? null : rfs.getDescription();
+        Schema fieldSchema = resourceFieldSchemaToAvroSchema(
+            rfsName, recordNameSpace, rfs.getType(), doc,
+            rfs.getSchema(), definedRecordNames,
+            doubleColonsToDoubleUnderscores);
+        // unnamed Pig fields get a positional name derived from the record
+        String fieldName = (rfsName != null)
+            ? rfsName : recordName + "_" + i;
+        fields.add(new Schema.Field(fieldName, fieldSchema, doc, null));
+        i++;
+      }
+      newSchema.setFields(fields);
+    }
+
+    return newSchema;
+  }
+
+ /**
+ * Translates a name in a pig schema to an acceptable Avro name, or
+ * throws an error if the name can't be translated.
+ * @param name The variable name to translate.
+ * @param doubleColonsToDoubleUnderscores Indicates whether to translate
+ * double colons to underscores or throw an error if they are encountered.
+ * @return A name usable by Avro.
+ * @throws IOException If the name is not compatible with Avro.
+ */
+ private static String toAvroName(String name,
+ final Boolean doubleColonsToDoubleUnderscores) throws IOException {
+
+ if (name == null) {
+ return null;
+ }
+
+ if (doubleColonsToDoubleUnderscores) {
+ name = name.replace("::", "__");
+ }
+
+ if (name.matches("[A-Za-z_][A-Za-z0-9_]*")) {
+ return name;
+ } else {
+ throw new IOException(
+ "Pig Schema contains a name that is not allowed in Avro");
+ }
+ }
+
+ /**
+ * Returns a Union Schema composed of {@in} and null.
+ * @param in the schema to combine with null
+ * @return the new union schema
+ */
+ private static Schema createNullableUnion(final Schema in) {
+ return Schema.createUnion(Lists.newArrayList(Schema.create(Type.NULL), in));
+ }
+
+ /**
+ * Returns a Union Schema composed of {@in} and null.
+ * @param t the avro raw type to combine with null
+ * @return the new union schema
+ */
+ private static Schema createNullableUnion(final Type t) {
+ return createNullableUnion(Schema.create(t));
+ }
+
+  /**
+   * Creates a new Avro schema object for a single Pig field. Every type
+   * except NULL is wrapped in a union with null so the field is nullable.
+   * @param name object name
+   * @param nameSpace namespace for avro object
+   * @param type Pig {@link DataType} byte of the field
+   * @param description description of the field; for bytearray fields a
+   *     description that parses as an Avro schema is used as the field's
+   *     schema instead of plain bytes
+   * @param schema pig schema for the object (used by bag, map and tuple)
+   * @param definedRecordNames already defined record names, mapped to the
+   *     schemas generated under each name; updated for tuple fields
+   * @param doubleColonsToDoubleUnderscores whether to translate double
+   *     colons in Pig names to underscores in avro names.
+   * @return the avro schema object
+   * @throws IOException If there is a translation error.
+   */
+  private static Schema resourceFieldSchemaToAvroSchema(
+      final String name, final String nameSpace,
+      final byte type, final String description,
+      final ResourceSchema schema,
+      final Map<String, List<Schema>> definedRecordNames,
+      final Boolean doubleColonsToDoubleUnderscores)
+          throws IOException {
+
+    switch (type) {
+    case DataType.BAG:
+      Schema innerBagSchema = resourceSchemaToAvroSchema(
+          schema.getFields()[0].getSchema(), name, null,
+          definedRecordNames,
+          doubleColonsToDoubleUnderscores);
+      return createNullableUnion(Schema.createArray(innerBagSchema));
+    case DataType.BIGCHARARRAY:
+      return createNullableUnion(Type.STRING);
+    case DataType.BOOLEAN:
+      return createNullableUnion(Type.BOOLEAN);
+    case DataType.BYTEARRAY:
+      // A description that parses as an Avro schema was stored there when
+      // the field was read from a fixed type; anything else means bytes.
+      Schema fixedSchema = null;
+      if (description != null) {
+        try {
+          fixedSchema = (new Schema.Parser()).parse(description);
+        } catch (Exception e) {
+          // description is ordinary documentation text, not a schema
+          fixedSchema = null;
+        }
+      }
+      if (fixedSchema == null) {
+        return createNullableUnion(Type.BYTES);
+      } else {
+        return createNullableUnion(fixedSchema);
+      }
+    case DataType.CHARARRAY:
+      return createNullableUnion(Type.STRING);
+    case DataType.DATETIME:
+      return createNullableUnion(Type.LONG);
+    case DataType.DOUBLE:
+      return createNullableUnion(Type.DOUBLE);
+    case DataType.FLOAT:
+      return createNullableUnion(Type.FLOAT);
+    case DataType.INTEGER:
+      return createNullableUnion(Type.INT);
+    case DataType.LONG:
+      return createNullableUnion(Type.LONG);
+    case DataType.MAP:
+      byte innerType = schema.getFields()[0].getType();
+      String desc = schema.getFields()[0].getDescription();
+      // literal-first comparison: the value field may have no description
+      if ("autogenerated from Pig Field Schema".equals(desc)) {
+        desc = null;
+      }
+      Schema innerSchema;
+      if (DataType.isComplex(innerType)) {
+        innerSchema = createNullableUnion(
+            Schema.createMap(resourceSchemaToAvroSchema(
+                schema.getFields()[0].getSchema(),
+                name, nameSpace, definedRecordNames,
+                doubleColonsToDoubleUnderscores)));
+      } else {
+        innerSchema = createNullableUnion(
+            Schema.createMap(resourceFieldSchemaToAvroSchema(
+                name, nameSpace, innerType,
+                desc, null, definedRecordNames,
+                doubleColonsToDoubleUnderscores)));
+      }
+      return innerSchema;
+    case DataType.NULL:
+      return Schema.create(Type.NULL);
+    case DataType.TUPLE:
+      Schema returnSchema = createNullableUnion(
+          resourceSchemaToAvroSchema(schema, name, null,
+              definedRecordNames, doubleColonsToDoubleUnderscores));
+      if (definedRecordNames.containsKey(name)) {
+        List<Schema> schemaList = definedRecordNames.get(name);
+        boolean notfound = true;
+        // look through every schema already generated under this name,
+        // stopping at the first one that matches
+        for (Schema cachedSchema : schemaList) {
+          if (returnSchema.equals(cachedSchema)) {
+            notfound = false;
+            break;
+          }
+        }
+        if (notfound) {
+          // same name, different structure: generate it again under a
+          // numbered name so the two record definitions do not clash
+          returnSchema = createNullableUnion(resourceSchemaToAvroSchema(
+              schema, name + "_" + schemaList.size(),
+              null, definedRecordNames, doubleColonsToDoubleUnderscores));
+          definedRecordNames.get(name).add(returnSchema);
+        }
+      } else {
+        definedRecordNames.put(name, Lists.newArrayList(returnSchema));
+      }
+      return returnSchema;
+    case DataType.BYTE:
+    case DataType.ERROR:
+    case DataType.GENERIC_WRITABLECOMPARABLE:
+    case DataType.INTERNALMAP:
+    case DataType.UNKNOWN:
+    default:
+      throw new IOException(
+          "Don't know how to encode type "
+              + DataType.findTypeName(type) + " in schema "
+              + ((schema == null) ? "" : schema.toString())
+              + "\n");
+    }
+  }
+
+ /**
+ * Checks to see if an avro schema is a combination of
+ * null and another object.
+ * @param s The object to check
+ * @return whether it's a nullable union
+ */
+ public static boolean isNullableUnion(final Schema s) {
+ return (
+ s.getType() == Type.UNION
+ && ((s.getTypes().size() == 1)
+ || (s.getTypes().size() == 2
+ && (s.getTypes().get(0).getType() == Type.NULL
+ || s.getTypes().get(1).getType() == Type.NULL))));
+ }
+
+  /**
+   * Unwraps a union by returning its first branch that is not null. A schema
+   * that is not a union, or a union with no non-null branch, is returned
+   * unchanged.
+   * @param s the input schema object
+   * @return the first non-null branch of the union, or {@code s} itself
+   */
+  public static Schema removeSimpleUnion(final Schema s) {
+    if (s.getType() != Type.UNION) {
+      return s;
+    }
+    for (Schema branch : s.getTypes()) {
+      if (branch.getType() != Type.NULL) {
+        return branch;
+      }
+    }
+    return s;
+  }
+
+  /**
+   * Builds a new Avro schema holding only the fields named in a Pig
+   * RequiredFieldList, or returns null if the schema cannot be reduced to
+   * exactly those fields. Useful for push down projections.
+   * @param oldSchema the Avro schema to take the fields from
+   * @param rfl the Pig required field list
+   * @return the reduced schema, or null
+   */
+  public static Schema newSchemaFromRequiredFieldList(
+      final Schema oldSchema, final RequiredFieldList rfl) {
+    final List<RequiredField> requiredFields = rfl.getFields();
+    return newSchemaFromRequiredFieldList(oldSchema, requiredFields);
+  }
+
+  /**
+   * Builds a new Avro record schema holding only the listed required fields,
+   * or returns null if the schema cannot be reduced to exactly those fields.
+   * Each required field is looked up by alias and must have the same Pig
+   * type as the Avro field; required sub-fields are projected recursively.
+   * Useful for push down projections.
+   * @param oldSchema the Avro schema to take the fields from
+   * @param rfl list of required fields
+   * @return the reduced schema, or null if a field is missing, has a
+   *     different type, or its type cannot be determined
+   */
+  public static Schema newSchemaFromRequiredFieldList(
+      final Schema oldSchema, final List<RequiredField> rfl) {
+    final List<Schema.Field> projected = Lists.newArrayList();
+    for (RequiredField required : rfl) {
+      final Schema.Field source = oldSchema.getField(required.getAlias());
+      if (source == null) {
+        return null;
+      }
+
+      final byte sourcePigType;
+      try {
+        sourcePigType = getPigType(source.schema());
+      } catch (ExecException e) {
+        Log.warn("ExecException caught in newSchemaFromRequiredFieldList", e);
+        return null;
+      }
+      if (sourcePigType != required.getType()) {
+        return null;
+      }
+
+      Schema projectedSchema = source.schema();
+      if (required.getSubFields() != null) {
+        // narrow the nested schema to the requested sub-fields
+        projectedSchema = newSchemaFromRequiredFieldList(
+            source.schema(), required.getSubFields());
+        if (projectedSchema == null) {
+          return null;
+        }
+      }
+      projected.add(new Schema.Field(
+          source.name(), projectedSchema, source.doc(),
+          source.defaultValue()));
+    }
+
+    final String doc = "subset of fields from " + oldSchema.getName()
+        + "; " + oldSchema.getDoc();
+    final Schema result = Schema.createRecord(
+        oldSchema.getName(), doc, oldSchema.getNamespace(), false);
+    result.setFields(projected);
+    return result;
+  }
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ * Object that wraps an Avro object in a tuple.
+ * @param <T> The type of the Avro object
+ */
+public final class AvroTupleWrapper <T extends IndexedRecord>
+ implements Tuple {
+
+  /**
+   * The Avro record this tuple exposes to Pig.
+   */
+  private T avroObject;
+
+  /**
+   * Wraps the given Avro record so it can be used as a Pig tuple.
+   * @param o the Avro record to wrap
+   */
+  public AvroTupleWrapper(final T o) {
+    this.avroObject = o;
+  }
+
+  /**
+   * Not supported: always fails.
+   * @param o unused
+   * @throws IOException always, stating that write is not implemented
+   */
+  @Override
+  public void write(final DataOutput o) throws IOException {
+    final String message = this.getClass().toString()
+        + ".write called, but not implemented yet";
+    throw new IOException(message);
+  }
+
+  /**
+   * Compares the wrapped Avro record with the one wrapped by another
+   * AvroTupleWrapper, using this record's schema.
+   * @param o the object to compare with
+   * @return the Avro comparison result, or -1 when {@code o} is not an
+   *     AvroTupleWrapper
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public int compareTo(final Object o) {
+    if (!(o instanceof AvroTupleWrapper)) {
+      return -1;
+    }
+    final AvroTupleWrapper other = (AvroTupleWrapper) o;
+    return GenericData.get().compare(
+        avroObject, other.avroObject, avroObject.getSchema());
+  }
+
+  /**
+   * Stores a value one position past the record's last field, then adds a
+   * matching field named {@code FIELD_<n>} to the record's schema.
+   * NOTE(review): the value is stored before the schema gains the field, and
+   * the schema's own field list is modified and set again; confirm that the
+   * Avro record and schema in use permit this.
+   * @param o the value to append
+   */
+  @Override
+  public void append(final Object o) {
+    final Schema recordSchema = avroObject.getSchema();
+    final List<Field> fields = recordSchema.getFields();
+    avroObject.put(fields.size(), o);
+    final Schema valueSchema = schemaForAppendedValue(o);
+    final String fieldName = "FIELD_" + fields.size();
+    fields.add(new Field(fieldName, valueSchema, "", null));
+    avroObject.getSchema().setFields(fields);
+  }
+
+  /**
+   * Picks the Avro schema used for an appended value from its Java type.
+   * @param value the appended value, possibly null
+   * @return the schema for the value, or null if its type is not recognised
+   */
+  private static Schema schemaForAppendedValue(final Object value) {
+    if (value == null) {
+      return Schema.create(Type.NULL);
+    }
+    if (value instanceof String) {
+      return Schema.create(Type.STRING);
+    }
+    if (value instanceof Integer) {
+      return Schema.create(Type.INT);
+    }
+    if (value instanceof Long) {
+      return Schema.create(Type.LONG);
+    }
+    if (value instanceof Double) {
+      return Schema.create(Type.DOUBLE);
+    }
+    if (value instanceof Float) {
+      return Schema.create(Type.FLOAT);
+    }
+    if (value instanceof Boolean) {
+      return Schema.create(Type.BOOLEAN);
+    }
+    if (value instanceof Map) {
+      return Schema.create(Type.MAP);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the value at the given position converted to the object Pig
+   * expects: strings and enum symbols become {@link String}, maps, records
+   * and arrays are wrapped, and fixed/bytes values become
+   * {@link DataByteArray}. For a union the conversion is chosen from the
+   * runtime class of the value. Anything else is returned unchanged.
+   * @param pos the position of the field in the record
+   * @return the converted value
+   * @throws ExecException declared by the Tuple interface
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object get(final int pos) throws ExecException {
+
+    Schema s = avroObject.getSchema().getFields().get(pos).schema();
+    Object o = avroObject.get(pos);
+
+    switch(s.getType()) {
+    case STRING:
+      // unwrap avro UTF8 encoding
+      return o.toString();
+    case MAP:
+      return new AvroMapWrapper((Map<CharSequence, Object>) o);
+    case RECORD:
+      return new AvroTupleWrapper<T>((T) o);
+    case ENUM:
+      return o.toString();
+    case ARRAY:
+      return new AvroBagWrapper<GenericData.Record>(
+          (GenericArray<GenericData.Record>) o);
+    case FIXED:
+      return new DataByteArray(((GenericData.Fixed) o).bytes());
+    case BYTES:
+      return copyBytes((ByteBuffer) o);
+    case UNION:
+      if (o instanceof org.apache.avro.util.Utf8) {
+        return o.toString();
+      } else if (o instanceof org.apache.avro.generic.GenericEnumSymbol) {
+        // same conversion as the plain ENUM case above
+        return o.toString();
+      } else if (o instanceof IndexedRecord) {
+        return new AvroTupleWrapper<T>((T) o);
+      } else if (o instanceof GenericArray) {
+        return new AvroBagWrapper<GenericData.Record>(
+            (GenericArray<GenericData.Record>) o);
+      } else if (o instanceof Map) {
+        return new AvroMapWrapper((Map<CharSequence, Object>) o);
+      } else if (o instanceof GenericData.Fixed) {
+        return new DataByteArray(((GenericData.Fixed) o).bytes());
+      } else if (o instanceof ByteBuffer) {
+        return copyBytes((ByteBuffer) o);
+      }
+      // null and plain values: deliberately fall through and return as-is
+    default:
+      return o;
+    }
+
+  }
+
+  /**
+   * Copies the readable bytes of a buffer (position to limit) into a new
+   * DataByteArray. {@link ByteBuffer#array()} is avoided because it exposes
+   * the whole backing array regardless of position and limit.
+   * @param buffer the buffer holding the bytes value
+   * @return a DataByteArray with exactly the buffer's remaining bytes
+   */
+  private static DataByteArray copyBytes(final ByteBuffer buffer) {
+    // read through a duplicate so the caller's position is left untouched
+    final ByteBuffer view = buffer.duplicate();
+    final byte[] bytes = new byte[view.remaining()];
+    view.get(bytes);
+    return new DataByteArray(bytes);
+  }
+
+  /**
+   * Collects the converted value of every field, in schema order.
+   * @return the list of field values, or null if any value could not be
+   *     read (the failure is logged)
+   */
+  @Override
+  public List<Object> getAll() {
+
+    final List<Object> values = Lists.newArrayList();
+    final List<Schema.Field> fields = avroObject.getSchema().getFields();
+    try {
+      for (Schema.Field field : fields) {
+        values.add(get(field.pos()));
+      }
+    } catch (ExecException e) {
+      LogFactory.getLog(getClass()).error(
+          "could not process tuple with contents " + avroObject, e);
+      return null;
+    }
+    return values;
+  }
+
+  /**
+   * Reports the memory size computed for the wrapped record.
+   * @return the size calculated by the record-based overload
+   */
+  @Override
+  public long getMemorySize() {
+    final T wrapped = this.avroObject;
+    return getMemorySize(wrapped);
+  }
+
+  /**
+   * Estimates the size in bytes of the data held by a record by summing a
+   * per-field figure chosen from the field's declared schema type.
+   * Fixed-width types count their width, strings count two bytes per
+   * character, bytes and fixed values count their length, and nested records
+   * and arrays are measured recursively. Map and union fields are not
+   * counted, and a value that is null or of an unexpected class contributes
+   * nothing.
+   * @param r the record to measure
+   * @return the estimated size in bytes
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private long getMemorySize(final IndexedRecord r) {
+    long total = 0;
+    final int bitsPerByte = 8;
+    for (Field f : r.getSchema().getFields()) {
+      switch (f.schema().getType()) {
+      case BOOLEAN:
+      case ENUM:
+      case INT:
+        // SIZE is in bits; divide to get bytes
+        total += Integer.SIZE / bitsPerByte;
+        break;
+      case LONG:
+        total += Long.SIZE / bitsPerByte;
+        break;
+      case DOUBLE:
+        total += Double.SIZE / bitsPerByte;
+        break;
+      case FLOAT:
+        total += Float.SIZE / bitsPerByte;
+        break;
+      case NULL:
+        break;
+      case STRING: {
+        // accept any CharSequence: get() treats string values as Utf8
+        final Object value = r.get(f.pos());
+        if (value instanceof CharSequence) {
+          total += (long) ((CharSequence) value).length()
+              * (Character.SIZE / bitsPerByte);
+        }
+        break;
+      }
+      case BYTES: {
+        // get() treats bytes values as ByteBuffer
+        final Object value = r.get(f.pos());
+        if (value instanceof ByteBuffer) {
+          total += ((ByteBuffer) value).remaining();
+        }
+        break;
+      }
+      case FIXED: {
+        final Object value = r.get(f.pos());
+        if (value instanceof GenericData.Fixed) {
+          total += ((GenericData.Fixed) value).bytes().length;
+        }
+        break;
+      }
+      case RECORD: {
+        final Object value = r.get(f.pos());
+        if (value instanceof IndexedRecord) {
+          total += getMemorySize((IndexedRecord) value);
+        }
+        break;
+      }
+      case ARRAY: {
+        final Object value = r.get(f.pos());
+        if (value instanceof GenericArray) {
+          total += new AvroBagWrapper((GenericArray) value).getMemorySize();
+        }
+        break;
+      }
+      default:
+        // MAP and UNION fields are not estimated
+        break;
+      }
+    }
+    return total;
+  }
+
+
+
+  /**
+   * Returns the Pig type of the field at the given position, derived from
+   * the field's Avro schema.
+   * @param arg0 the position of the field
+   * @return the Pig data type byte
+   * @throws ExecException if the Avro schema has no Pig equivalent
+   */
+  @Override
+  public byte getType(final int arg0) throws ExecException {
+    final List<Field> fields = avroObject.getSchema().getFields();
+    final Schema fieldSchema = fields.get(arg0).schema();
+    return AvroStorageSchemaConversionUtilities.getPigType(fieldSchema);
+  }
+
+  /**
+   * Tells whether the field at the given position holds no value.
+   * @param arg0 the position of the field
+   * @return true if there is no wrapped record or the field value is null
+   * @throws ExecException declared by the Tuple interface
+   */
+  @Override
+  public boolean isNull(final int arg0) throws ExecException {
+    if (avroObject == null) {
+      return true;
+    }
+    return avroObject.get(arg0) == null;
+  }
+
+  /**
+   * Makes this wrapper share the Avro record of another AvroTupleWrapper.
+   * @param arg0 the tuple whose record is adopted; it is cast to
+   *     AvroTupleWrapper
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void reference(final Tuple arg0) {
+    final AvroTupleWrapper<T> other = (AvroTupleWrapper<T>) arg0;
+    this.avroObject = other.avroObject;
+  }
+
+  /**
+   * Stores a value directly in the wrapped record at the given position.
+   * @param arg0 the position of the field
+   * @param arg1 the value to store, passed to the record unchanged
+   * @throws ExecException declared by the Tuple interface
+   */
+  @Override
+  public void set(final int arg0, final Object arg1) throws ExecException {
+    this.avroObject.put(arg0, arg1);
+  }
+
+  /**
+   * Returns the number of fields in the wrapped record's schema.
+   * @return the field count
+   */
+  @Override
+  public int size() {
+    final List<Field> fields = avroObject.getSchema().getFields();
+    return fields.size();
+  }
+
+  /**
+   * Joins the string form of every field value, in schema order, with the
+   * given delimiter between consecutive values. A null value contributes an
+   * empty string.
+   * @param arg0 the delimiter placed between values
+   * @return the delimited string
+   * @throws ExecException declared by the Tuple interface
+   */
+  @Override
+  public String toDelimitedString(final String arg0) throws ExecException {
+    StringBuilder delimitedString = new StringBuilder();
+    boolean notfirst = false;
+    for (Field f : avroObject.getSchema().getFields()) {
+      if (notfirst) {
+        delimitedString.append(arg0);
+      }
+      // set after the check so every field but the first gets a delimiter
+      notfirst = true;
+      Object val = avroObject.get(f.pos());
+      if (val != null) {
+        delimitedString.append(val.toString());
+      }
+    }
+    return delimitedString.toString();
+  }
+
+  /**
+   * Not supported: always fails.
+   * @param d unused
+   * @throws IOException always, stating that readFields is not implemented
+   */
+  @Override
+  public void readFields(final DataInput d) throws IOException {
+    final String message = this.getClass().toString()
+        + ".readFields called but not implemented yet";
+    throw new IOException(message);
+  }
+
+  /**
+   * Iterates over the field values of the wrapped record in schema order.
+   * Values are read straight from the record, without the conversions that
+   * {@code get(int)} applies.
+   * @return an iterator over the raw field values
+   */
+  @Override
+  public Iterator<Object> iterator() {
+    final Iterator<Schema.Field> fieldIterator =
+        avroObject.getSchema().getFields().iterator();
+    final Function<Schema.Field, Object> rawValueOf =
+        new Function<Schema.Field, Object>() {
+          @Override
+          public Object apply(final Field f) {
+            return avroObject.get(f.pos());
+          }
+        };
+    return Iterators.transform(fieldIterator, rawValueOf);
+  }
+
+}
Added: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (added)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.builtin;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.tool.DataFileWriteTool;
+import org.apache.avro.tool.Tool;
+import org.apache.avro.tool.TrevniCreateRandomTool;
+import org.apache.avro.tool.TrevniToJsonTool;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAvroStorage {
+
+ // Logger shared by the fixture helpers and tests of this class.
+ final protected static Log LOG = LogFactory.getLog(TestAvroStorage.class);
+
+ // Root of the checked-in test resources (schemas, json data, pig scripts);
+ // generated fixtures are written below it as well.
+ final private static String basedir = "test/org/apache/pig/builtin/avro/";
+ // Root under which each test stores its output (one directory per test method).
+ final private static String outbasedir = System.getProperty("user.dir") + "/build/test/TestAvroStorage/";
+ // Directories, relative to basedir, that generateInputFiles() creates and
+ // deleteInputFiles() removes.
+ final private static String[] datadir = {
+ "data/avro",
+ "data/avro/compressed",
+ "data/avro/compressed/snappy",
+ "data/avro/compressed/deflate",
+ "data/avro/uncompressed",
+ "data/avro/uncompressed/testdirectory",
+ "data/json/testdirectory",
+ "data/trevni",
+ "data/trevni/uncompressed",
+ };
+ // Base names for which generateInputFiles() builds
+ // data/avro/uncompressed/<name>.avro from schema/<name>.avsc and
+ // data/json/<name>.json.
+ final private static String[] avroSchemas = {
+ "arraysAsOutputByPig",
+ "arrays",
+ "recordsAsOutputByPig",
+ "recordsAsOutputByPigWithDates",
+ "records",
+ "recordsOfArrays",
+ "recordsOfArraysOfRecords",
+ "recordsSubSchema",
+ "recordsSubSchemaNullable",
+ "recordsWithDoubleUnderscores",
+ "recordsWithEnums",
+ "recordsWithFixed",
+ "recordsWithMaps",
+ "recordsWithMapsOfRecords",
+ "recordsWithNullableUnions",
+ "recordWithRepeatedSubRecords",
+ "recursiveRecord",
+ "projectionTest",
+ "recordsWithSimpleUnion",
+ "recordsWithSimpleUnionOutput",
+ };
+ // Base names for which generateInputFiles() creates a random trevni file and
+ // derives the matching json and avro files from it.
+ final private static String[] trevniSchemas = {
+ "simpleRecordsTrevni",
+ };
+
+ // Local-mode Pig server; created in setup() and shut down in teardown().
+ private static PigServer pigServerLocal = null;
+
+ /** Accepts only paths whose last name component starts with neither "_" nor ".". */
+ public static final PathFilter hiddenPathFilter = new PathFilter() {
+     public boolean accept(Path p) {
+         final String fileName = p.getName();
+         if (fileName.startsWith("_")) {
+             return false;
+         }
+         return !fileName.startsWith(".");
+     }
+ };
+
+ private static String loadFileIntoString(String file) throws IOException {
+ return FileUtils.readFileToString(new File(file)).replace("\n", " ");
+ }
+
+ /**
+  * Starts the local Pig server, clears output left by earlier runs and
+  * generates the input fixtures.
+  */
+ @BeforeClass
+ public static void setup() throws ExecException, IOException {
+     final File previousOutput = new File(outbasedir);
+     pigServerLocal = new PigServer(ExecType.LOCAL);
+     deleteDirectory(previousOutput);
+     generateInputFiles();
+ }
+
+ /**
+  * Shuts down the local Pig server (if one was created) and removes the
+  * generated input fixtures.
+  *
+  * @throws IOException if the shutdown or the cleanup fails
+  */
+ @AfterClass
+ public static void teardown() throws IOException {
+     try {
+         if (pigServerLocal != null) {
+             pigServerLocal.shutdown();
+         }
+     } finally {
+         // Remove generated fixtures even when the shutdown itself fails.
+         deleteInputFiles();
+     }
+ }
+
+ /**
+ * Generate input files for test cases
+ */
+ private static void generateInputFiles() throws IOException {
+ String data;
+ String json;
+ String avro;
+ String trevni;
+ String schema;
+
+ for (String fn : datadir) {
+ FileUtils.forceMkdir(new File(basedir + fn));
+ }
+
+ for (String fn : avroSchemas) {
+ schema = basedir + "schema/" + fn + ".avsc";
+ json = basedir + "data/json/" + fn + ".json";
+ avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+ LOG.info("creating " + avro);
+ generateAvroFile(schema, json, avro, null);
+ }
+
+ for (String fn : trevniSchemas) {
+ schema = basedir + "schema/" + fn + ".avsc";
+ trevni = basedir + "data/trevni/uncompressed/" + fn + ".trevni";
+ json = basedir + "data/json/" + fn + ".json";
+ LOG.info("creating " + trevni);
+ generateRandomTrevniFile(schema, "1000", trevni);
+ LOG.info("creating " + json);
+ convertTrevniToJsonFile(schema, trevni, json);
+
+ schema = basedir + "schema/" + fn + ".avsc";
+ json = basedir + "data/json/" + fn + ".json";
+ avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+ LOG.info("creating " + avro);
+ generateAvroFile(schema, json, avro, null);
+ }
+
+ PrintWriter w;
+ int itemSum = 0;
+ int recordCount = 0;
+ int evenFileNameItemSum = 0;
+ int evenFileNameRecordCount = 0;
+
+ for (int i = 0; i < 8; i++) {
+ schema = basedir + "schema/testDirectory.avsc";
+ json = basedir + "data/json/testdirectory/part-m-0000" + i;
+ avro = basedir + "data/avro/uncompressed/testdirectory/part-m-0000" + i + ".avro";
+ LOG.info("creating " + json);
+ w = new PrintWriter(new FileWriter(json));
+ for (int j = i*1000; j < (i+1)*1000; j++) {
+ itemSum += j;
+ recordCount += 1;
+ evenFileNameItemSum += ((i+1) % 2) * j;
+ evenFileNameRecordCount += (i+1) % 2;
+ w.println("{\"item\" : " + j + ", \"timestamp\" : " + System.currentTimeMillis() + " }\n");
+ }
+ w.close();
+ generateAvroFile(schema, json, avro, null);
+ }
+
+ schema = basedir + "schema/testDirectoryCounts.avsc";
+ json = basedir + "data/json/testDirectoryCounts.json";
+ avro = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
+ data = "{\"itemSum\" : {\"int\" : " + itemSum + "}, \"n\" : {\"int\" : " + recordCount + "} }";
+ LOG.info("creating " + json);
+ FileUtils.writeStringToFile(new File(json), data);
+ LOG.info("creating " + avro);
+ generateAvroFile(schema, json, avro, null);
+
+ schema = basedir + "schema/testDirectoryCounts.avsc";
+ json = basedir + "data/json/evenFileNameTestDirectoryCounts.json";
+ avro = basedir + "data/avro/uncompressed/evenFileNameTestDirectoryCounts.avro";
+ data = "{\"itemSum\" : {\"int\" : " + evenFileNameItemSum + "}, \"n\" : {\"int\" : " + evenFileNameRecordCount + "} }";
+ LOG.info("creating " + json);
+ FileUtils.writeStringToFile(new File(json), data);
+ LOG.info("creating " + avro);
+ generateAvroFile(schema, json, avro, null);
+
+ for (String codec : new String[] {"deflate", "snappy"}) {
+ for (String fn : new String[] {"records", "recordsAsOutputByPig"}) {
+ schema = basedir + "schema/" + fn + ".avsc";
+ json = basedir + "data/json/" + fn + ".json";
+ avro = basedir + "data/avro/compressed/" + codec + "/" + fn + ".avro";
+ LOG.info("creating " + avro);
+ generateAvroFile(schema, json, avro, codec);
+ }
+ }
+ }
+
+ /**
+ * Clean up generated input files
+ */
+ private static void deleteInputFiles() throws IOException {
+ // Delete auto-generated directories
+ for (String fn : datadir) {
+ LOG.info("Deleting " + basedir + fn);
+ FileUtils.deleteQuietly(new File(basedir + fn));
+ }
+ // Delete auto-generated json files
+ String json;
+ for (String fn : trevniSchemas) {
+ json = basedir + "data/json/" + fn + ".json";
+ LOG.info("Deleting " + json);
+ FileUtils.deleteQuietly(new File(json));
+ }
+ json = basedir + "data/json/testDirectoryCounts.json";
+ LOG.info("Deleting " + json);
+ FileUtils.deleteQuietly(new File(json));
+ json = basedir + "data/json/evenFileNameTestDirectoryCounts.json";
+ LOG.info("Deleting " + json);
+ FileUtils.deleteQuietly(new File(json));
+ }
+
+ private static void generateAvroFile(String schema, String json, String avro, String codec) throws IOException {
+ Tool tool = new DataFileWriteTool();
+ List<String> args = new ArrayList<String>();
+ args.add("--schema-file");
+ args.add(schema);
+ args.add(json);
+ if (codec != null) {
+ args.add("--codec");
+ args.add(codec);
+ }
+ try {
+ StringBuffer sb = new StringBuffer();
+ for (String a : args) {
+ sb.append(a);
+ sb.append(" ");
+ }
+ PrintStream out = new PrintStream(avro);
+ tool.run(System.in, out, System.err, args);
+ } catch (Exception e) {
+ LOG.info("Could not generate avro file: " + avro, e);
+ throw new IOException();
+ }
+ }
+
+ private static void generateRandomTrevniFile(String schema, String count, String trevni) throws IOException {
+ Tool tool = new TrevniCreateRandomTool();
+ List<String> args = new ArrayList<String>();
+ args.add(schema);
+ args.add(count);
+ args.add(trevni);
+ try {
+ tool.run(System.in, System.out, System.err, args);
+ } catch (Exception e) {
+ LOG.info("Could not generate trevni file: " + trevni, e);
+ throw new IOException();
+ }
+ }
+
+ private static void convertTrevniToJsonFile(String schema, String trevni, String json) throws IOException {
+ Tool tool = new TrevniToJsonTool();
+ List<String> args = new ArrayList<String>();
+ args.add(trevni);
+ try {
+ PrintStream out = new PrintStream(json);
+ tool.run(System.in, out, System.err, args);
+ } catch (Exception e) {
+ LOG.info("Could not generate json file: " + json, e);
+ throw new IOException();
+ }
+ }
+
+ /**
+  * Builds the output directory for the calling test: outbasedir followed by
+  * the method name found at index 2 of the current stack trace (presumably
+  * the direct caller of this method). Must be called directly from the test
+  * method for the name to be right.
+  */
+ private String createOutputName() {
+     final StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+     return outbasedir + caller.getMethodName();
+ }
+
+ /** Runs identity_ao2.pig over records.avro and compares the output with recordsAsOutputByPig.avro. */
+ @Test
+ public void testLoadRecords() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro");
+ }
+
+ /** Runs identity_ao2.pig over recordsWithSimpleUnion.avro, storing with the recordsSubSchema schema file. */
+ @Test
+ public void testLoadRecordsWithSimpleUnion() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/recordsWithSimpleUnion.avro",
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin -f " + basedir + "schema/recordsSubSchema.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsWithSimpleUnionOutput.avro");
+ }
+
+
+ /** Runs projection_test.pig over records.avro and compares the output with projectionTest.avro. */
+ @Test
+ public void testProjection() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "AVROSTORAGE_OUT_1", "projectionTest",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/projection_test.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/projectionTest.avro");
+ }
+
+
+ /** Runs with_dates.pig over records.avro and compares the output with recordsAsOutputByPigWithDates.avro. */
+ @Test
+ public void testDates() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/with_dates.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsAsOutputByPigWithDates.avro");
+ }
+
+ /** Loads records.avro with the full records.avsc schema given inline as the first loader argument. */
+ @Test
+ public void testLoadRecordsSpecifyFullSchema() throws Exception {
+     final String inlineSchema = loadFileIntoString(basedir + "schema/records.avsc");
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "OUTFILE", outputDir,
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "AVROSTORAGE_IN_1", inlineSchema);
+     testAvroStorage(true, basedir + "code/pig/identity_ai1_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro");
+ }
+
+ /** Loads records.avro with the full schema referenced through the loader's -f option. */
+ @Test
+ public void testLoadRecordsSpecifyFullSchemaFromFile() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "OUTFILE", outputDir,
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "AVROSTORAGE_IN_2", "-f " + basedir + "schema/records.avsc");
+     testAvroStorage(true, basedir + "code/pig/identity.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro");
+ }
+
+ /** Loads records.avro with the recordsSubSchema schema given inline and compares with recordsSubSchema.avro. */
+ @Test
+ public void testLoadRecordsSpecifySubSchema() throws Exception {
+     final String subSchemaFile = basedir + "schema/recordsSubSchema.avsc";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "OUTFILE", outputDir,
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin -f " + subSchemaFile,
+         "AVROSTORAGE_IN_1", loadFileIntoString(subSchemaFile));
+     testAvroStorage(true, basedir + "code/pig/identity_ai1_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsSubSchema.avro");
+ }
+
+ /** Loads and stores records.avro with the recordsSubSchema schema file passed via -f on both sides. */
+ @Test
+ public void testLoadRecordsSpecifySubSchemaFromFile() throws Exception {
+     final String schemaOption = "-f " + basedir + "schema/recordsSubSchema.avsc";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "OUTFILE", outputDir,
+         "AVROSTORAGE_OUT_2", schemaOption,
+         "AVROSTORAGE_IN_2", schemaOption);
+     testAvroStorage(true, basedir + "code/pig/identity_blank_first_args.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsSubSchema.avro");
+ }
+
+ /** Loads records.avro with the loader's -e option pointing at recordsSubSchema.avro. */
+ @Test
+ public void testLoadRecordsSpecifySubSchemaFromExampleFile() throws Exception {
+     final String expectedFile = basedir + "data/avro/uncompressed/recordsSubSchema.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "OUTFILE", outputDir,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsSubSchema.avsc",
+         "AVROSTORAGE_IN_2", "-e " + basedir + "data/avro/uncompressed/recordsSubSchema.avro");
+     testAvroStorage(true, basedir + "code/pig/identity_blank_first_args.pig", params);
+     verifyResults(outputDir, expectedFile);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsOfArrays.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsOfArrays() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsOfArrays.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfArrays.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsOfArraysOfRecords.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsOfArraysOfRecords() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsOfArraysOfRecords.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsOfArraysOfRecords.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsWithEnums.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsWithEnums() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsWithEnums.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithEnums.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsWithFixed.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsWithFixed() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsWithFixed.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithFixed.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsWithMaps.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsWithMaps() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsWithMaps.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMaps.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsWithMapsOfRecords.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsWithMapsOfRecords() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsWithMapsOfRecords.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithMapsOfRecords.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_just_ao2.pig over recordsWithNullableUnions.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsWithNullableUnions() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordsWithNullableUnions.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordsWithNullableUnions.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, source);
+ }
+
+ /** Runs identity_ao2.pig over the deflate-compressed records.avro and compares with recordsAsOutputByPig.avro. */
+ @Test
+ public void testLoadDeflateCompressedRecords() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/compressed/deflate/records.avro",
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro");
+ }
+
+ /** Runs identity_ao2.pig over the snappy-compressed records.avro and compares with recordsAsOutputByPig.avro. */
+ @Test
+ public void testLoadSnappyCompressedRecords() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/compressed/snappy/records.avro",
+         "AVROSTORAGE_OUT_1", "records",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro");
+ }
+
+ /** Runs identity_codec.pig with CODEC=deflate over records.avro and compares with the deflate fixture. */
+ @Test
+ public void testStoreDeflateCompressedRecords() throws Exception {
+     final String outputDir = createOutputName();
+     final ImmutableMap.Builder<String, String> params = ImmutableMap.<String,String>builder();
+     params.put("INFILE", basedir + "data/avro/uncompressed/records.avro");
+     params.put("OUTFILE", outputDir);
+     params.put("AVROSTORAGE_OUT_1", "records");
+     params.put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin");
+     params.put("CODEC", "deflate");
+     params.put("LEVEL", "6");
+     testAvroStorage(true, basedir + "code/pig/identity_codec.pig", params.build());
+     verifyResults(outputDir, basedir + "data/avro/compressed/deflate/recordsAsOutputByPig.avro");
+ }
+
+ /** Runs identity_codec.pig with CODEC=snappy over records.avro and compares with the snappy fixture. */
+ @Test
+ public void testStoreSnappyCompressedRecords() throws Exception {
+     final String outputDir = createOutputName();
+     final ImmutableMap.Builder<String, String> params = ImmutableMap.<String,String>builder();
+     params.put("INFILE", basedir + "data/avro/uncompressed/records.avro");
+     params.put("OUTFILE", outputDir);
+     params.put("AVROSTORAGE_OUT_1", "records");
+     params.put("AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin");
+     params.put("CODEC", "snappy");
+     params.put("LEVEL", "6");
+     testAvroStorage(true, basedir + "code/pig/identity_codec.pig", params.build());
+     verifyResults(outputDir, basedir + "data/avro/compressed/snappy/recordsAsOutputByPig.avro");
+ }
+
+ /** Expects recursive_tests.pig to fail on recursiveRecord.avro when the loader is not given -r. */
+ @Test
+ public void testLoadRecursiveRecords() throws Exception {
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/recursiveRecord.avro",
+         "AVROSTORAGE_IN_2", "-n this.is.ignored.in.a.loadFunc",
+         "AVROSTORAGE_OUT_1", "recordsSubSchemaNullable",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", createOutputName());
+     testAvroStorage(false, basedir + "code/pig/recursive_tests.pig", params);
+ }
+
+ /** Runs recursive_tests.pig on recursiveRecord.avro with the loader's -r option and checks the output. */
+ @Test
+ public void testLoadRecursiveRecordsOptionOn() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/recursiveRecord.avro",
+         "AVROSTORAGE_IN_2", "-r",
+         "AVROSTORAGE_OUT_1", "recordsSubSchemaNullable",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/recursive_tests.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsSubSchemaNullable.avro");
+ }
+
+ /** Runs identity_just_ao2.pig over recordWithRepeatedSubRecords.avro and expects the output to equal the input. */
+ @Test
+ public void testLoadRecordsWithRepeatedSubRecords() throws Exception {
+     final String source = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", source,
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_just_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro");
+ }
+
+ /** Runs directory_test.pig over the whole testdirectory and compares with testDirectoryCounts.avro. */
+ @Test
+ public void testLoadDirectory() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/testdirectory",
+         "AVROSTORAGE_OUT_1", "stats",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/directory_test.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/testDirectoryCounts.avro");
+ }
+
+ /** Runs directory_test.pig over the glob part-m-0000* and compares with testDirectoryCounts.avro. */
+ @Test
+ public void testLoadGlob() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/testdirectory/part-m-0000*",
+         "AVROSTORAGE_OUT_1", "stats",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/directory_test.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/testDirectoryCounts.avro");
+ }
+
+ /** Runs directory_test.pig over part files 0, 2, 4 and 6 only and compares with the even-file counts. */
+ @Test
+ public void testPartialLoadGlob() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/testdirectory/part-m-0000{0,2,4,6}.avro",
+         "AVROSTORAGE_OUT_1", "stats",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/directory_test.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/evenFileNameTestDirectoryCounts.avro");
+ }
+
+ /** Runs namesWithDoubleColons.pig with the storer's -d option and compares with recordsWithDoubleUnderscores.avro. */
+ @Test
+ public void testDoubleUnderscore() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "AVROSTORAGE_OUT_1", "recordsWithDoubleUnderscores",
+         "AVROSTORAGE_OUT_2", "-d -n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/namesWithDoubleColons.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/recordsWithDoubleUnderscores.avro");
+ }
+
+ /** Expects namesWithDoubleColons.pig to fail when the storer is not given the -d option. */
+ @Test
+ public void testDoubleUnderscoreNoFlag() throws Exception {
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/records.avro",
+         "AVROSTORAGE_OUT_1", "recordsWithDoubleUnderscores",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", createOutputName());
+     testAvroStorage(false, basedir + "code/pig/namesWithDoubleColons.pig", params);
+ }
+
+ /** Runs identity_ao2.pig over arrays.avro and compares the output with arraysAsOutputByPig.avro. */
+ @Test
+ public void testLoadArrays() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/avro/uncompressed/arrays.avro",
+         "AVROSTORAGE_OUT_1", "arrays",
+         "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/identity_ao2.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/arraysAsOutputByPig.avro");
+ }
+
+ /** Runs trevni_to_avro.pig over simpleRecordsTrevni.trevni and compares with simpleRecordsTrevni.avro. */
+ @Test
+ public void testLoadTrevniRecords() throws Exception {
+     final String outputDir = createOutputName();
+     final Map<String, String> params = ImmutableMap.of(
+         "INFILE", basedir + "data/trevni/uncompressed/simpleRecordsTrevni.trevni",
+         "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/simpleRecordsTrevni.avsc",
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/trevni_to_avro.pig", params);
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/simpleRecordsTrevni.avro");
+ }
+
+ /**
+  * Runs trevni_to_trevni.pig into an intermediate "Trevni" output, feeds that
+  * through trevni_to_avro.pig and compares the result with simpleRecordsTrevni.avro.
+  */
+ @Test
+ public void testLoadAndSaveTrevniRecords() throws Exception {
+     final String schemaOption = "-f " + basedir + "schema/simpleRecordsTrevni.avsc";
+     final String outputDir = createOutputName();
+     final String trevniDir = outputDir + "Trevni";
+
+     final Map<String, String> firstPass = ImmutableMap.of(
+         "INFILE", basedir + "data/trevni/uncompressed/simpleRecordsTrevni.trevni",
+         "AVROSTORAGE_OUT_2", schemaOption,
+         "OUTFILE", trevniDir);
+     testAvroStorage(true, basedir + "code/pig/trevni_to_trevni.pig", firstPass);
+
+     final Map<String, String> secondPass = ImmutableMap.of(
+         "INFILE", trevniDir,
+         "AVROSTORAGE_OUT_2", schemaOption,
+         "OUTFILE", outputDir);
+     testAvroStorage(true, basedir + "code/pig/trevni_to_avro.pig", secondPass);
+
+     verifyResults(outputDir, basedir + "data/avro/uncompressed/simpleRecordsTrevni.avro");
+ }
+
+ private static void deleteDirectory (File path) {
+ if ( path.exists()) {
+ File [] files = path.listFiles();
+ for (File file: files) {
+ if (file.isDirectory())
+ deleteDirectory(file);
+ file.delete();
+ }
+ }
+ }
+
+ private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {
+ pigServerLocal.setBatchOn();
+
+ int numOfFailedJobs = 0;
+
+ try {
+ pigServerLocal.registerScript(scriptFile, parameterMap);
+ for (ExecJob job : pigServerLocal.executeBatch()) {
+ if (job.getStatus().equals(JOB_STATUS.FAILED)) {
+ numOfFailedJobs++;
+ }
+ }
+ } catch (Exception e) {
+ System.err.printf("Exception caught in testAvroStorage: %s\n", e);
+ numOfFailedJobs++;
+ }
+
+ if (expectedToSucceed) {
+ assertTrue("There was a failed job!", numOfFailedJobs == 0);
+ } else {
+ assertTrue("There was no failed job!", numOfFailedJobs > 0);
+ }
+ }
+
+ private void verifyResults(String outPath, String expectedOutpath) throws IOException {
+ verifyResults(outPath, expectedOutpath, null);
+ }
+
+ /**
+  * Checks that every avro file found under the output directory has the
+  * expected codec metadata, contains only records that occur in the expected
+  * file, and holds as many records as the expected set.
+  *
+  * @param outPath directory the Pig script stored into
+  * @param expectedOutpath path holding the expected records
+  * @param expectedCodec expected value of the "avro.codec" metadata entry, or null
+  * @throws IOException if a file cannot be listed or read
+  */
+ private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
+     FileSystem fs = FileSystem.getLocal(new Configuration());
+
+     /* read in expected results */
+     Set<GenericData.Record> expected = getExpected(expectedOutpath);
+
+     /* read in output results and compare */
+     Path output = new Path(outPath);
+     assertTrue("Output dir does not exists!", fs.exists(output)
+         && fs.getFileStatus(output).isDir());
+
+     Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+     assertTrue("Split field dirs not found!", paths != null);
+
+     for (Path path : paths) {
+         Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+         assertTrue("No files found for path: " + path.toUri().getPath(),
+             files != null);
+         for (Path filePath : files) {
+             assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+
+             GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();
+
+             DataFileStream<GenericData.Record> in = new DataFileStream<GenericData.Record>(
+                 fs.open(filePath), reader);
+             int count = 0;
+             try {
+                 assertEquals("codec", expectedCodec, in.getMetaString("avro.codec"));
+                 while (in.hasNext()) {
+                     GenericData.Record obj = in.next();
+                     // Build the (potentially huge) message only on failure; it
+                     // embeds the whole expected set.
+                     if (!expected.contains(obj)) {
+                         throw new AssertionError("Avro result object found that's not expected: Found "
+                             + (obj != null ? obj.getSchema() : "null") + ", " + obj
+                             + "\nExpected " + expected + "\n");
+                     }
+                     count++;
+                 }
+             } finally {
+                 // Release the file handle even when an assertion fails.
+                 in.close();
+             }
+             assertEquals(expected.size(), count);
+         }
+     }
+ }
+
+ /**
+  * Reads every record stored at the given path into a set ordered (and
+  * de-duplicated) by the records' string form.
+  *
+  * @param pathstr file or directory holding the expected avro data
+  * @return the expected records
+  * @throws IOException if a file cannot be listed or read
+  */
+ private Set<GenericData.Record> getExpected (String pathstr ) throws IOException {
+
+     Set<GenericData.Record> ret = new TreeSet<GenericData.Record>(
+         new Comparator<GenericData.Record>() {
+             @Override
+             public int compare(Record o1, Record o2) {
+                 return o1.toString().compareTo(o2.toString());
+             }}
+         );
+     FileSystem fs = FileSystem.getLocal(new Configuration());
+
+     /* read in the expected results */
+     Path output = new Path(pathstr);
+     assertTrue("Expected output does not exists!", fs.exists(output));
+
+     Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+     assertTrue("Split field dirs not found!", paths != null);
+
+     for (Path path : paths) {
+         Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+         assertTrue("No files found for path: " + path.toUri().getPath(), files != null);
+         for (Path filePath : files) {
+             assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+
+             GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();
+
+             DataFileStream<GenericData.Record> in = new DataFileStream<GenericData.Record>(fs.open(filePath), reader);
+             try {
+                 while (in.hasNext()) {
+                     GenericData.Record obj = in.next();
+                     ret.add(obj);
+                 }
+             } finally {
+                 // Release the file handle even when reading fails.
+                 in.close();
+             }
+         }
+     }
+     return ret;
+ }
+
+}
+
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,6 @@
+ -- Loads the INFILE parameter with AvroStorage, reduces all records to one row
+ -- holding the sum of the item field and the record count (both cast to int),
+ -- removes any previous OUTFILE and stores the row there with AvroStorage.
+ in = LOAD '$INFILE' USING AvroStorage;
+ out = FOREACH (GROUP in ALL) GENERATE
+ (int) SUM(in.item) as itemSum:int,
+ (int) COUNT_STAR(in) as n:int;
+ RMF $OUTFILE;
+ STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+ -- Identity copy: the loader gets an empty first argument and the
+ -- AVROSTORAGE_IN_2 parameter as its second; the storer gets both
+ -- AVROSTORAGE_OUT_1 and AVROSTORAGE_OUT_2. Any previous OUTFILE is removed first.
+ in = LOAD '$INFILE' USING AvroStorage('','$AVROSTORAGE_IN_2');
+ out = FOREACH in GENERATE *;
+ RMF $OUTFILE;
+ STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('$AVROSTORAGE_IN_1'); -- loader gets a single argument, the AVROSTORAGE_IN_1 parameter
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path so the two-argument STORE on the next line can write to it
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- loader is constructed with no arguments
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path so the two-argument STORE on the next line can write to it
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('','$AVROSTORAGE_IN_2'); -- loader gets an empty first argument and the AVROSTORAGE_IN_2 parameter as its second
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path before the STORE on the next line, which also passes an empty first argument
+STORE out INTO '$OUTFILE' USING AvroStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,6 @@
+SET avro.output.codec $CODEC
+SET avro.mapred.deflate.level $LEVEL
+in = LOAD '$INFILE' USING AvroStorage(); -- the two SET lines above take the output codec and deflate level from the CODEC and LEVEL parameters
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path before storing
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2'); -- both storer arguments come from harness-supplied parameters
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- loader is constructed with no arguments
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path before the STORE on the next line, which passes an empty first argument
+STORE out INTO '$OUTFILE' USING AvroStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,7 @@
+in = LOAD '$INFILE' USING AvroStorage('',''); -- loader gets two empty arguments
+step1 = FOREACH in GENERATE TOTUPLE(key) as A, TOTUPLE(intValue) as C; -- wrap key and intValue each in its own tuple, named A and C
+step2 = FOREACH step1 GENERATE A, TOTUPLE(C) as B; -- keep A and nest C inside another tuple named B
+step3 = FOREACH step2 GENERATE FLATTEN(A), FLATTEN(B); -- flatten both tuples so the next line can refer to A::key and B::C
+out = FOREACH step3 GENERATE A::key, FLATTEN(B::C); -- project the double-colon names and flatten the remaining inner tuple
+RMF $OUTFILE; -- remove any existing output path so the STORE on the next line can write to it
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- loader is constructed with no arguments
+out = FOREACH in GENERATE $0, $1, $2; -- keep only the first three fields, selected by position
+RMF $OUTFILE; -- remove any existing output path so the STORE on the next line can write to it
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage('', '$AVROSTORAGE_IN_2'); -- loader gets an empty first argument and the AVROSTORAGE_IN_2 parameter as its second
+out = FOREACH in GENERATE $0, $1; -- keep only the first two fields, selected by position
+RMF $OUTFILE; -- remove any existing output path so the STORE on the next line can write to it
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1', '$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING TrevniStorage; -- read the input with TrevniStorage, no constructor arguments
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path before the STORE on the next line, which writes with AvroStorage instead
+STORE out INTO '$OUTFILE' USING AvroStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING TrevniStorage; -- read the input with TrevniStorage, no constructor arguments
+out = FOREACH in GENERATE *; -- identity projection: every input field is passed through
+RMF $OUTFILE; -- remove any existing output path before the STORE on the next line, which writes with TrevniStorage again
+STORE out INTO '$OUTFILE' USING TrevniStorage('','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig Fri Jun 28 22:54:05 2013
@@ -0,0 +1,4 @@
+in = LOAD '$INFILE' USING AvroStorage(); -- loader is constructed with no arguments
+out = FOREACH in GENERATE *, ToDate('2013-05-01Z', 'yyyy-MM-ddZ') AS date; -- pass all fields through and append a field named date built by ToDate from a fixed literal and pattern
+RMF $OUTFILE; -- remove any existing output path so the STORE on the next line can write to it
+STORE out INTO '$OUTFILE' USING AvroStorage('$AVROSTORAGE_OUT_1','$AVROSTORAGE_OUT_2');
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,3 @@
+[1, 2, 3, 4, 5]
+[6]
+[]
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,23 @@
+{"array" :
+ {"array" :
+ [
+ {"array_0" : {"int" : 1}},
+ {"array_0" : {"int" : 2}},
+ {"array_0" : {"int" : 3}},
+ {"array_0" : {"int" : 4}},
+ {"array_0" : {"int" : 5}}
+ ]
+ }
+}
+{"array" : {
+ "array" :
+ [
+ {"array_0" : {"int" : 6}}
+ ]
+ }
+}
+{"array" : {
+ "array" :
+ []
+ }
+}
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,15 @@
+{
+ "key" : "A",
+ "intValue" : 1,
+ "longValue" : 1
+}
+{
+ "key" : "B",
+ "intValue" : 2,
+ "longValue" : 2
+}
+{
+ "key" : "C",
+ "intValue" : 3,
+ "longValue" : 3
+}
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,12 @@
+{"key" : "stuff in closet",
+ "value1" : {"thing" : "hat", "count" : 7},
+ "value2" : {"thing" : "coat", "count" : 2}
+ }
+{"key" : "stuff on desk",
+ "value1" : {"thing" : "stapler", "count" : 1},
+ "value2" : {"thing" : "PC", "count" : 0}
+ }
+{"key" : "examples",
+ "value1" : {"thing" : "interesting", "count" : 0},
+ "value2" : {"thing" : "dull", "count" : 10}
+ }
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,30 @@
+{
+ "key" : "A",
+ "intValue" : 1,
+ "longValue" : 1,
+ "booleanValue" : true,
+ "floatValue" : 1.0,
+ "doubleValue" : 1.0,
+ "bytesValue" : "\u00FF",
+ "nullValue" : null
+}
+{
+ "key" : "B",
+ "intValue" : 2,
+ "longValue" : 2,
+ "booleanValue" : true,
+ "floatValue" : 2.0,
+ "doubleValue" : 2.0,
+ "bytesValue" : "\u00FE",
+ "nullValue" : null
+}
+{
+ "key" : "C",
+ "intValue" : 3,
+ "longValue" : 3,
+ "booleanValue" : false,
+ "floatValue" : 3.0,
+ "doubleValue" : 3.0,
+ "bytesValue" : "\u00FD",
+ "nullValue" : null
+}
\ No newline at end of file
Added: pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json?rev=1497954&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json (added)
+++ pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json Fri Jun 28 22:54:05 2013
@@ -0,0 +1,30 @@
+{
+ "key" : {"string": "A"},
+ "intValue" : {"int" : 1},
+ "longValue" : {"long" : 1},
+ "booleanValue" : {"boolean" : true},
+ "floatValue" : {"float" : 1.0},
+ "doubleValue" : {"double" : 1.0},
+ "bytesValue" : {"bytes" : "\u00FF"},
+ "nullValue" : null
+}
+{
+ "key" : {"string": "B"},
+ "intValue" : {"int" : 2},
+ "longValue" : {"long" :2},
+ "booleanValue" : {"boolean" : true},
+ "floatValue" : {"float" : 2.0},
+ "doubleValue" : {"double" : 2.0},
+ "bytesValue" : {"bytes" : "\u00FE"},
+ "nullValue" : null
+}
+{
+ "key" : {"string": "C"},
+ "intValue" : {"int" : 3},
+ "longValue" : {"long" :3},
+ "booleanValue" : {"boolean" : false},
+ "floatValue" : {"float" : 3.0},
+ "doubleValue" : {"double" : 3.0},
+ "bytesValue" : {"bytes" : "\u00FD"},
+ "nullValue" : null
+}
\ No newline at end of file