Posted to commits@pig.apache.org by da...@apache.org on 2011/01/31 22:18:25 UTC

svn commit: r1065790 [2/2] - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/p...

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * This is an implementation of RecordReader which reads in Avro data and
+ * converts it into <NullWritable, Writable> pairs.
+ * 
+ */
+public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
+
+    private ASFsInput in;
+    private DataFileReader<Object> reader;   /* reader of input avro data */
+    private long start;
+    private long end;
+    private Schema schema = null;            /* schema of input avro data */
+
+    /**
+     * Constructor to initialize the input stream and the Avro data reader.
+     */
+    public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
+                                    Schema s) throws IOException {
+        this.in = new ASFsInput(split.getPath(), context);
+        if (s == null) {
+            throw new IOException("Need to provide input avro schema");
+        }
+        schema = s;
+        
+        this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(
+                                        schema));
+        reader.sync(split.getStart()); // sync to start
+        this.start = in.tell();
+        this.end = split.getStart() + split.getLength();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        if (end == start) {
+            return 0.0f;
+        } else {
+            return Math.min(1.0f, (getPos() - start) / (float) (end - start));
+        }
+    }
+
+    public long getPos() throws IOException {
+        return in.tell();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException,
+                                    InterruptedException {
+        return NullWritable.get();
+    }
+
+    @Override
+    public Writable getCurrentValue() throws IOException, InterruptedException {
+        Object obj = reader.next();
+        if (obj instanceof Tuple) {
+            ASLog.details("Class =" + obj.getClass());
+            return (Tuple) obj;
+        } else {
+            ASLog.details("Wrap calss " + obj.getClass() + " as a tuple.");
+            return wrapAsTuple(obj);
+        }
+    }
+
+    /**
+     * Wrap a non-tuple value as a tuple.
+     */
+    protected Tuple wrapAsTuple(Object in) throws IOException {
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+        tuple.append(in);
+        return tuple;
+    }
+
+    @Override
+    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+                                    throws IOException, InterruptedException {
+        /* nothing to do here; initialization is done in the constructor */
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        /* the record itself is consumed later, in getCurrentValue() */
+        return reader.hasNext() && !reader.pastSync(end);
+    }
+
+}
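
For reference, a minimal sketch of the read loop a caller would drive
against this reader. The split, context, and schema here are assumed to be
supplied by an enclosing InputFormat (not shown in this hunk); note that
the record is consumed inside getCurrentValue(), so it should be called
exactly once per nextKeyValue():

    import org.apache.avro.Schema;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class AvroReadLoopSketch {
        // drain one split; split/context/schema come from the caller
        static void readSplit(FileSplit split, TaskAttemptContext context,
                Schema schema) throws java.io.IOException, InterruptedException {
            PigAvroRecordReader reader =
                    new PigAvroRecordReader(context, split, schema);
            reader.initialize(split, context); // no-op, but part of the contract
            while (reader.nextKeyValue()) {
                Writable value = reader.getCurrentValue(); // consumes the record
                System.out.println(value);
            }
            reader.close();
        }
    }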

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * The RecordWriter used to output Pig results as Avro data.
+ */
+public class PigAvroRecordWriter extends RecordWriter<NullWritable, Object> {
+
+    private DataFileWriter<Object> writer;
+
+    /**
+     * Construct with an Avro data file writer.
+     * @param writer                avro data writer
+     */
+    public PigAvroRecordWriter(DataFileWriter<Object> writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+                                    InterruptedException {
+        writer.close();
+    }
+
+    @Override
+    public void write(NullWritable key, Object value) throws IOException,
+                                    InterruptedException {
+        writer.append(value);
+    }
+}
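
A minimal standalone sketch of driving this wrapper, using Avro's stock
GenericDatumWriter in place of the Pig-specific datum writer and a
hypothetical /tmp output path:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.util.Utf8;
    import org.apache.hadoop.io.NullWritable;

    public class AvroWriteSketch {
        public static void main(String[] args)
                throws IOException, InterruptedException {
            Schema schema = Schema.create(Schema.Type.STRING);
            DataFileWriter<Object> dfw = new DataFileWriter<Object>(
                    new GenericDatumWriter<Object>(schema));
            dfw.create(schema, new File("/tmp/sketch.avro")); // hypothetical path
            PigAvroRecordWriter writer = new PigAvroRecordWriter(dfw);
            writer.write(NullWritable.get(), new Utf8("hello"));
            writer.close(null); // the TaskAttemptContext is unused by close()
        }
    }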

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * This class contains functions to convert Pig schemas to Avro. It consists
+ * of two sets of methods:
+ * 
+ * 1. Convert a Pig schema to an Avro schema;
+ * 2. Validate whether a Pig schema is compatible with a given Avro schema.
+ * Note that the Avro schema doesn't need to cover all fields of the Pig
+ * schema; the uncovered fields are converted using the methods in set 1.
+ * 
+ */
+public class PigSchema2Avro {
+
+    public static final String TUPLE_NAME = "TUPLE";
+    public static final String FIELD_NAME = "FIELD";
+    public static int tupleIndex = 0;
+
+    // //////////////////////////////////////////////////////////
+    // Methods in Set 1: Convert Pig schema to Avro schema
+    // //////////////////////////////////////////////////////////
+
+    /**
+     * Convert a Pig ResourceSchema to an Avro schema.
+     * 
+     */
+    public static Schema convert(ResourceSchema pigSchema, boolean nullable)
+                                    throws IOException {
+        ResourceFieldSchema[] pigFields = pigSchema.getFields();
+
+        /* remove the pig tuple wrapper */
+        if (pigFields.length == 1 && ASCommons.isTupleWrapper(pigFields[0])) {
+
+            ASLog.details("Ignore the pig tuple wrapper.");
+            ResourceFieldSchema[] listSchemas = pigFields[0].getSchema()
+                                            .getFields();
+            if (listSchemas.length != 1)
+                throw new IOException("Expect one subfield from " + pigFields[0]);
+
+            return convert(listSchemas[0], nullable);
+        } else
+            return convertRecord(pigFields, nullable);
+    }
+
+    /**
+     * Convert a Pig ResourceFieldSchema to an Avro schema.
+     * 
+     */
+    protected static Schema convert(ResourceFieldSchema pigSchema,
+                                    boolean nullable) throws IOException {
+
+        ASLog.details("Convert pig field schema:" + pigSchema);
+
+        final byte pigType = pigSchema.getType();
+
+        if (pigType == DataType.TUPLE) {
+            ASLog.details("Convert a pig field tuple: " + pigSchema);
+
+            ResourceFieldSchema[] listSchemas = pigSchema.getSchema()
+                                            .getFields();
+            Schema outSchema = null;
+
+            if (ASCommons.isTupleWrapper(pigSchema)) {
+                /* remove Pig tuple wrapper */
+                ASLog.details("Ignore the pig tuple wrapper.");
+                if (listSchemas.length != 1)
+                    throw new IOException("Expect one subfield from "
+                                                    + pigSchema);
+                outSchema = convert(listSchemas[0], nullable);
+            } else {
+                outSchema = convertRecord(listSchemas, nullable);
+            }
+
+            return ASCommons.wrapAsUnion(outSchema, nullable);
+
+        } else if (pigType == DataType.BAG) {
+
+            ASLog.details("Convert a pig field bag:" + pigSchema);
+
+            /* Bag elements have to be Tuples */
+            ResourceFieldSchema[] fs = pigSchema.getSchema().getFields();
+            if (fs == null || fs.length != 1
+                                 || fs[0].getType() != DataType.TUPLE)
+                throw new IOException("Expect one tuple field in a bag");
+
+            Schema outSchema = Schema.createArray(convert(fs[0], nullable));
+            return ASCommons.wrapAsUnion(outSchema, nullable);
+
+        } else if (pigType == DataType.MAP) {
+            /* Pig doesn't provide schema info of Map value */
+            throw new IOException("Please provide schema for Map field!");
+        
+        } else if (pigType == DataType.UNKNOWN) {
+            /* the result of a Pig UNION operation is of UNKNOWN type */
+            throw new IOException("Must specify a schema for UNKNOWN pig type.");
+        
+        } else if (pigType == DataType.CHARARRAY
+                                        || pigType == DataType.BIGCHARARRAY
+                                        || pigType == DataType.BOOLEAN
+                                        || pigType == DataType.BYTE
+                                        || pigType == DataType.BYTEARRAY
+                                        || pigType == DataType.DOUBLE
+                                        || pigType == DataType.FLOAT
+                                        || pigType == DataType.INTEGER
+                                        || pigType == DataType.LONG) {
+
+            ASLog.details("Convert a pig field primitive:" + pigSchema);
+            Schema outSchema = convertPrimitiveType(pigType);
+            return ASCommons.wrapAsUnion(outSchema, nullable);
+
+        } else
+            throw new IOException("unsupported pig type:"
+                                            + DataType.findTypeName(pigType));
+    }
+
+    /**
+     * Convert a set of Pig field schemas to an Avro record schema.
+     * 
+     */
+    protected static Schema convertRecord(ResourceFieldSchema[] pigFields,
+                                    boolean nullable) throws IOException {
+
+        ASLog.funcCall("convertRecord");
+
+        // Type name is required for Avro record
+        String typeName = getRecordName();
+        Schema outSchema = Schema.createRecord(typeName, null, null, false);
+
+        List<Schema.Field> outFields = new ArrayList<Schema.Field>();
+        for (int i = 0; i < pigFields.length; i++) {
+
+            /* get schema */
+            Schema fieldSchema = convert(pigFields[i], nullable);
+
+            /* get field name of output */
+            String outname = pigFields[i].getName();
+            if (outname == null)
+                outname = FIELD_NAME + "_" + i; // field name cannot be null
+
+            /* get doc of output */
+            String desc = pigFields[i].getDescription();
+
+            outFields.add(new Field(outname, fieldSchema, desc, null));
+        }
+
+        outSchema.setFields(outFields);
+        return outSchema;
+
+    }
+
+    private static String getRecordName() {
+        String name = TUPLE_NAME + "_" + tupleIndex;
+        tupleIndex++;
+        return name;
+    }
+
+    /**
+     * Convert Pig primitive type to Avro type
+     * 
+     */
+    protected static Schema convertPrimitiveType(byte pigType)
+                                    throws IOException {
+
+        if (pigType == DataType.BOOLEAN) {
+            return ASCommons.BooleanSchema;
+        } else if (pigType == DataType.BYTEARRAY) {
+            return ASCommons.BytesSchema;
+        } else if (pigType == DataType.CHARARRAY
+                                        || pigType == DataType.BIGCHARARRAY) {
+            return ASCommons.StringSchema;
+        } else if (pigType == DataType.DOUBLE) {
+            return ASCommons.DoubleSchema;
+        } else if (pigType == DataType.FLOAT) {
+            return ASCommons.FloatSchema;
+        } else if (pigType == DataType.INTEGER) {
+            return ASCommons.IntSchema;
+        } else if (pigType == DataType.LONG) {
+            return ASCommons.LongSchema;
+        } else
+            throw new IOException("unsupported pig type:"
+                                            + DataType.findTypeName(pigType));
+
+    }
+
+    
+    // //////////////////////////////////////////////////////////
+    // Methods in Set 2: Validate whether a Pig schema is compatible 
+    //         with a given Avro schema.
+    // //////////////////////////////////////////////////////////
+
+    /**
+     * Validate whether pigSchema is compatible with avroSchema
+     */
+    public static Schema validateAndConvert(Schema avroSchema,
+                                    ResourceSchema pigSchema)
+                                    throws IOException {
+        ResourceFieldSchema[] pigFields = pigSchema.getFields();
+        return validateAndConvertRecord(avroSchema, pigFields);
+    }
+
+    /**
+     * Validate whether pigSchema is compatible with avroSchema, and convert
+     * those Pig fields to their corresponding Avro schemas.
+     */
+    protected static Schema validateAndConvert(Schema avroSchema,
+                                    ResourceFieldSchema pigSchema)
+                                    throws IOException {
+
+        ASLog.details("Validate pig field schema:" + pigSchema);
+
+        /* compatibility check based on data types */
+        if (!isCompatible(avroSchema, pigSchema))
+            throw new IOException("Schemas are not compatible.\n Avro="
+                                            + avroSchema + "\n" + "Pig="
+                                            + pigSchema);
+
+        final byte pigType = pigSchema.getType();
+        if (avroSchema.getType().equals(Schema.Type.UNION)) {
+
+            ASLog.details("Validate Pig schema with Avro union:"
+                                            + avroSchema);
+
+            List<Schema> unionSchemas = avroSchema.getTypes();
+            for (Schema schema : unionSchemas) {
+                try {
+                    validateAndConvert(schema, pigSchema);
+                    /* found a compatible branch; return the whole union */
+                    return avroSchema;
+                } catch (IOException e) {
+                    /* ignore the unmatched branch and try the next one */
+                }
+            }
+            throw new IOException("pig schema " + pigSchema
+                                            + " is not compatible with avro "
+                                            + avroSchema);
+        } else if (pigType == DataType.TUPLE) {
+
+            ASLog.details("Validate a pig tuple: " + pigSchema);
+            ResourceFieldSchema[] pigFields = pigSchema.getSchema().getFields();
+            Schema outSchema = validateAndConvertRecord(avroSchema, pigFields);
+            return outSchema;
+
+        } else if (pigType == DataType.BAG) {
+
+            ASLog.details("Validate a pig bag:" + pigSchema);
+
+            /* get fields of containing tuples */
+            ResourceFieldSchema[] fs = pigSchema.getSchema().getFields();
+            if (fs == null || fs.length != 1
+                                || fs[0].getType() != DataType.TUPLE)
+                throw new IOException("Expect one tuple field in a bag");
+
+            Schema inElemSchema = avroSchema.getElementType();
+            Schema outSchema = Schema.createArray(validateAndConvert(
+                                            inElemSchema, fs[0]));
+            return outSchema;
+        } else if (pigType == DataType.MAP) {
+
+            ASLog.details("Cannot validate a pig map. Will use user defined Avro schema.");
+            return avroSchema;
+
+        } else if (pigType == DataType.UNKNOWN || pigType == DataType.CHARARRAY
+                                        || pigType == DataType.BIGCHARARRAY
+                                        || pigType == DataType.BOOLEAN
+                                        || pigType == DataType.BYTE
+                                        || pigType == DataType.BYTEARRAY
+                                        || pigType == DataType.DOUBLE
+                                        || pigType == DataType.FLOAT
+                                        || pigType == DataType.INTEGER
+                                        || pigType == DataType.LONG) {
+
+            ASLog.details("Validate a pig primitive type:" + pigSchema);
+            return avroSchema;
+
+        } else
+            throw new IOException("Unsupported pig type:"
+                                            + DataType.findTypeName(pigType));
+    }
+
+    /**
+     * Validate that a Pig tuple is compatible with an Avro record. If the Avro
+     * schema is not complete (i.e. it has uncovered fields), then those fields
+     * are converted using the methods in set 1.
+     * 
+     * Notice that users can get rid of Pig tuple wrappers, e.g. an Avro schema
+     * "int" is compatible with a Pig schema "T:(int)"
+     * 
+     */
+    protected static Schema validateAndConvertRecord(Schema avroSchema,
+                                    ResourceFieldSchema[] pigFields)
+                                    throws IOException {
+
+        /* Get rid of Pig tuple wrappers. */
+        if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
+            if (pigFields.length != 1)
+                throw new IOException(
+                                                "Expect only one field in Pig tuple schema. Avro schema is "
+                                                                                + avroSchema.getType());
+
+            return validateAndConvert(avroSchema, pigFields[0]);
+        }
+
+        /* validate and convert a pig tuple with avro record */
+        boolean isPartialSchema = ASCommons.isUDPartialRecordSchema(avroSchema);
+        ASLog.details("isPartialSchema=" + isPartialSchema);
+
+        String typeName = isPartialSchema ? getRecordName() : avroSchema
+                                        .getName();
+        Schema outSchema = Schema.createRecord(typeName, avroSchema.getDoc(),
+                                                avroSchema.getNamespace(), false);
+
+        List<Schema.Field> inFields = avroSchema.getFields();
+        if (!isPartialSchema && inFields.size() != pigFields.length) {
+            throw new IOException("Expect " + inFields.size()
+                                            + " fields in pig schema."
+                                            + " But there are "
+                                            + pigFields.length);
+        }
+
+        List<Schema.Field> outFields = new ArrayList<Schema.Field>();
+
+        for (int i = 0; i < pigFields.length; i++) {
+            /* get user defined avro field schema */
+            Field inputField = isPartialSchema 
+                                        ? ASCommons.getUDField(avroSchema, i) 
+                                        : inFields.get(i);
+
+            /* get schema */
+            Schema fieldSchema = null;
+            if (inputField == null) { 
+                /* convert pig schema (nullable) */
+                fieldSchema = convert(pigFields[i], true);
+            } else if (inputField.schema() == null) { 
+                /* convert pig schema (not-null) */
+                fieldSchema = convert(pigFields[i], false);
+            } else { 
+                /* validate pigFields[i] with given avro schema */
+                fieldSchema = validateAndConvert(inputField.schema(),
+                                                pigFields[i]);
+            }
+
+            /* get field name of output */
+            String outname = (isPartialSchema) ? pigFields[i].getName()
+                                            : inputField.name();
+            if (outname == null)
+                outname = FIELD_NAME + "_" + i; // field name cannot be null
+
+            /* get doc of output */
+            String doc = (isPartialSchema) ? pigFields[i].getDescription()
+                                            : inputField.doc();
+
+            outFields.add(new Field(outname, fieldSchema, doc, null));
+        }
+
+        outSchema.setFields(outFields);
+        return outSchema;
+
+    }
+
+    /**
+     * Check whether Avro type is compatible with Pig type
+     * 
+     */
+    protected static boolean isCompatible(Schema avroSchema,
+                                    ResourceFieldSchema pigSchema)
+                                    throws IOException {
+
+        Schema.Type avroType = avroSchema.getType();
+        byte pigType = pigSchema.getType();
+
+        if (avroType.equals(Schema.Type.UNION)) {
+            return true;
+        } else if (pigType == DataType.TUPLE) {
+            /* Tuple is compatible with any type, since users may want to
+               get rid of the tuple wrapper */
+            return true;
+        }
+        /* each Avro type pairs with its set of compatible Pig types; note
+           the explicit parentheses around the Pig-type alternatives */
+        return (avroType.equals(Schema.Type.ARRAY) && pigType == DataType.BAG)
+                || (avroType.equals(Schema.Type.MAP) && pigType == DataType.MAP)
+                || (avroType.equals(Schema.Type.STRING)
+                                && (pigType == DataType.CHARARRAY
+                                    || pigType == DataType.BIGCHARARRAY))
+                || (avroType.equals(Schema.Type.ENUM)
+                                && pigType == DataType.CHARARRAY)
+                || (avroType.equals(Schema.Type.BOOLEAN)
+                                && (pigType == DataType.BOOLEAN
+                                    || pigType == DataType.INTEGER))
+                || (avroType.equals(Schema.Type.BYTES)
+                                && pigType == DataType.BYTEARRAY)
+                || (avroType.equals(Schema.Type.DOUBLE)
+                                && (pigType == DataType.DOUBLE
+                                    || pigType == DataType.FLOAT
+                                    || pigType == DataType.INTEGER
+                                    || pigType == DataType.LONG))
+                || (avroType.equals(Schema.Type.FLOAT)
+                                && (pigType == DataType.FLOAT
+                                    || pigType == DataType.INTEGER
+                                    || pigType == DataType.LONG))
+                || (avroType.equals(Schema.Type.FIXED)
+                                && pigType == DataType.BYTEARRAY)
+                || (avroType.equals(Schema.Type.INT)
+                                && pigType == DataType.INTEGER)
+                || (avroType.equals(Schema.Type.LONG)
+                                && (pigType == DataType.LONG
+                                    || pigType == DataType.INTEGER));
+
+    }
+
+}
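
To illustrate the set-1 entry point, a small sketch that converts a Pig
schema, built here from its string form (Utils.getSchemaFromString is
assumed to be available in this Pig version), into an Avro schema with
nullable fields:

    import org.apache.avro.Schema;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.impl.util.Utils;

    public class SchemaConvertSketch {
        public static void main(String[] args) throws Exception {
            ResourceSchema pigSchema = new ResourceSchema(
                    Utils.getSchemaFromString("member_id: int, name: chararray"));
            // set 1: plain conversion; 'true' wraps each field as a
            // nullable union, e.g. ["null", "int"]
            Schema avroSchema = PigSchema2Avro.convert(pigSchema, true);
            System.out.println(avroSchema.toString());
        }
    }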

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import junit.framework.Assert;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.piggybank.storage.avro.ASCommons;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAvroStorage {
+    
+    protected static final Log LOG = LogFactory.getLog(TestAvroStorage.class);
+    private static PigServer pigServerLocal = null;
+    private static String basedir;
+    private static String outbasedir;
+    
+    public static final PathFilter hiddenPathFilter = new PathFilter() {
+        public boolean accept(Path p) {
+            String name = p.getName();
+            return !name.startsWith("_") && !name.startsWith(".");
+        }
+    };
+      
+    @BeforeClass
+    public static void setup() throws ExecException {
+        if (pigServerLocal == null)
+            pigServerLocal = new PigServer(ExecType.LOCAL);
+
+        basedir = "src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/";
+        outbasedir = "/tmp/TestAvroStorage/";
+
+        deleteDirectory(new File(outbasedir));
+    }
+
+    @Test
+    public void testRecursiveRecord() throws IOException {
+        final String str1 = 
+        		"{ \"type\" : \"record\", " +
+        		  "\"name\": \"Node\" , " +
+        		  "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+        		                     "{ \"name\": \"next\", \"type\": [\"null\", \"Node\"] } ] }";
+        Schema s = Schema.parse(str1);
+        Assert.assertTrue(ASCommons.containRecursiveRecord(s));
+        
+        final String str2 = "{\"type\": \"array\", \"items\": "  + str1 + "}";
+        s = Schema.parse(str2);
+        Assert.assertTrue(ASCommons.containRecursiveRecord(s));
+        
+        final String str3 ="[\"null\", " + str2 + "]"; 
+        s = Schema.parse(str3);
+        Assert.assertTrue(ASCommons.containRecursiveRecord(s));
+        
+        final String str4 = 
+            "{ \"type\" : \"record\", " +
+              "\"name\": \"Node\" , " +
+              "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+                                 "{ \"name\": \"next\", \"type\": [\"null\", \"string\"] } ] }";
+        s = Schema.parse(str4);
+        Assert.assertFalse(ASCommons.containRecursiveRecord(s));
+    }
+    
+     @Test
+     public void testGenericUnion() throws IOException {
+      
+        final String str1 = "[ \"string\", \"int\", \"boolean\"  ]";
+        Schema s = Schema.parse(str1);
+        Assert.assertTrue(ASCommons.containGenericUnion(s));
+        
+        final String str2 = "[ \"string\", \"int\", \"null\"  ]";
+        s = Schema.parse(str2);
+        Assert.assertTrue(ASCommons.containGenericUnion(s));
+        
+        final String str3 = "[ \"string\", \"null\"  ]";
+        s = Schema.parse(str3);
+        Assert.assertFalse(ASCommons.containGenericUnion(s));
+        Schema realSchema = ASCommons.getAcceptedType(s);
+        Assert.assertEquals(ASCommons.StringSchema, realSchema);
+        
+        final String str4 =  "{\"type\": \"array\", \"items\": "  + str2 + "}";
+        s = Schema.parse(str4);
+        Assert.assertTrue(ASCommons.containGenericUnion(s));
+        try {
+            realSchema = ASCommons.getAcceptedType(s);
+            Assert.fail("Should throw a runtime exception when trying to " +
+                                            "get accepted type from an unacceptable union");
+        } catch (Exception e) {
+            /* expected */
+        }
+        
+        final String str5 = 
+            "{ \"type\" : \"record\", " +
+              "\"name\": \"Node\" , " +
+              "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+                                 "{ \"name\": \"next\", \"type\": [\"null\", \"int\"] } ] }";
+        s = Schema.parse(str5);
+        Assert.assertFalse(ASCommons.containGenericUnion(s));
+        
+        final String str6 = 
+            "{ \"type\" : \"record\", " +
+              "\"name\": \"Node\" , " +
+              "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+                                 "{ \"name\": \"next\", \"type\": [\"string\", \"int\"] } ] }";
+        s = Schema.parse(str6);
+        Assert.assertTrue(ASCommons.containGenericUnion(s));
+        
+        final String str7 = "[ \"string\"  ]"; /*union with one type*/
+        s = Schema.parse(str7);
+        Assert.assertFalse(ASCommons.containGenericUnion(s));
+        realSchema = ASCommons.getAcceptedType(s);
+        Assert.assertEquals(ASCommons.StringSchema, realSchema);
+        
+        final String str8 = "[  ]"; /*union with no type*/
+        s = Schema.parse(str8);
+        Assert.assertFalse(ASCommons.containGenericUnion(s));
+        realSchema = ASCommons.getAcceptedType(s);
+        Assert.assertNull(realSchema);
+    }
+    
+    @Test
+    public void testArrayDefault() throws IOException {
+        String input = basedir + "test_array.avro"; 
+        String output= outbasedir + "testArrayDefault";
+        String expected = basedir + "expected_testArrayDefault.avro";
+        
+        deleteDirectory(new File(output));
+        
+        String [] queries = {
+       " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+       " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" 
+        };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithSchema() throws IOException {
+        String input = basedir + "test_array.avro"; 
+        String output= outbasedir + "testArrayWithSchema";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+       " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+       " STORE in INTO '" + output + 
+           "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+           "   'schema', '{\"type\":\"array\",\"items\":\"float\"}'  );" 
+        };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithNotNull() throws IOException {
+        String input = basedir + "test_array.avro"; 
+        String output= outbasedir + "testArrayWithNotNull";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+       " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+       " STORE in INTO '" + output + 
+           "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+           "   '{\"nullable\": false }'  );" 
+        };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithSame() throws IOException {
+
+        String input = basedir + "test_array.avro"; 
+        String output= outbasedir + "testArrayWithSame";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+       " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+       " STORE in INTO '" + output + 
+           "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+           "   'same', '" + input + "'  );" 
+        };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testRecordWithSplit() throws IOException {
+
+        String input = basedir + "test_record.avro"; 
+        String output1= outbasedir + "testRecordSplit1";
+        String output2= outbasedir + "testRecordSplit2";
+        String expected1 = basedir + "expected_testRecordSplit1.avro";
+        String expected2 = basedir + "expected_testRecordSplit2.avro";
+        deleteDirectory(new File(output1));
+        deleteDirectory(new File(output2));
+        String [] queries = {
+       " avro = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+       " groups = GROUP avro BY member_id;",
+       " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+       " STORE sc INTO '" + output1 + "' " +
+             " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+             "'{\"index\": 1, " +
+             "  \"schema\": {\"type\":\"record\", " +
+                                    " \"name\":\"result\", " +
+                                   "  \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
+                                                         "{\"name\":\"count\", \"type\":\"long\"} " +
+                                                      "]" +
+                                     "}" +
+            " }');", 
+        " STORE sc INTO '" + output2 + 
+                " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
+        };
+        testAvroStorage( queries);
+        verifyResults(output1, expected1);
+        verifyResults(output2, expected2);
+    }
+    
+    @Test
+    public void testRecordWithFieldSchema() throws IOException {
+
+        String input = basedir + "test_record.avro"; 
+        String output= outbasedir + "testRecordWithFieldSchema";
+        String expected = basedir + "expected_testRecordWithFieldSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+       " avro = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+       " avro1 = FILTER avro BY member_id > 1211;",
+       " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
+       " STORE avro2 INTO '" + output + "' " +
+             " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+             "'{\"data\":  \"" + input + "\" ," +
+             "  \"field0\": \"int\", " +
+              " \"field1\":  \"def:browser_id\", " +
+             "  \"field3\": \"def:act_content\" " +
+            " }');"
+        };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    private static void deleteDirectory(File path) {
+        if (path.exists()) {
+            File[] files = path.listFiles();
+            for (File file : files) {
+                if (file.isDirectory())
+                    deleteDirectory(file);
+                file.delete();
+            }
+            path.delete(); /* also remove the (now empty) directory itself */
+        }
+    }
+    
+    private void testAvroStorage(String... queries) throws IOException {
+        PigServer pigServer = pigServerLocal;
+        pigServer.setBatchOn();
+        for (String query : queries) {
+            if (query != null && query.length() > 0)
+                pigServer.registerQuery(query);
+        }
+        pigServer.executeBatch();
+    }
+    
+    private void verifyResults(String outPath, String expectedOutpath)
+                                    throws IOException {
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+
+        /* read in expected results*/
+        Set<Object> expected = getExpected (expectedOutpath);
+        
+        /* read in output results and compare */
+        Path output = new Path(outPath);
+        Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+            && fs.getFileStatus(output).isDir());
+        
+        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+        Assert.assertTrue("Split field dirs not found!", paths != null);
+
+        for (Path path : paths) {
+          //String splitField = path.getName();
+          Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+          Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+              files != null);
+          for (Path filePath : files) {
+            Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+            
+            GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+            
+            DataFileStream<Object> in = new DataFileStream<Object>(
+                                            fs.open(filePath), reader);
+            
+            int count = 0;
+            while (in.hasNext()) {
+                Object obj = in.next();
+                Assert.assertTrue(expected.contains(obj));
+                count++;
+            }
+            in.close();
+            Assert.assertEquals(expected.size(), count);
+          }
+        }
+      }
+    
+    private Set<Object> getExpected(String pathstr) throws IOException {
+
+        Set<Object> ret = new HashSet<Object>();
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+
+        /* read in output results and compare */
+        Path output = new Path(pathstr);
+        Assert.assertTrue("Expected output does not exists!", fs.exists(output));
+    
+        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+        Assert.assertTrue("Split field dirs not found!", paths != null);
+
+        for (Path path : paths) {
+            //String splitField = path.getName();
+            Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+            Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+                                            files != null);
+            for (Path filePath : files) {
+                Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+        
+                GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+        
+                DataFileStream<Object> in = new DataFileStream<Object>(
+                                        fs.open(filePath), reader);
+        
+                while (in.hasNext()) {
+                    Object obj = in.next();
+                    ret.add(obj);
+                }        
+                in.close();
+            }
+        }
+        return ret;
+    }
+
+}
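
Outside JUnit, the same load/store path can be exercised from a local
PigServer; a minimal sketch with hypothetical input and output paths:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class AvroStorageSketch {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();
            // hypothetical paths; the output directory must not exist yet
            pig.registerQuery("in = LOAD '/tmp/in.avro' USING"
                    + " org.apache.pig.piggybank.storage.avro.AvroStorage();");
            pig.registerQuery("STORE in INTO '/tmp/out' USING"
                    + " org.apache.pig.piggybank.storage.avro.AvroStorage();");
            pig.executeBatch();
        }
    }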

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1065790&r1=1065789&r2=1065790&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon Jan 31 21:18:24 2011
@@ -65,6 +65,11 @@
     <dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
       conf="checkstyle->master"/>
     
+    <dependency org="org.apache.avro" name="avro" rev="${avro.version}"
+      conf="compile->master;checkstyle->master"/>
+    <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
+      conf="compile->master;checkstyle->master"/>
+
     <dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
       conf="jdiff->default"/>
     <dependency org="xerces" name="xerces" rev="${xerces.version}"

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1065790&r1=1065789&r2=1065790&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Jan 31 21:18:24 2011
@@ -30,7 +30,7 @@ hadoop-core.version=0.20.2
 hadoop-test.version=0.20.2
 hbase.version=0.20.6
 hsqldb.version=1.8.0.10
-jackson.version=1.0.1
+jackson.version=1.6.0
 javacc.version=4.2
 jdiff.version=1.0.9
 jetty-util.version=6.1.14
@@ -45,3 +45,6 @@ slf4j-api.version=1.4.3
 slf4j-log4j12.version=1.4.3
 xerces.version=1.4.4
 wagon-http.version=1.0-beta-2
+
+avro.version=1.4.0
+json-simple.version=1.1