You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/01/31 22:18:25 UTC

svn commit: r1065790 [1/2] - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/p...

Author: daijy
Date: Mon Jan 31 21:18:24 2011
New Revision: 1065790

URL: http://svn.apache.org/viewvc?rev=1065790&view=rev
Log:
PIG-1748: Add load/store function AvroStorage for avro data

Added:
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASCommons.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASFsInput.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASLog.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro   (with props)
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1065790&r1=1065789&r2=1065790&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 31 21:18:24 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1748: Add load/store function AvroStorage for avro data (guolin2001, jghoman via daijy)
+
 PIG-1769: Consistency for HBaseStorage (dvryaboy)
 
 PIG-1786: Move describe/nested describe to new logical plan (daijy)

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASCommons.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASCommons.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASCommons.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASCommons.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * This is utility class for this package
+ */
+public class ASCommons {
+
+    public static Schema BooleanSchema = Schema.create(Schema.Type.BOOLEAN);
+    public static Schema LongSchema = Schema.create(Schema.Type.LONG);
+    public static Schema FloatSchema = Schema.create(Schema.Type.FLOAT);
+    public static Schema DoubleSchema = Schema.create(Schema.Type.DOUBLE);
+    public static Schema IntSchema = Schema.create(Schema.Type.INT);
+    public static Schema StringSchema = Schema.create(Schema.Type.STRING);
+    public static Schema BytesSchema = Schema.create(Schema.Type.BYTES);
+    public static Schema NullSchema = Schema.create(Schema.Type.NULL);
+
+    private static final String NONAME = "NONAME";
+    private static final String PIG_TUPLE_WRAPPER = "PIG_WRAPPER";
+
+    /** ignore hdfs files with prefix "_" and "." */
+    public static PathFilter PATH_FILTER = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            return !path.getName().startsWith("_")
+                        && !path.getName().startsWith(".");
+        }
+    };
+
+    static String getDummyFieldName(int index) {
+        return NONAME + "_" + index;
+    }
+
+    /** create  an avro field using the given schema */
+    public static Field createUDField(int index, Schema s) {
+        return new Field(getDummyFieldName(index), s, null, null);
+    }
+
+    /** create an avro field with null schema (it is a space holder) */
+    public static Schema createUDPartialRecordSchema() {
+        return Schema.createRecord(NONAME, null, null, false);
+    }
+
+    /** check whether a schema is a space holder (using field name) */
+    public static boolean isUDPartialRecordSchema(Schema s) {
+        return s.getName().equals(NONAME);
+    }
+
+    /** get field schema given index number */
+    public static Field getUDField(Schema s, int index) {
+        return s.getField(getDummyFieldName(index));
+    }
+
+    /**
+     * get input paths to job config 
+     */
+    public static void addInputPaths(String pathString, Job job)
+                                    throws IOException {
+        Configuration conf = job.getConfiguration();
+        FileSystem fs = FileSystem.get(conf);
+        
+        Path path = new Path(pathString);
+        FileStatus pathStatus = fs.getFileStatus(path);
+        
+        List<FileStatus> input = new LinkedList<FileStatus>();
+        input.add(pathStatus);
+
+        while (!input.isEmpty()) {
+
+            FileStatus status = input.remove(0);
+            Path p = status.getPath();
+            
+            if (!status.isDir() ) {
+                ASLog.details("Add input path:" + p);
+                FileInputFormat.addInputPath(job, p);
+            } 
+            else {
+                /*list all sub-dirs*/
+                FileStatus[] ss = fs.listStatus(p, PATH_FILTER); 
+            
+                if (ss.length > 0) {
+                    if (noDir(ss) ) {
+                        ASLog.details("Add input path:" + p);
+                        FileInputFormat.addInputPath(job, p);
+                    }
+                    else
+                        input.addAll(Arrays.asList(ss));
+                }
+            }
+        }
+    }
+
+    /** check whether there is NO directory in the input file (status) list*/
+    public static boolean noDir(FileStatus [] ss)
+    throws IOException {
+        for (FileStatus s : ss) {
+            if (s.isDir())
+                return false;
+        }
+        return true;
+    }
+    
+    /** get last file of a hdfs path if it is  a directory; 
+     *   or return the file itself if path is a file
+     */
+    public static Path getLast(String path, FileSystem fs) 
+    throws IOException {
+        return getLast(new Path(path), fs);
+    }
+
+    /** get last file of a hdfs path if it is  a directory; 
+     *   or return the file itself if path is a file
+     */
+    public static Path getLast(Path path, FileSystem fs) 
+    throws IOException {
+
+        FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
+
+        if (statuses.length == 0) {
+            return path;
+        } else {
+            Arrays.sort(statuses);
+            return statuses[statuses.length - 1].getPath();
+        }
+    }
+
+    /**
+     * Wrap an avro schema as a nullable union if needed. 
+     * For instance, wrap schema "int" as ["null", "int"]
+     */
+    public static Schema wrapAsUnion(Schema schema, boolean nullable) {
+        if (nullable) {
+            /* if schema is an acceptable union, then return itself */
+            if (schema.getType().equals(Schema.Type.UNION)
+                    && isAcceptableUnion(schema))
+                return schema;
+            else
+                return Schema.createUnion(Arrays.asList(NullSchema, schema));
+        } else
+            /*do not wrap it if not */
+            return schema;
+    }
+
+    /** determine whether the input schema contains recursive records */
+    public static boolean containRecursiveRecord(Schema s) {
+        /*initialize empty set of defined record names*/
+        Set<String> set = new HashSet<String> ();
+        return containRecursiveRecord(s, set);
+    }
+  
+    /**
+     * Called by {@link #containRecursiveRecord(Schema)} and it recursively checks 
+     * whether the input schema contains recursive records. 
+     */
+    protected static boolean containRecursiveRecord(
+                          Schema s, Set<String> definedRecordNames) {
+        
+        /* if it is a record, check itself and all fields*/
+        if (s.getType().equals(Schema.Type.RECORD)) {
+            String name = s.getName();
+            if (definedRecordNames.contains(name)) return true;
+            
+            /* add its own name into defined record set*/
+            definedRecordNames.add(s.getName());
+            
+            /* check all fields */
+            List<Field> fields = s.getFields();
+            for (Field field: fields) {
+                Schema fs = field.schema();
+                if (containRecursiveRecord(fs, definedRecordNames)) 
+                    return true;
+            }
+            return false;
+        }
+        
+        /* if it is an array, check its element type */
+        else if (s.getType().equals(Schema.Type.ARRAY)) {
+            Schema fs = s.getElementType();
+            return containRecursiveRecord(fs, definedRecordNames);
+        }
+        
+        /*if it is a map, check its value type */
+        else if (s.getType().equals(Schema.Type.MAP)) {
+            Schema vs = s.getValueType();
+            return containRecursiveRecord(vs, definedRecordNames);
+        }
+        
+        /* if it is a union, check all possible types */
+        else if (s.getType().equals(Schema.Type.UNION)) {
+            List<Schema> types = s.getTypes();
+            for (Schema type: types) {
+                if (containRecursiveRecord(type, definedRecordNames)) 
+                    return true;
+            }
+            return false;
+        }
+        
+        /* return false for other cases */
+        else {
+            return false;
+        }
+    }
+    
+    /** determine whether the input schema contains generic unions */
+    public static boolean containGenericUnion(Schema s) {
+             
+        /* if it is a record, check all fields*/
+        if (s.getType().equals(Schema.Type.RECORD)) {
+            List<Field> fields = s.getFields();
+            for (Field field: fields) {
+                Schema fs = field.schema();
+                if (containGenericUnion(fs)) 
+                    return true;
+            }
+            return false;
+        }
+        
+        /* if it is an array, check its element type */
+        else if (s.getType().equals(Schema.Type.ARRAY)) {
+            Schema fs = s.getElementType();
+            return  containGenericUnion(fs) ;
+        }
+        
+        /*if it is a map, check its value type */
+        else if (s.getType().equals(Schema.Type.MAP)) {
+            Schema vs = s.getValueType();
+            return containGenericUnion(vs) ;
+        }
+        
+        /* if it is a union, check all possible types and itself */
+        else if (s.getType().equals(Schema.Type.UNION)) {
+            List<Schema> types = s.getTypes();
+            for (Schema type: types) {
+                if (containGenericUnion(type) ) 
+                    return true;
+            }
+            /* check whether itself is acceptable (null-union) */
+            return  ! isAcceptableUnion (s);
+        }
+        
+        /* return false for other cases */
+        else {
+            return false;
+        }
+    }
+    
+    /** determine whether a union is a nullable union; 
+     * note that this function doesn't check containing
+     * types of the input union recursively. */
+    public static boolean isAcceptableUnion(Schema in) {
+
+        if (! in.getType().equals(Schema.Type.UNION)) {
+            return false;
+        }
+
+        List<Schema> types = in.getTypes();
+        if (types.size() <= 1) {
+            return true;
+        } else if (types.size() > 2) {
+            return false; /*contains more than 2 types */
+        } else {
+            /* one of two types is NULL */
+            return   types.get(0).getType().equals(Schema.Type.NULL)
+                          || types.get(1) .getType().equals(Schema.Type.NULL);
+        }
+    }
+
+    /** wrap a pig schema as tuple */
+    public static ResourceFieldSchema wrapAsTuple(
+                                    ResourceFieldSchema subFieldSchema)
+    throws IOException {
+        ResourceSchema listSchema = new ResourceSchema();
+        listSchema.setFields(new ResourceFieldSchema[] { subFieldSchema });
+
+        ResourceFieldSchema tupleWrapper = new ResourceFieldSchema();
+        tupleWrapper.setType(DataType.TUPLE);
+        tupleWrapper.setName(PIG_TUPLE_WRAPPER);
+        tupleWrapper.setSchema(listSchema);
+
+        return tupleWrapper;
+    }
+
+    /** check whether it is just a wrapped tuple */
+    public static boolean isTupleWrapper(ResourceFieldSchema pigSchema) {
+        return pigSchema.getType() == DataType.TUPLE
+                   && pigSchema.getName().equals(ASCommons.PIG_TUPLE_WRAPPER);
+    }
+
+    /** extract schema from a nullable union */
+    public static Schema getAcceptedType(Schema in) {
+        if (!isAcceptableUnion(in)) 
+            throw new RuntimeException (
+                    "Cannot call this function on a unacceptable union");
+    
+        List<Schema> types = in.getTypes();
+        switch (types.size()) {
+        case 0: 
+            return null; /*union with no type*/
+        case 1: 
+            return types.get(0); /*union with one type*/
+        case 2:
+            return  (types.get(0).getType().equals(Schema.Type.NULL))
+                        ? types.get(1) 
+                        : types.get(0);
+        default:
+            return null;
+        } 
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASFsInput.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASFsInput.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASFsInput.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASFsInput.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.avro.file.SeekableInput;
+
+/** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */
+public class ASFsInput implements Closeable, SeekableInput {
+    private final FSDataInputStream stream;
+    private final long len;
+
+    /** Construct given a path and a configuration. */
+    public ASFsInput(Path path, TaskAttemptContext context) throws IOException {
+        this.stream = path.getFileSystem(context.getConfiguration()).open(path);
+        this.len = path.getFileSystem(context.getConfiguration())
+                                        .getFileStatus(path).getLen();
+    }
+
+    public long length() {
+        return len;
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        return stream.read(b, off, len);
+    }
+
+    public void seek(long p) throws IOException {
+        stream.seek(p);
+    }
+
+    public long tell() throws IOException {
+        return stream.getPos();
+    }
+
+    public void close() throws IOException {
+        stream.close();
+    }
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASLog.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASLog.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASLog.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ASLog.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+/**
+ *  Minimal console logging helpers of this package. Messages are gated by
+ *  a package-wide debug level that starts at -1.
+ */
+public class ASLog {
+
+    final static public int INFO = 2;
+    final static public int FUNC_CALL = 3;
+    final static public int DETAILS = 5;
+
+    /** current verbosity; a message is shown if this is >= its level */
+    static private int debugLevel = -1;
+
+    /** print tag + msg to stdout when the debug level reaches threshold */
+    private static void printIfEnabled(int threshold, String tag, String msg) {
+        if (debugLevel < threshold) {
+            return;
+        }
+        System.out.println(tag + msg);
+    }
+
+    /** change the package-wide debug level */
+    static void setDebugLevel (int l) {
+        debugLevel = l;
+    }
+
+    /** log at level INFO */
+    static void info (String msg) {
+        printIfEnabled(INFO, "INFO:", msg);
+    }
+
+    /** log at level FUNC_CALL */
+    static void funcCall (String msg) {
+        printIfEnabled(FUNC_CALL, "DEBUG:", msg);
+    }
+
+    /** log at level DETAILS */
+    static void details (String msg) {
+        printIfEnabled(DETAILS, "DEBUG:", msg);
+    }
+
+    /** log at a caller-chosen level */
+    static void debug(int level, String msg) {
+        printIfEnabled(level, "DEBUG:", msg);
+    }
+
+    /** always print a warning to stdout */
+    static void warn( String msg) {
+        System.out.println("WARNING:" + msg);
+    }
+
+    /** always print an error to stderr */
+    static void error(String msg) {
+        System.err.println("ERROR:" + msg);
+    }
+
+}
+

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * This class converts Avro schema to Pig schema
+ */
+public class AvroSchema2Pig {
+
+    public static String RECORD = "RECORD";
+    public static String FIELD = "FIELD";
+    public static String ARRAY_FIELD = "ARRAY_ELEM";
+    public static String MAP_VALUE_FIELD = "m_value";
+
+    /**
+     * Wrap a pig type to a field schema
+     */
+    public static ResourceFieldSchema getPigSchema(byte pigType,
+                                    String fieldName) {
+        return new ResourceFieldSchema( new FieldSchema(fieldName, pigType));
+    }
+
+    /**
+     * Convert an Avro schema to a Pig schema
+     */
+    public static ResourceSchema convert(Schema schema) throws IOException {
+
+        if (ASCommons.containGenericUnion(schema))
+            throw new IOException (
+                                   "We don't accept schema containing generic unions.");
+        
+        if (ASCommons.containRecursiveRecord(schema))
+            throw new IOException (
+                                   "We don't accept schema containing recursive records.");
+        
+        ResourceFieldSchema inSchema = inconvert(schema, FIELD);
+
+        ResourceSchema tupleSchema;
+        if (inSchema.getType() == DataType.TUPLE) {
+            tupleSchema = inSchema.getSchema();
+        } else { // other typs
+            ResourceFieldSchema tupleWrapper = ASCommons.wrapAsTuple(inSchema);
+
+            ResourceSchema topSchema = new ResourceSchema();
+            topSchema.setFields(new ResourceFieldSchema[] { tupleWrapper });
+
+            tupleSchema = topSchema;
+
+        }
+        return tupleSchema;
+    }
+
+    /**
+     * Convert a schema with field name to a pig schema
+     */
+     private static ResourceFieldSchema inconvert(Schema in, String fieldName)
+                                    throws IOException {
+
+        ASLog.details("InConvert avro schema with field name "
+                                        + fieldName);
+
+        Schema.Type avroType = in.getType();
+        ResourceFieldSchema fieldSchema = new ResourceFieldSchema();
+        fieldSchema.setName(fieldName);
+
+        if (avroType.equals(Schema.Type.RECORD)) {
+
+            ASLog.details( "convert to a pig tuple");
+
+            fieldSchema.setType(DataType.TUPLE);
+            ResourceSchema tupleSchema = new ResourceSchema();
+            List<Schema.Field> fields = in.getFields();
+            ResourceFieldSchema[] childFields = new ResourceFieldSchema[fields
+                                            .size()];
+            int index = 0;
+            for (Schema.Field field : fields) {
+                childFields[index++] = inconvert(field.schema(), field.name());
+            }
+
+            tupleSchema.setFields(childFields);
+            fieldSchema.setSchema(tupleSchema);
+
+        } else if (avroType.equals(Schema.Type.ARRAY)) {
+
+            ASLog.details( "convert array to a pig bag");
+            fieldSchema.setType(DataType.BAG);
+            Schema elemSchema = in.getElementType();
+            ResourceFieldSchema subFieldSchema = inconvert(elemSchema,
+                                            ARRAY_FIELD);
+            add2BagSchema(fieldSchema, subFieldSchema);
+
+        } else if (avroType.equals(Schema.Type.MAP)) {
+
+            ASLog.details( "convert map to a pig map");
+            fieldSchema.setType(DataType.MAP);
+
+        } else if (avroType.equals(Schema.Type.UNION)) {
+
+            if (ASCommons.isAcceptableUnion(in)) {
+                Schema acceptSchema = ASCommons.getAcceptedType(in);
+                ResourceFieldSchema realFieldSchema = inconvert(acceptSchema,
+                                                null);
+                fieldSchema.setType(realFieldSchema.getType());
+                fieldSchema.setSchema(realFieldSchema.getSchema());
+            } else
+                throw new IOException("Do not support generic union:" + in);
+
+        } else if (avroType.equals(Schema.Type.FIXED)) {
+             fieldSchema.setType(DataType.BYTEARRAY);
+        } else if (avroType.equals(Schema.Type.BOOLEAN)) {
+            fieldSchema.setType(DataType.BOOLEAN);
+        } else if (avroType.equals(Schema.Type.BYTES)) {
+            fieldSchema.setType(DataType.BYTEARRAY);
+        } else if (avroType.equals(Schema.Type.DOUBLE)) {
+            fieldSchema.setType(DataType.DOUBLE);
+        } else if (avroType.equals(Schema.Type.ENUM)) {
+            fieldSchema.setType(DataType.CHARARRAY);
+        } else if (avroType.equals(Schema.Type.FLOAT)) {
+            fieldSchema.setType(DataType.FLOAT);
+        } else if (avroType.equals(Schema.Type.INT)) {
+            fieldSchema.setType(DataType.INTEGER);
+        } else if (avroType.equals(Schema.Type.LONG)) {
+            fieldSchema.setType(DataType.LONG);
+        } else if (avroType.equals(Schema.Type.STRING)) {
+            fieldSchema.setType(DataType.CHARARRAY);
+        } else if (avroType.equals(Schema.Type.NULL)) { 
+            // value of NULL is always NULL
+            fieldSchema.setType(DataType.INTEGER);
+        } else {
+            throw new IOException("Unsupported avro type:" + avroType);
+        }
+        return fieldSchema;
+    }
+
+     /**
+      * Add a field schema to a bag schema
+      */
+    static protected void add2BagSchema(ResourceFieldSchema fieldSchema,
+                                    ResourceFieldSchema subFieldSchema)
+                                    throws IOException {
+
+        ResourceFieldSchema wrapped = (subFieldSchema.getType() == DataType.TUPLE) 
+                                                              ? subFieldSchema
+                                                              : ASCommons.wrapAsTuple(subFieldSchema);
+
+        ResourceSchema listSchema = new ResourceSchema();
+        listSchema.setFields(new ResourceFieldSchema[] { wrapped });
+
+        fieldSchema.setSchema(listSchema);
+
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+/**
+ * This class creates two maps out of a given Avro schema. And it supports
+ * looking up avro schemas using either type name or field name.
+ * 
+ * 1. map[type name] => avro schema
+ * 2. map[field name] => avro schema
+ *
+ * Names of fields of nested records are prefixed with the names of the
+ * enclosing fields, joined by ".". Fields reached through arrays, maps or
+ * unions that are not "acceptable" (see {@link ASCommons#isAcceptableUnion})
+ * only contribute to the type name map.
+ */
+public class AvroSchemaManager {
+
+    /**map[field name] => schema */
+    Map<String, Schema> name2Schema = null;
+    /**map[type name]=> schema*/
+    Map<String, Schema> typeName2Schema = null;
+    /**
+     * Records whose fields are being traversed right now. Keys are compared
+     * by identity; the map is only used to stop on recursive references.
+     */
+    private final Map<Schema, Boolean> activeRecords =
+                                    new java.util.IdentityHashMap<Schema, Boolean>();
+
+    /**
+     * Construct with a given schema
+     * 
+     * @param schema                    avro schema to index
+     */
+    public AvroSchemaManager(Schema schema) {
+
+        name2Schema = new HashMap<String, Schema>();
+        typeName2Schema = new HashMap<String, Schema>();
+
+        init(null, schema, false);
+    }
+
+    /**
+     * Tell whether the schema is of a kind that is registered by type name
+     * (record, enum or fixed).
+     */
+    private boolean isNamedSchema(Schema schema) {
+        Type type = schema.getType();
+        return type.equals(Type.RECORD) || type.equals(Type.ENUM)
+                                        || type.equals(Type.FIXED);
+    }
+
+    /**
+     * Initialize given a schema: register it by type name if it is a named
+     * type, register the fields of records by (qualified) field name, and
+     * descend into fields, union branches, array elements and map values.
+     * 
+     * @param namespace             prefix for field names, or null at top level
+     * @param schema                    schema to walk
+     * @param ignoreNameMap      if true, only the type name map is updated
+     */
+    protected void init(String namespace, Schema schema, 
+                                    boolean ignoreNameMap) {
+
+        /* A record that is still being expanded has been reached again through
+         * one of its own fields. Stop here; otherwise a recursive schema
+         * would make this method recurse until the stack overflows. */
+        if (activeRecords.containsKey(schema)) {
+            return;
+        }
+
+        /* put to map[type name]=>schema */
+        if (isNamedSchema(schema)) {
+            String typeName = schema.getName();
+            if (typeName2Schema.containsKey(typeName))
+                ASLog.warn("Duplicate schemas defined for type:"
+                                                + typeName
+                                                + ". will ignore the second one:"
+                                                + schema);
+            else {
+                ASLog.details("add " + schema.getName() + "=" + schema
+                                                + " to type2Schema");
+                typeName2Schema.put(schema.getName(), schema);
+            }
+        }
+
+        /* put field schema to map[field name]=>schema*/
+        if (schema.getType().equals(Type.RECORD)) {
+
+            activeRecords.put(schema, Boolean.TRUE);
+            try {
+                List<Field> fields = schema.getFields();
+                for (Field field : fields) {
+
+                    Schema fieldSchema = field.schema();
+                    String name = (namespace == null) ? field.name() 
+                                               : namespace + "." + field.name();
+
+                    if (!ignoreNameMap) {
+                        if (name2Schema.containsKey(name))
+                            ASLog.warn("Duplicate schemas defined for alias:"
+                                                            + name
+                                                            + ". Will ignore the second one:"
+                                                            + fieldSchema);
+                        else {
+                            ASLog.details( "add " + name + "="
+                                                            + fieldSchema
+                                                            + " to name2Schema");
+                            name2Schema.put(name, fieldSchema);
+                        }
+                    }
+
+                    init(name, fieldSchema, ignoreNameMap);
+                }
+            } finally {
+                /* the same record may legitimately be used by a sibling field */
+                activeRecords.remove(schema);
+            }
+        } else if (schema.getType().equals(Type.UNION)) {
+
+            if (ASCommons.isAcceptableUnion(schema)) {
+                Schema realSchema = ASCommons.getAcceptedType(schema);
+                init(namespace, realSchema, ignoreNameMap);
+            } else {
+                /* field names below such a union are not registered */
+                List<Schema> list = schema.getTypes();
+                for (Schema s : list) {
+                    init(namespace, s, true);
+                }
+            }
+        } else if (schema.getType().equals(Type.ARRAY)) {
+            Schema elemSchema = schema.getElementType();
+            init(namespace, elemSchema, true);
+        } else if (schema.getType().equals(Type.MAP)) {
+            Schema valueSchema = schema.getValueType();
+            init(namespace, valueSchema, true);
+        }
+    }
+
+    /**
+     * Look up schema using type name or field name. A type name match takes
+     * precedence over a field name match.
+     * 
+     * @param name                      type name or (qualified) field name
+     * @return                                  the schema, or null if the name is unknown
+     */
+    public Schema getSchema(String name) {
+        Schema schema = typeName2Schema.get(name);
+        schema = (schema == null) ? name2Schema.get(name) : schema;
+        return schema;
+
+    }
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * AvroStorage is used to load/store Avro data <br/>
+ * Document can be found <a href='http://snaprojects.jira.com/wiki/display/HTOOLS/AvroStorage+-+Pig+support+for+Avro+data'>here</a>
+ */
+public class AvroStorage extends FileInputLoadFunc implements
+        StoreFuncInterface, LoadMetadata {
+
+    /* storeFunc parameters */
+    /* value of a "field<N>" parameter asking for a derived, non-null schema */
+    private static final String NOTNULL = "NOTNULL"; 
+    /* UDF property under which the output schemas of all store calls are kept */
+    private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
+    /* separates the entries stored in that property */
+    private static final String SCHEMA_DELIM = "#";
+    /* separates key and schema inside one entry */
+    private static final String SCHEMA_KEYVALUE_DELIM = "@";
+    
+    /* FIXME: we use this variable to distinguish schemas specified
+     * by different AvroStorage calls and will remove this once 
+     * StoreFunc provides access to Pig output schema in backend. 
+     * Default value is 0.
+     */
+    private int storeFuncIndex = 0;
+    private PigAvroRecordWriter writer = null;    /* avro record writer */
+    private Schema outputAvroSchema = null;  /* output avro schema */ 
+    /* indicate whether data is nullable */
+    private boolean nullable = true;                    
+    
+    /* loadFunc parameters */
+    private PigAvroRecordReader reader = null;   /* avro record reader */
+    private Schema inputAvroSchema = null;    /* input avro schema */
+
+
+    /* implement LoadFunc */
+    
+    /**
+     * Registers the input location on the job and, unless an input schema is
+     * already known, obtains the schema from the data at that location.
+     * 
+     * FIXME: all avro files under the same "location" are currently assumed
+     * to share one schema; an exception is thrown when they do not.
+     */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        ASCommons.addInputPaths(location, job);
+        if (inputAvroSchema != null) {
+            return;     /* schema was resolved by an earlier call */
+        }
+        inputAvroSchema = getAvroSchema(location, job);
+    }
+
+    /**
+     * Get avro schema of the data stored at "location".
+     * 
+     * @param location                   input location
+     * @param job                           job whose configuration is used to
+     *                                                 reach the file system
+     * @return                                  avro schema of data
+     * @throws IOException            if the schema cannot be obtained
+     */
+    protected Schema getAvroSchema(String location, Job job) 
+    throws IOException {
+        Configuration conf = job.getConfiguration();
+        Path path = new Path(location);
+        /* use the file system the path points to (as init() does); a path
+         * without scheme still resolves to the configured default one */
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
+        return getAvroSchema(path, fs);
+    }
+
+    /**
+     * Determine the avro schema of an input path:
+     * 1. a file yields its own avro schema;
+     * 2. a directory that holds no sub-directories yields the schema of
+     * one of its files (see {@link #getSchema(Path, FileSystem)});
+     * 3. any other directory is inspected entry by entry, recursively, and
+     * all entries have to agree on one schema.
+     * 
+     * @param path                          input path
+     * @param fs                              file system  
+     * @return                                  avro schema of data
+     * @throws IOException            if the entries below path do not share
+     *                                                 the same schema; or if no schema could
+     *                                                 be found (e.g. path is empty)
+     */
+    @SuppressWarnings("deprecation")
+    protected Schema getAvroSchema(Path path, FileSystem fs) 
+    throws IOException {
+        /* a plain file carries its schema itself */
+        boolean plainFile = !fs.isDirectory(path);
+        if (plainFile) {
+            return getSchema(path, fs);
+        }
+
+        FileStatus[] children = fs.listStatus(path, ASCommons.PATH_FILTER);
+
+        /* first-level directory: only files below it */
+        if (children.length > 0 && ASCommons.noDir(children)) {
+            return getSchema(path, fs);
+        }
+
+        /* otherwise every entry has to come up with the same schema */
+        Schema common = null;
+        for (FileStatus child : children) {
+            Schema current = getAvroSchema(child.getPath(), fs);
+            if (common == null) {
+                common = current;
+                continue;
+            }
+            if (common.equals(current)) {
+                continue;
+            }
+            throw new IOException( "Input path is " + path
+                                            + ". Sub-direcotry " + child.getPath()
+                                            + " contains different schema " 
+                                            + current 
+                                            + " than " + common);
+        }
+
+        if (common == null) {
+            throw new IOException("Cannot get avro schema! Input path " + path 
+                                                + " might be empty.");
+        }
+        return common;
+    }
+
+    /**
+     * This method is called by {@link #getAvroSchema}. The default implementation 
+     * returns the schema of an avro file; or the schema of the last file in a first-level 
+     * directory (it does not contain sub-directories).
+     * 
+     * The file is always closed again, also when it turns out not to be a
+     * readable avro file.
+     * 
+     * @param path                          path of a file or first level directory
+     * @param fs                              file system
+     * @return                                  avro schema
+     * @throws IOException            if the file cannot be opened or read
+     */
+    protected Schema getSchema(Path path, FileSystem fs) throws IOException {
+        /* get path of the last file */
+        Path lastFile = ASCommons.getLast(path, fs);
+
+        /* read in file and obtain schema */
+        GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+        InputStream hdfsInputStream = fs.open(lastFile);
+        DataFileStream<Object> avroDataStream = null;
+        try {
+            avroDataStream = new DataFileStream<Object>(
+                                            hdfsInputStream, avroReader);
+            return avroDataStream.getSchema();
+        } finally {
+            /* closing the avro stream also closes the underlying one; the raw
+             * stream is closed directly if the avro stream was never created */
+            if (avroDataStream != null) {
+                avroDataStream.close();
+            } else {
+                hdfsInputStream.close();
+            }
+        }
+    }
+
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        ASLog.funcCall("getInputFormat");
+        return new PigAvroInputFormat(inputAvroSchema);
+    }
+
+    @SuppressWarnings("rawtypes") 
+    @Override
+    public void prepareToRead(RecordReader reader,
+                                                PigSplit split)
+    throws IOException {
+        ASLog.funcCall("prepareToRead");
+        this.reader = (PigAvroRecordReader) reader;
+    }
+
+    /**
+     * Return the next tuple of the input, or null when the input is exhausted.
+     * 
+     * @throws IOException            if reading fails; an IOException raised
+     *                                                 by the reader is passed on unchanged,
+     *                                                 any other exception is wrapped
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        try {
+            boolean notDone = reader.nextKeyValue();
+            if (!notDone) {
+                return null;
+            }
+            return (Tuple) reader.getCurrentValue();
+        } catch (Exception e) {
+            if (e instanceof IOException) {
+                /* no need to wrap an IOException into another one */
+                throw (IOException) e;
+            }
+            if (e instanceof InterruptedException) {
+                /* keep the interrupt visible to the calling thread */
+                Thread.currentThread().interrupt();
+            }
+            throw new IOException(e);
+        }
+    }
+
+    
+    /* implement LoadMetadata */
+    
+    /**
+     * Report the pig schema of the data at "location": the avro schema is
+     * looked up (unless it is already known) and converted with
+     * {@link AvroSchema2Pig#convert}.
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Job job)
+                                    throws IOException {
+
+        ASLog.funcCall( "getSchema");
+
+        /* avro side: resolve the schema on first use only */
+        if (inputAvroSchema == null) {
+            Schema resolved = getAvroSchema(location, job);
+            inputAvroSchema = resolved;
+        }
+        ASLog.details( "avro input schema:"  + inputAvroSchema);
+
+        /* pig side */
+        ResourceSchema converted = AvroSchema2Pig.convert(inputAvroSchema);
+        ASLog.details("pig input schema:" + converted);
+        return converted;
+    }
+
+    /**
+     * No statistics are provided by this loader.
+     * 
+     * @return                                  always null
+     */
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job)
+                                    throws IOException {
+        return null;
+    }
+
+    /**
+     * No partition keys are reported by this loader.
+     * 
+     * @return                                  always null
+     */
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+                                    throws IOException {
+        return null;
+    }
+
+    /**
+     * Does nothing: the filter is ignored ({@link #getPartitionKeys}
+     * returns null, i.e. no partition keys are reported).
+     */
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+                                    throws IOException {
+
+    }
+
+ 
+    /* implement StoreFunc*/
+ 
+    /**
+     * Constructor without parameters: no output schema is preset (it is
+     * derived from the pig schema), data is nullable and the debug level
+     * is set to 0.
+     */
+    public AvroStorage() {
+        ASLog.setDebugLevel(0);
+        nullable = true;
+        outputAvroSchema = null;
+    }
+
+    /**
+     * Constructor taking the parameters as a list of quoted strings. More
+     * than one string is read as a list of names and values (see
+     * {@link #parseStringList}); a single string is read as a json object
+     * (see {@link #parseJsonString}).
+     * 
+     * @param parts                         quoted string list
+     * @throws IOException
+     * @throws ParseException
+     */
+    public AvroStorage(String[] parts) throws IOException, ParseException {
+
+        outputAvroSchema = null;
+        nullable = true;
+
+        /* turn the parameters into a property map */
+        Map<String, Object> settings;
+        if (parts.length > 1) {
+            settings = parseStringList(parts);
+        } else {
+            settings = parseJsonString(parts[0]);
+        }
+
+        /* apply them */
+        init(settings);
+    }
+
+    /**
+     * Turn a json object, given as string, into a property map. The values
+     * of "debug" and "index" are converted from long to integer; the values
+     * of "schema" and "field&lt;N&gt;" are converted to trimmed strings. All
+     * other entries are passed on as parsed.
+     * 
+     * @param jsonString                json object in string format
+     * @return                                  a property map
+     * @throws IOException
+     * @throws ParseException       if the string cannot be parsed
+     */
+    @SuppressWarnings("unchecked")
+    protected Map<String, Object> parseJsonString(String jsonString)
+                                    throws IOException, ParseException {
+
+        JSONObject json = (JSONObject) new JSONParser().parse(jsonString);
+
+        Set<Entry<String, Object>> pairs = json.entrySet();
+        for (Entry<String, Object> pair : pairs) {
+
+            String key = pair.getKey();
+            Object raw = pair.getValue();
+
+            boolean numeric = key.equalsIgnoreCase("debug")
+                                        || key.equalsIgnoreCase("index");
+            if (numeric) {
+                /* numbers arrive as long values */
+                json.put(key, Integer.valueOf(((Long) raw).intValue()));
+                continue;
+            }
+
+            boolean schemaText = key.equalsIgnoreCase("schema")
+                                        || key.matches("field\\d+");
+            if (schemaText) {
+                /* an avro schema given as json object becomes a string again */
+                json.put(key, raw.toString().trim());
+            }
+        }
+        return json;
+    }
+
+    /**
+     * build a property map from a string list of alternating names and
+     * values: "debug" and "index" are stored as integer; "data", "same",
+     * "schema" and "field&lt;N&gt;" as string; "nullable" as boolean. A trailing
+     * name without value is ignored.
+     * 
+     * @param parts                         input string list
+     * @return                                  a property map
+     * @throws IOException            if a name is not one of the above
+     */
+    protected Map<String, Object> parseStringList(String[] parts)
+                                    throws IOException {
+
+        Map<String, Object> map = new HashMap<String, Object>();
+
+        for (int i = 0; i < parts.length - 1; i += 2) {
+            String name = parts[i].trim();
+            String value = parts[i+1].trim();
+            if (name.equalsIgnoreCase("debug")
+                         || name.equalsIgnoreCase("index")) {
+                /* store value as integer */
+                map.put(name, Integer.parseInt(value));
+            } else if (name.equalsIgnoreCase("data")
+                         || name.equalsIgnoreCase("same")
+                         || name.equalsIgnoreCase("schema")
+                         || name.matches("field\\d+")) {
+                /* store value as string */
+                map.put(name, value);
+            } else if (name.equalsIgnoreCase("nullable")) {
+                /* store value as boolean; parse the text itself
+                 * (Boolean.getBoolean would look up a system property
+                 * named after the value instead) */
+                map.put(name, Boolean.parseBoolean(value));
+            } else
+                throw new IOException("Invalid parameter:" + name);
+        }
+        return map;
+    }
+
+    /**
+     * Initialize output avro schema using input property map. Recognized
+     * names (case is ignored, except for "field&lt;N&gt;"):
+     * <ul>
+     * <li>"debug": debug level (Integer)</li>
+     * <li>"data": path of avro data whose schema is made available to
+     * "def:" references of field parameters</li>
+     * <li>"index": index of this store function (Integer)</li>
+     * <li>"same": path of avro data whose schema is used as output schema</li>
+     * <li>"nullable": whether data is nullable (Boolean)</li>
+     * <li>"schema": output schema as string</li>
+     * <li>"field&lt;N&gt;": schema of the N-th field; either "NOTNULL",
+     * "def:" followed by a type or field name found in "data", or an avro
+     * schema as string</li>
+     * </ul>
+     * 
+     * @param inputs                       property map
+     * @throws IOException            on an unknown name, or if a "def:"
+     *                                                 reference cannot be resolved
+     */
+    protected void init(Map<String, Object> inputs) throws IOException {
+
+        /*used to store field schemas */
+        List<Field> fields = null;
+
+        /* set debug level */
+        if (inputs.containsKey("debug")) {
+            ASLog.setDebugLevel((Integer) inputs.get("debug"));
+        }
+
+        /* initialize schema manager, if any */
+        /* done before the loop so that "def:" references can be resolved no
+         * matter in which order the map is iterated */
+        AvroSchemaManager schemaManager = null;
+        if (inputs.containsKey("data")) {
+            Path path = new Path((String) inputs.get("data"));
+            ASLog.details( "data path=" + path.toUri().toString());
+            FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
+            Schema schema = getAvroSchema(path, fs);
+            schemaManager = new AvroSchemaManager(schema);
+        }
+
+        /* iterate input property map */
+        for (Entry<String, Object> entry : inputs.entrySet()) {
+            String name = entry.getKey().trim();
+            Object value = entry.getValue();
+
+            if (name.equalsIgnoreCase("index")) { 
+                /* set index of store function */
+                storeFuncIndex = (Integer) value;
+            } else if (name.equalsIgnoreCase("same")) { 
+                /* use schema in the specified path as output schema */
+                Path path = new Path( ((String) value).trim());
+                ASLog.details("data path=" + path.toUri().toString());
+                FileSystem fs = FileSystem.get(path.toUri(),
+                                                new Configuration());
+                outputAvroSchema = getAvroSchema(path, fs);
+            } else if (name.equalsIgnoreCase("nullable")) {
+                nullable = (Boolean) value;
+            } else if (name.equalsIgnoreCase("schema")) {
+                outputAvroSchema = Schema.parse((String) value);
+            } else if (name.matches("field\\d+")) { 
+                /* set schema of the field whose index follows "field" */
+                if (fields == null)
+                    fields = new ArrayList<Field>();
+
+                int index = Integer.parseInt(name.substring("field".length()));
+                String content = ((String) value).trim();
+                Field field = null;
+                if (content.equalsIgnoreCase(NOTNULL)) {
+                    /* null means deriving avro schema from pig schema but not null*/
+                    field = ASCommons.createUDField(index, null); 
+                } else if (content.startsWith("def:")) {
+                    if (schemaManager == null)
+                        throw new IOException(
+                                 "Please specify data parameter (using \"data\") before this one.");
+
+                    String alias = content.substring("def:".length());
+                    Schema s = schemaManager.getSchema(alias);
+                    if (s == null)
+                        throw new IOException(
+                                                        "Cannot find matching schema for alias:"
+                                                                                        + alias);
+                    /* use pre-defined schema*/
+                    field = ASCommons.createUDField(index, s); 
+
+                    ASLog.details( "Use pre-defined schema(" + alias
+                                                    + "): " + s + " for field "
+                                                    + index);
+                } else {
+                    Schema schema = null;
+                    try {
+                        schema = Schema.parse(content);
+                    } catch (RuntimeException e) {
+                        /* might be primary schema like int or long */
+                        /* retry with the text quoted as a json string */
+                        schema = Schema.parse("\"" + content + "\"");
+                    }
+
+                    field = ASCommons.createUDField(index, schema);
+                }
+
+                fields.add(field);
+            } else if (!name.equalsIgnoreCase("data")
+                                  && !name.equalsIgnoreCase("debug")) { 
+                /* "data" and "debug" were handled before the loop */
+                throw new IOException("Invalid parameter:" + name);
+            }
+        }
+
+        /* if schemas of some fields are set */
+        /* a schema given by "schema" or "same" takes precedence over them */
+        if (fields != null && outputAvroSchema == null) {
+            outputAvroSchema = ASCommons.createUDPartialRecordSchema();
+            outputAvroSchema.setFields(fields);
+        }
+
+        /* print warning if both output and nullable are specified;
+         * and nullable will be ignored.*/
+        if (outputAvroSchema != null) {
+            if (!nullable) {
+                ASLog.warn("Invalid parameter--nullable cannot be false while "
+                                                + "output schema is not null. Will ignore nullable.\n\n");
+                nullable = true;
+            }
+        }
+
+    }
+
+    /**
+     * Resolve the store location against the current directory, the same
+     * way {@link LoadFunc#getAbsolutePath} does for load locations.
+     */
+    @Override
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
+                                    throws IOException {
+        String absolute = LoadFunc.getAbsolutePath(location, curDir);
+        return absolute;
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        ASLog.details("output location=" + location);
+        FileOutputFormat.setOutputPath(job, new Path(location));
+    }
+
+    /**
+     * Work out the avro output schema for the pig schema "s" and append it,
+     * under the key of this store function, to the schemas kept in the UDF
+     * properties. Nothing is added when the key is already present.
+     */
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        ASLog.funcCall( "Check schema");
+        Properties props = UDFContext.getUDFContext()
+                                        .getUDFProperties(ResourceSchema.class);
+        String existing = props.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+        ASLog.details( "Previously defined schemas=" + existing);
+
+        String key = getSchemaKey();
+        Map<String, String> known = null;
+        if (existing != null) {
+            known = parseSchemaMap(existing);
+        }
+
+        if (known != null && known.containsKey(key)) {
+            ASLog.warn("Duplicate value for key-" + key
+                                            + ". Will ignore the new schema.");
+            return;
+        }
+
+        /* validate against a preset output schema, or convert from scratch */
+        Schema converted;
+        if (outputAvroSchema == null) {
+            converted = PigSchema2Avro.convert(s, nullable);
+        } else {
+            converted = PigSchema2Avro.validateAndConvert(outputAvroSchema, s);
+        }
+
+        ASLog.info("key=" + key + " outputSchema=" + converted);
+
+        String addition = key + SCHEMA_KEYVALUE_DELIM + converted.toString();
+
+        String updated;
+        if (known != null) {
+            updated = existing + SCHEMA_DELIM + addition;
+        } else {
+            updated = addition;
+        }
+        props.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, updated);
+        ASLog.details( "New schemas=" + updated);
+
+    }
+
+    /**
+     * Key under which the output schema of this store function is kept:
+     * its index, as decimal string.
+     */
+    private String getSchemaKey() {
+        String key = String.valueOf(storeFuncIndex);
+        return key;
+    }
+
+    /**
+     * Parse the schemas kept in the UDF properties into map[key] => schema
+     * string. Entries are separated by SCHEMA_DELIM; within an entry the key
+     * ends at the first SCHEMA_KEYVALUE_DELIM, so that the schema itself may
+     * contain that character. (A schema containing SCHEMA_DELIM is still not
+     * supported, as entries are split on it.)
+     * 
+     * @param input                         stored schemas
+     * @return                                  map from key to schema string
+     * @throws IOException            if an entry lacks a key or a schema
+     */
+    private Map<String, String> parseSchemaMap(String input) throws IOException {
+        ASLog.details("Parse schema map from " + input);
+        String[] entries = input.split(SCHEMA_DELIM);
+        Map<String, String> map = new HashMap<String, String>();
+        for (String entry : entries) {
+
+            ASLog.details("Entry = " + entry);
+            if (entry.length() == 0)
+                continue;
+
+            /* split on the first delimiter only */
+            int sep = entry.indexOf(SCHEMA_KEYVALUE_DELIM);
+            if (sep < 0)
+                throw new IOException("Expect 2 fields in " + entry);
+
+            String key = entry.substring(0, sep);
+            String value = entry.substring(sep + SCHEMA_KEYVALUE_DELIM.length());
+            if (value.length() == 0)
+                throw new IOException("Expect 2 fields in " + entry);
+            map.put(key, value);
+        }
+        return map;
+    }
+
+    /**
+     * Create the output format for the output schema of this store
+     * function: the schema kept in the UDF properties under its key if there
+     * is one, otherwise the schema preset at construction.
+     * 
+     * @throws IOException            if neither of them is available
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        ASLog.funcCall("getOutputFormat");
+
+        Properties props = UDFContext.getUDFContext()
+                                        .getUDFProperties(ResourceSchema.class);
+        String stored = props.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+        Map<String, String> byKey = null;
+        if (stored != null) {
+            byKey = parseSchemaMap(stored);
+        }
+
+        String key = getSchemaKey();
+        Schema schema;
+        if (byKey != null && byKey.containsKey(key)) {
+            schema = Schema.parse(byKey.get(key));
+        } else {
+            schema = outputAvroSchema;
+        }
+
+        if (schema == null) {
+            throw new IOException("Output schema is null!");
+        }
+        ASLog.details("Output schema=" + schema);
+
+        return new PigAvroOutputFormat(schema);
+    }
+
+    /**
+     * Remember the record writer that {@link #putNext} hands tuples to. The
+     * writer has to be a {@link PigAvroRecordWriter}.
+     */
+    @SuppressWarnings("rawtypes") 
+    @Override
+    public  void  prepareToWrite(RecordWriter writer)
+    throws IOException {
+        PigAvroRecordWriter avroWriter = (PigAvroRecordWriter) writer;
+        this.writer = avroWriter;
+    }
+
+    /**
+     * Does nothing: the signature is not recorded.
+     */
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+
+    }
+
+    /**
+     * Delegates to {@link StoreFunc#cleanupOnFailureImpl}.
+     */
+    @Override
+    public void cleanupOnFailure(String location, Job job) throws IOException {
+        StoreFunc.cleanupOnFailureImpl(location, job);
+    }
+
+    /**
+     * Write a tuple using the record writer set by {@link #prepareToWrite}.
+     * 
+     * @param t                               tuple to write
+     * @throws IOException            if writing fails or is interrupted
+     */
+    @Override
+    public void putNext(Tuple t) throws IOException {
+        try {
+            this.writer.write(NullWritable.get(), t);
+        } catch (InterruptedException e) {
+            /* the tuple may not have been written: do not carry on as if
+             * nothing happened; keep the interrupt visible and report it */
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.util.Utf8;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * An avro GenericDatumReader which reads in avro data and 
+ * converts them to pig data: tuples, bags, etc.
+ * 
+ */
+public class PigAvroDatumReader extends GenericDatumReader<Object> {
+
+    /**
+     * Construct a reader that uses the same schema as both the writer's
+     * and the reader's schema.
+     *
+     * @param schema the avro schema of the data
+     */
+    public PigAvroDatumReader(Schema schema) {
+        super(schema);
+    }
+
+    /**
+     * Construct a reader from separate writer's and reader's schemas.
+     *
+     * @param writer the schema the data was written with
+     * @param reader the schema the data is read with
+     */
+    public PigAvroDatumReader(Schema writer, Schema reader)
+    throws IOException {
+        super(writer, reader);
+    }
+
+    /**
+     * Called to read a record instance. Overridden to read a pig tuple.
+     */
+    @Override
+    protected Object readRecord(Object old, Schema expected, ResolvingDecoder in)
+    throws IOException {
+
+        /* create an empty tuple */
+        Tuple tuple = (Tuple) newRecord(old, expected);
+
+        for (Field f : in.readFieldOrder()) {
+            tuple.append(read(null, f.schema(), in));
+        }
+
+        return tuple;
+    }
+
+    /**
+     * Called to read a map instance. Overridden to read a pig map.
+     */
+    protected Object readMap(Object old, Schema expected, ResolvingDecoder in)
+    throws IOException {
+        Schema eValue = expected.getValueType();
+        long l = in.readMapStart();
+        Object map = newMap(old, (int) l);
+        if (l > 0) {
+            do {
+                for (int i = 0; i < l; i++) {
+                    addToMap(map, readString(null, ASCommons.StringSchema, in),
+                                      read(null, eValue, in));
+                }
+            } while ((l = in.mapNext()) > 0);
+        }
+        return map;
+    }
+
+    /**
+     * Called to create an enum value. Overridden to create a pig string.
+     */
+    @Override
+    protected Object createEnum(String symbol, Schema schema) {
+        return symbol;
+    }
+
+    /**
+     * Called by the default implementation of {@link #readArray} to retrieve a
+     * value from a reused instance.
+     */
+    @Override
+    protected Object peekArray(Object array) {
+        return null;
+    }
+
+    /**
+     * Called by the default implementation of {@link #readArray} to add a
+     * value. Overridden to append to pig bag.
+     */
+    @Override
+    protected void addToArray(Object array, long pos, Object e) {
+        if (e instanceof Tuple) {
+            ((DataBag) array).add((Tuple) e);
+        } else {
+            Tuple t = new DefaultTuple();
+            t.append(e);
+            ((DataBag) array).add(t);
+        }
+    }
+
+    /**
+     * Called to read a fixed value. Overridden to read a pig byte array.
+     */
+    @Override
+    protected Object readFixed(Object old, Schema expected, Decoder in)
+                                    throws IOException {
+        GenericFixed fixed = (GenericFixed) super.readFixed(old, expected, in);
+        DataByteArray byteArray = new DataByteArray(fixed.bytes());
+        return byteArray;
+     }
+
+    /**
+     * Called to create new record instances. Overridden to return a new tuple.
+     */
+    @Override
+    protected Object newRecord(Object old, Schema schema) {
+        return TupleFactory.getInstance().newTuple();
+    }
+
+    /**
+     * Called to create new array instances. Overridden to return a new bag.
+     */
+    @Override
+    protected Object newArray(Object old, int size, Schema schema) {
+        return BagFactory.getInstance().newDefaultBag();
+
+    }
+
+    /**
+     * Called to read strings. Overridden to return a pig string.
+     */
+    @Override
+    protected Object readString(Object old, Schema expected, Decoder in)
+                                    throws IOException {
+        Utf8 str = (Utf8) super.readString(old, expected, in);
+        return str.toString();
+
+    }
+
+    /**
+     * Called to read byte arrays. Overridden to return a pig byte array.
+     * <p>
+     * Only the readable window of the buffer (position to limit) is copied
+     * into the result. The backing array is deliberately not handed out:
+     * it may be larger than the value just read, and it may not be
+     * accessible at all.
+     */
+    @Override
+    protected Object readBytes(Object old, Decoder in) throws IOException {
+        ByteBuffer buf = (ByteBuffer) super.readBytes(old, in);
+        byte[] bytes = new byte[buf.remaining()];
+        /* duplicate() so that the position of buf itself is left untouched */
+        buf.duplicate().get(bytes);
+        return new DataByteArray(bytes);
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * An avro GenericDatumWriter to write pig data as Avro data.
+ *
+ */
+public class PigAvroDatumWriter extends GenericDatumWriter<Object> {
+
+    /**
+     * Construct a writer for the given output schema.
+     *
+     * @param schema the avro schema of the data to write
+     */
+    public PigAvroDatumWriter(Schema schema) {
+        this.setSchema(schema);
+    }
+
+    @Override
+    protected void write(Schema schema, Object datum, Encoder out)
+                                    throws IOException {
+        try {
+
+            /**
+             * In case users want to get rid of Pig tuple wrapper
+             */
+            if (!schema.getType().equals(Schema.Type.RECORD)
+                    && !schema.getType().equals(Schema.Type.UNION)
+                    && datum instanceof Tuple
+                    && unwrappedInstanceOf(schema, datum)) {
+                Tuple t = (Tuple) datum;
+                if (t.size() > 1)
+                    throw new IOException(  "Incompatible schema:" + schema
+                                                            + " \n for data " + datum);
+                write(schema, t.get(0), out);
+                return;
+            }
+
+            switch (schema.getType()) {
+            case FIXED:
+                writeFixed(schema, datum, out);
+                break;
+            case ENUM:
+                writeEnum(schema, datum, out);
+                break;
+            case STRING:
+                writeString(schema, datum, out);
+                break;
+            case BYTES:
+                writeBytes(datum, out);
+                break;
+            case BOOLEAN:
+                writeBoolean(datum, out);
+                break;
+            case UNION:
+                writeUnion(schema, datum, out);
+                break;
+            case LONG:
+                writeLong(datum, out);
+                break;
+            case FLOAT:
+                writeFloat(datum, out);
+                break;
+            case DOUBLE:
+                writeDouble(datum, out);
+                break;
+            case ARRAY: /*falls through*/
+            case MAP: /*falls through*/
+            case RECORD: /*falls through*/
+            case INT: /*falls through*/
+            case NULL:/*falls through*/
+            default:
+                super.write(schema, datum, out);
+            }
+        } catch (NullPointerException e) {
+            throw npe(e, " of " + schema.getName());
+        }
+    }
+
+    /**
+     * Called to write union. 
+     */
+    protected void writeUnion(Schema schema, Object datum, Encoder out)
+                                    throws IOException {
+        int index = resolveUnion(schema, datum);
+        out.writeIndex(index);
+        write(schema.getTypes().get(index), datum, out);
+    }
+
+    /**
+     * Called to resolve union. 
+     */
+    protected int resolveUnion(Schema union, Object datum) throws IOException {
+        int i = 0;
+        for (Schema type : union.getTypes()) {
+            if (type.getType().equals(Schema.Type.UNION))
+                throw new IOException(
+                        "A union cannot immediately contain other unions.");
+            if (instanceOf(type, datum))
+                return i;
+            i++;
+        }
+        throw new RuntimeException("Datum " + datum + " is not in union "
+                                        + union);
+    }
+
+    /**
+     * Recursively check whether "datum" is an instance of "schema". Called
+     * by {@link #resolveUnion(Schema,Object)} and
+     * {@link #unwrappedInstanceOf(Schema,Object)}.
+     * <p>
+     * Besides the exact pig type, the numeric and boolean cases also accept
+     * the narrower types that the writeXXX methods can convert, and every
+     * non-record, non-union, non-null case accepts a single-field tuple
+     * wrapping a matching value.
+     *
+     * @throws RuntimeException if the schema type is unexpected, if a union
+     *                          has no matching branch, or if a tuple field
+     *                          cannot be accessed (the ExecException is
+     *                          kept as the cause)
+     */
+    protected boolean instanceOf(Schema schema, Object datum)
+                                    throws IOException {
+
+        try {
+            switch (schema.getType()) {
+            case RECORD:
+                if (datum instanceof Tuple) {
+                    Tuple tuple = (Tuple) datum;
+                    List<Field> fields = schema.getFields();
+                    if (fields.size() != tuple.size()) {
+                        return false;
+                    }
+                    for (int i = 0; i < fields.size(); i++) {
+                        if (!instanceOf(fields.get(i).schema(), tuple.get(i)))
+                            return false;
+                    }
+                    return true;
+                }
+                return false;
+
+            case UNION:
+                /* resolveUnion throws when no branch matches */
+                resolveUnion(schema, datum);
+                return true;
+            case ENUM:
+                return datum instanceof String
+                            && schema.hasEnumSymbol(((String) datum))
+                            || unwrappedInstanceOf(schema, datum);
+            case ARRAY:
+                return datum instanceof DataBag
+                            ||  unwrappedInstanceOf(schema, datum);
+            case MAP:
+                return datum instanceof Map
+                            || unwrappedInstanceOf(schema, datum);
+            case FIXED:
+                return datum instanceof DataByteArray
+                            && ((DataByteArray) datum).size() 
+                                    == schema.getFixedSize()
+                            || unwrappedInstanceOf(schema, datum);
+            case STRING:
+                return datum instanceof String
+                            || unwrappedInstanceOf(schema, datum);
+            case BYTES:
+                return datum instanceof DataByteArray
+                            || unwrappedInstanceOf(schema, datum);
+            case INT:
+                return datum instanceof Integer
+                            || unwrappedInstanceOf(schema, datum);
+            case LONG:
+                return datum instanceof Long
+                            || datum instanceof Integer
+                            || unwrappedInstanceOf(schema, datum);
+            case FLOAT:
+                return datum instanceof Float
+                            || datum instanceof Integer
+                            || datum instanceof Long
+                            || unwrappedInstanceOf(schema, datum);
+            case DOUBLE:
+                return datum instanceof Double
+                            || datum instanceof Float
+                            || datum instanceof Integer
+                            || datum instanceof Long
+                            || unwrappedInstanceOf(schema, datum);
+            case BOOLEAN:
+                return datum instanceof Boolean
+                            || datum instanceof Integer
+                            || unwrappedInstanceOf(schema, datum);
+            case NULL:
+                return datum == null;
+            default:
+                throw new RuntimeException("Unexpected type: " + schema);
+            }
+        } catch (ExecException e) {
+            /* chain the cause so the original stack trace is not lost */
+            throw new RuntimeException("ExecException:" + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Check whether "datum" is an instance of "schema" after stripping
+     * the tuple wrapper.
+     */
+    private boolean unwrappedInstanceOf(Schema schema, Object datum)
+                                    throws IOException {
+        try {
+            if (!(datum instanceof Tuple))
+                return false;
+
+            Tuple tuple = (Tuple) datum;
+            if (tuple.size() != 1)
+                return false;
+
+            switch (schema.getType()) {
+            case ENUM:
+            case ARRAY:
+            case MAP:
+            case FIXED:
+            case STRING:
+            case BYTES:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BOOLEAN:
+                return instanceOf(schema, tuple.get(0));
+            default:
+                throw new IOException("Invalid type:" + schema.getType());
+            }
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Write double. Users can cast long, float and integer to double.
+     * 
+     */
+    protected void writeDouble(Object datum, Encoder out) throws IOException {
+        double num;
+        if (datum instanceof Integer) {
+            num = ((Integer) datum).doubleValue();
+        } else if (datum instanceof Long) {
+            num = ((Long) datum).doubleValue();
+        } else if (datum instanceof Float) {
+            num = ((Float) datum).doubleValue();
+        } else if (datum instanceof Double) {
+            num = (Double) datum;
+        } else
+            throw new IOException("Cannot convert to double:"
+                                            + datum.getClass());
+
+        out.writeDouble(num);
+
+    }
+
+    /**
+     * Write float. Users can cast long and integer into float.
+     * 
+     */
+    protected void writeFloat(Object datum, Encoder out) throws IOException {
+        float num;
+        if (datum instanceof Integer) {
+            num = ((Integer) datum).floatValue();
+        } else if (datum instanceof Long) {
+            num = ((Long) datum).floatValue();
+        } else if (datum instanceof Float) {
+            num = (Float) datum;
+        } else
+            throw new IOException("Cannot convert to float:" + datum.getClass());
+
+        out.writeFloat(num);
+
+    }
+
+    /**
+     * Write long. Users can cast integer into long.
+     * 
+     */
+    protected void writeLong(Object datum, Encoder out) throws IOException {
+        long num;
+        if (datum instanceof Integer) {
+            num = ((Integer) datum).longValue();
+        } else if (datum instanceof Long) {
+            num = (Long) datum;
+        } else
+            throw new IOException("Cannot convert to long:" + datum.getClass());
+
+        out.writeLong(num);
+    }
+
+    /**
+     * Write boolean. Users can cast an integer into boolean.
+     * 
+     */
+    protected void writeBoolean(Object datum, Encoder out) throws IOException {
+
+        if (datum instanceof Boolean) {
+            out.writeBoolean((Boolean) datum);
+        } else if (datum instanceof Integer) {
+            out.writeBoolean(((Integer) datum) != 0);
+        } else
+            throw new RuntimeException("Unsupported type boolean:"
+                                            + datum.getClass());
+
+    }
+
+    private NullPointerException npe(NullPointerException e, String s) {
+        NullPointerException result = new NullPointerException(e.getMessage()
+                                        + s);
+        result.initCause(e.getCause() == null ? e : e.getCause());
+        return result;
+    }
+
+  /**
+     * Called to write a bytes. 
+     */
+    @Override
+    protected void writeBytes(Object datum, org.apache.avro.io.Encoder out)
+                                    throws IOException {
+        if (datum instanceof DataByteArray) {
+            out.writeBytes(((DataByteArray) datum).get());
+        } else
+            throw new RuntimeException("Unsupported type bytes:"
+                                            + datum.getClass());
+    }
+
+    /**
+     * Called to write a fixed value. Only a pig DataByteArray is accepted,
+     * and its length must equal the size declared by the fixed schema.
+     *
+     * @throws IOException      if the byte array length differs from the
+     *                          schema's fixed size (writing it anyway would
+     *                          corrupt the output stream)
+     * @throws RuntimeException if the datum is not a DataByteArray
+     */
+    @Override
+    protected void writeFixed(Schema schema, Object datum,
+                                    org.apache.avro.io.Encoder out)
+                                    throws IOException {
+        if (!(datum instanceof DataByteArray))
+            throw new RuntimeException("Unsupported type fixed:"
+                                            + datum.getClass());
+
+        final byte[] bytes = ((DataByteArray) datum).get();
+        /* a fixed value is encoded without a length prefix, so exactly
+         * getFixedSize() bytes have to be written */
+        if (bytes.length != schema.getFixedSize())
+            throw new IOException("Fixed size mismatch for "
+                                            + schema.getName() + ": expected "
+                                            + schema.getFixedSize()
+                                            + " bytes but got " + bytes.length);
+
+        out.writeFixed(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Called by the implementation of {@link #writeRecord} to retrieve
+     * a record field value.
+     */
+    @Override
+    protected Object getField(Object record, String name, int pos) {
+        if (record instanceof Tuple) {
+            try {
+                return ((Tuple) record).get(pos);
+            } catch (ExecException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+        } else
+            throw new RuntimeException("Unsupported type in record:"
+                                            + record.getClass());
+    }
+
+    /**
+     * Called by the implementation of {@link #writeArray} to get the
+     * size of an array.
+     */
+    @Override
+    protected long getArraySize(Object array) {
+        if (array instanceof DataBag) {
+            return ((DataBag) array).size();
+        } else
+            throw new RuntimeException("Unsupported type in array:"
+                                            + array.getClass());
+    }
+
+    /**
+     * Called by the implementation of {@link #writeArray} to enumerate
+     * array elements.
+     */
+    @Override
+    protected Iterator<? extends Object> getArrayElements(Object array) {
+        if (array instanceof DataBag) {
+            return ((DataBag) array).iterator();
+        } else
+            throw new RuntimeException("Unsupported type in array:"
+                                            + array.getClass());
+    }
+
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * The InputFormat for avro data.
+ * 
+ */
+public class PigAvroInputFormat
+extends FileInputFormat<NullWritable, Writable> {
+
+    private Schema schema = null;  /* avro schema */
+
+    /**
+     * empty constructor
+     */
+    public PigAvroInputFormat() {
+    }
+
+    /**
+     * constructor called by AvroStorage to pass in schema
+     * @param s                     input data schema
+     */
+    public PigAvroInputFormat(Schema s) {
+        schema = s;
+    }
+
+    /**
+     * Ignore files not ending with ".avro"
+     */
+    @Override
+    protected List<FileStatus> listStatus(JobContext context)
+                                    throws IOException {
+        List<FileStatus> result = new ArrayList<FileStatus>();
+        for (FileStatus file : super.listStatus(context))
+            if (file.getPath().getName().endsWith(PigAvroOutputFormat.EXT))
+                result.add(file);
+        return result;
+    }
+
+    /**
+     * Create and return an avro record reader. 
+     * It uses the input schema passed in to the
+     * constructor.
+     */
+    @Override
+    public RecordReader<NullWritable, Writable>
+    createRecordReader(InputSplit split, TaskAttemptContext context)
+    throws IOException,  InterruptedException {
+        context.setStatus(split.toString());
+        return new PigAvroRecordReader(context, (FileSplit) split, schema);
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * The OutputFormat for avro data.
+ * 
+ */
+public class PigAvroOutputFormat extends FileOutputFormat<NullWritable, Object> {
+
+    /** The file name extension for avro data files. */
+    public final static String EXT = ".avro";
+
+    /** The configuration key for Avro deflate level. */
+    public static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
+
+    /** The default deflate level. */
+    public static final int DEFAULT_DEFLATE_LEVEL = 1;
+
+    /** Avro schema of the output data; null unless given at construction. */
+    private Schema schema = null;
+
+    /**
+     * Creates an output format without a schema.
+     */
+    public PigAvroOutputFormat() {
+    }
+
+    /**
+     * Creates an output format with the specified output schema.
+     * @param s             output schema
+     */
+    public PigAvroOutputFormat(Schema s) {
+        this.schema = s;
+    }
+
+    /**
+     * Enable output compression for the job and record the deflate level
+     * to use in the job configuration.
+     */
+    public static void setDeflateLevel(Job job, int level) {
+        FileOutputFormat.setCompressOutput(job, true);
+        final Configuration jobConf = job.getConfiguration();
+        jobConf.setInt(DEFLATE_LEVEL_KEY, level);
+    }
+
+    /**
+     * Creates the record writer for a task: an avro data file writer on
+     * the task's default work file (with the avro extension), using the
+     * deflate codec when output compression is enabled.
+     *
+     * @throws IOException if no schema was given at construction
+     */
+    @Override
+    public RecordWriter<NullWritable, Object> getRecordWriter(
+                                    TaskAttemptContext context)
+                                    throws IOException, InterruptedException {
+
+        if (schema == null) {
+            throw new IOException("expect a not-null schema.");
+        }
+
+        final Configuration conf = context.getConfiguration();
+
+        final PigAvroDatumWriter datumWriter = new PigAvroDatumWriter(schema);
+        final DataFileWriter<Object> fileWriter =
+                                        new DataFileWriter<Object>(datumWriter);
+
+        if (FileOutputFormat.getCompressOutput(context)) {
+            final int level = conf.getInt(DEFLATE_LEVEL_KEY,
+                                        DEFAULT_DEFLATE_LEVEL);
+            fileWriter.setCodec(CodecFactory.deflateCodec(level));
+        }
+
+        final Path outputFile = getDefaultWorkFile(context, EXT);
+        fileWriter.create(schema, outputFile.getFileSystem(conf).create(outputFile));
+        return new PigAvroRecordWriter(fileWriter);
+    }
+
+}