You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/02/01 02:40:59 UTC

svn commit: r1065886 [2/2] - in /pig/trunk: ./ .eclipse.templates/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ contrib/piggybank/java/src/t...

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Tue Feb  1 01:40:58 2011
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * This is an implementation of record reader which reads in avro data and 
+ * convert them into <NullWritable, Writable> pairs.
+ * 
+ */
+public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
+
+    private AvroStorageInputStream in;
+    private DataFileReader<Object> reader;   /*reader of input avro data*/
+    private long start;
+    private long end;
+
+    /**
+     * constructor to initialize input and avro data reader
+     */
+    public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
+                                    Schema schema) throws IOException {
+        this.in = new AvroStorageInputStream(split.getPath(), context);
+        if(schema == null)
+            throw new IOException("Need to provide input avro schema");
+
+        this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(schema));
+        this.reader.sync(split.getStart()); // sync to start
+        this.start = in.tell();
+        this.end = split.getStart() + split.getLength();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        return end == start ? 0.0f : Math.min(1.0f, (getPos() - start) / (float) (end - start));
+    }
+
+    public long getPos() throws IOException {
+        return in.tell();
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+        return NullWritable.get();
+    }
+
+    @Override
+    public Writable getCurrentValue() throws IOException, InterruptedException {
+        Object obj = reader.next();
+        if (obj instanceof Tuple) {
+            AvroStorageLog.details("Class =" + obj.getClass());
+            return (Tuple) obj;
+        } else {
+            AvroStorageLog.details("Wrap calss " + obj.getClass() + " as a tuple.");
+            return wrapAsTuple(obj);
+        }
+    }
+
+    /**
+     * Wrap non-tuple value as a tuple
+     */
+    protected Tuple wrapAsTuple(Object in) {
+        Tuple tuple = TupleFactory.getInstance().newTuple();
+        tuple.append(in);
+        return tuple;
+    }
+
+    @Override
+    public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
+        // Nothing to do
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (!reader.hasNext() || reader.pastSync(end))
+            return false;
+
+        return true;
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java Tue Feb  1 01:40:58 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * RecordWriter that stores pig results as avro data by handing every value
+ * to an avro DataFileWriter. Keys are ignored.
+ */
+public class PigAvroRecordWriter extends RecordWriter<NullWritable, Object> {
+
+    /* destination of all written values */
+    private final DataFileWriter<Object> writer;
+
+    /**
+     * Create a record writer on top of an avro writer.
+     *
+     * @param writer                avro data writer that receives the values
+     */
+    public PigAvroRecordWriter(DataFileWriter<Object> writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Append one value to the avro output; the key is not used.
+     */
+    @Override
+    public void write(NullWritable key, Object value) throws IOException, InterruptedException {
+        this.writer.append(value);
+    }
+
+    /**
+     * Close the wrapped avro writer.
+     */
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+        this.writer.close();
+    }
+}

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java Tue Feb  1 01:40:58 2011
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * This class contains functions to convert Pig schema to Avro. It consists of
+ * two sets of methods:
+ * 
+ * 1. Convert a Pig schema to Avro schema;
+ * 2. Validate whether a Pig schema is compatible with a given Avro schema.
+ * Notice that the Avro schema doesn't need to cover all fields in Pig schema,
+ * and the missing fields are converted using methods in set 1.
+ * 
+ */
+public class PigSchema2Avro {
+
+    /** Prefix of generated avro record names; getRecordName() appends "_" and tupleIndex. */
+    public static final String TUPLE_NAME = "TUPLE";
+    /** Prefix of generated field names, used as FIELD_NAME + "_" + position when a pig field has no name. */
+    public static final String FIELD_NAME = "FIELD";
+    /**
+     * Counter appended to TUPLE_NAME; incremented by every getRecordName() call.
+     * NOTE(review): plain static int updated without synchronization in this
+     * class -- confirm conversions never run concurrently.
+     */
+    public static int tupleIndex = 0;
+
+    // //////////////////////////////////////////////////////////
+    // Methods in Set 1: Convert Pig schema to Avro schema
+    // //////////////////////////////////////////////////////////
+
+    /**
+     * Convert a pig ResourceSchema to avro schema.
+     *
+     * If the schema consists of a single field that
+     * AvroStorageUtils.isTupleWrapper() reports as a tuple wrapper, the wrapper
+     * is dropped and its only subfield is converted; otherwise all fields are
+     * converted into one avro record.
+     *
+     * @param pigSchema  pig schema to convert
+     * @param nullable   passed on to the field conversions
+     * @return the resulting avro schema
+     * @throws IOException if a tuple wrapper does not contain exactly one
+     *                     subfield, or a field cannot be converted
+     */
+    public static Schema convert(ResourceSchema pigSchema, boolean nullable) throws IOException {
+        ResourceFieldSchema[] pigFields = pigSchema.getFields();
+
+        /* anything but a lone tuple wrapper becomes an avro record */
+        if (pigFields.length != 1 || !AvroStorageUtils.isTupleWrapper(pigFields[0])) {
+            return convertRecord(pigFields, nullable);
+        }
+
+        /* remove the pig tuple wrapper */
+        AvroStorageLog.details("Ignore the pig tuple wrapper.");
+        ResourceFieldSchema[] listSchemas = pigFields[0].getSchema().getFields();
+        if (listSchemas.length != 1) {
+            /* report the wrapper field itself; concatenating the array would
+               only print its type and identity hash */
+            throw new IOException("Expect one subfield from " + pigFields[0]);
+        }
+
+        return convert(listSchemas[0], nullable);
+    }
+
+    /**
+     * Convert a Pig ResourceFieldSchema to avro schema.
+     *
+     * Tuples become avro records (or the schema of their only subfield when
+     * the tuple is a pig tuple wrapper), bags become avro arrays of their
+     * tuple schema, and primitive types are mapped by convertPrimitiveType().
+     * Every result is passed through AvroStorageUtils.wrapAsUnion() together
+     * with the nullable flag.
+     *
+     * @param pigSchema  pig field schema to convert
+     * @param nullable   passed to AvroStorageUtils.wrapAsUnion() and to the
+     *                   conversion of nested fields
+     * @return the resulting avro schema
+     * @throws IOException for map and unknown types (an avro schema has to be
+     *                     given for those), for tuples or bags without a
+     *                     usable inner schema, and for unsupported types
+     */
+    protected static Schema convert(ResourceFieldSchema pigSchema, boolean nullable) throws IOException {
+
+        AvroStorageLog.details("Convert pig field schema:" + pigSchema);
+
+        final byte pigType = pigSchema.getType();
+
+        if (pigType == DataType.TUPLE) {
+            AvroStorageLog.details("Convert a pig field tuple: " + pigSchema);
+
+            /* a tuple declared without inner schema has nothing to convert */
+            ResourceSchema tupleSchema = pigSchema.getSchema();
+            if (tupleSchema == null || tupleSchema.getFields() == null)
+                throw new IOException("Expect subfields in pig tuple " + pigSchema);
+
+            ResourceFieldSchema[] listSchemas = tupleSchema.getFields();
+            Schema outSchema = null;
+
+            if (AvroStorageUtils.isTupleWrapper(pigSchema)) {
+                /* remove Pig tuple wrapper */
+                AvroStorageLog.details("Ignore the pig tuple wrapper.");
+                if (listSchemas.length != 1)
+                    throw new IOException("Expect one subfield from "
+                                                    + pigSchema);
+                outSchema = convert(listSchemas[0], nullable);
+            } else {
+                outSchema = convertRecord(listSchemas, nullable);
+            }
+
+            return AvroStorageUtils.wrapAsUnion(outSchema, nullable);
+
+        } else if (pigType == DataType.BAG) {
+
+            AvroStorageLog.details("Convert a pig field bag:" + pigSchema);
+
+            /* Bag elements have to be Tuples; a bag without inner schema is
+               rejected with the same error instead of a NullPointerException */
+            ResourceSchema bagSchema = pigSchema.getSchema();
+            ResourceFieldSchema[] fs = (bagSchema == null) ? null : bagSchema.getFields();
+            if (fs == null || fs.length != 1
+                                 || fs[0].getType() != DataType.TUPLE)
+                throw new IOException("Expect one tuple field in a bag");
+
+            Schema outSchema = Schema.createArray(convert(fs[0], nullable));
+            return AvroStorageUtils.wrapAsUnion(outSchema, nullable);
+
+        } else if (pigType == DataType.MAP) {
+            /* Pig doesn't provide schema info of Map value */
+            throw new IOException("Please provide schema for Map field!");
+
+        } else if (pigType == DataType.UNKNOWN) {
+            /* Results of Pig UNION operation is of UNKNOWN type */
+            throw new IOException("Must specify a schema for UNKNOWN pig type.");
+
+        } else if (pigType == DataType.CHARARRAY
+                                        || pigType == DataType.BIGCHARARRAY
+                                        || pigType == DataType.BOOLEAN
+                                        || pigType == DataType.BYTE
+                                        || pigType == DataType.BYTEARRAY
+                                        || pigType == DataType.DOUBLE
+                                        || pigType == DataType.FLOAT
+                                        || pigType == DataType.INTEGER
+                                        || pigType == DataType.LONG) {
+
+            AvroStorageLog.details("Convert a pig field primitive:" + pigSchema);
+            Schema outSchema = convertPrimitiveType(pigType);
+            return AvroStorageUtils.wrapAsUnion(outSchema, nullable);
+
+        } else
+            throw new IOException("unsupported pig type:"
+                                            + DataType.findTypeName(pigType));
+    }
+
+    /**
+     * Build an avro record schema from a list of pig fields.
+     *
+     * The record gets a generated name from getRecordName(). Each pig field
+     * is converted with convert(); a field without a name is called
+     * FIELD_NAME + "_" + its position, and the pig description becomes the
+     * avro doc.
+     *
+     * @param pigFields  pig fields of the record, in order
+     * @param nullable   passed on to the field conversions
+     * @return the avro record schema
+     * @throws IOException if a field cannot be converted
+     */
+    protected static Schema convertRecord(ResourceFieldSchema[] pigFields, boolean nullable) throws IOException {
+
+        AvroStorageLog.funcCall("convertRecord");
+
+        /* an avro record cannot be created without a type name */
+        Schema record = Schema.createRecord(getRecordName(), null, null, false);
+
+        List<Schema.Field> avroFields = new ArrayList<Schema.Field>();
+        int position = 0;
+        for (ResourceFieldSchema pigField : pigFields) {
+            Schema avroType = convert(pigField, nullable);
+
+            /* avro field names must not be null, so fall back to a generated one */
+            String pigName = pigField.getName();
+            String avroName = (pigName != null) ? pigName : FIELD_NAME + "_" + position;
+
+            avroFields.add(new Field(avroName, avroType, pigField.getDescription(), null));
+            position++;
+        }
+
+        record.setFields(avroFields);
+        return record;
+    }
+
+    /**
+     * Produce the next generated record name, TUPLE_NAME + "_" + tupleIndex,
+     * and advance tupleIndex by one.
+     */
+    private static String getRecordName() {
+        int index = tupleIndex++;
+        return TUPLE_NAME + "_" + index;
+    }
+
+    /**
+     * Map a Pig primitive type to the matching avro schema constant of
+     * AvroStorageUtils.
+     *
+     * @param pigType  one of the DataType type codes
+     * @return the avro schema for that type
+     * @throws IOException if the type has no mapping here
+     */
+    protected static Schema convertPrimitiveType(byte pigType) throws IOException {
+
+        switch (pigType) {
+        case DataType.BOOLEAN:
+            return AvroStorageUtils.BooleanSchema;
+        case DataType.BYTEARRAY:
+            return AvroStorageUtils.BytesSchema;
+        case DataType.CHARARRAY:
+        case DataType.BIGCHARARRAY:
+            return AvroStorageUtils.StringSchema;
+        case DataType.DOUBLE:
+            return AvroStorageUtils.DoubleSchema;
+        case DataType.FLOAT:
+            return AvroStorageUtils.FloatSchema;
+        case DataType.INTEGER:
+            return AvroStorageUtils.IntSchema;
+        case DataType.LONG:
+            return AvroStorageUtils.LongSchema;
+        default:
+            throw new IOException("unsupported pig type:"
+                                            + DataType.findTypeName(pigType));
+        }
+
+    }
+
+    
+    // //////////////////////////////////////////////////////////
+    // Methods in Set 2: Validate whether a Pig schema is compatible 
+    //         with a given Avro schema.
+    // //////////////////////////////////////////////////////////
+
+    /**
+     * Validate whether pigSchema is compatible with avroSchema
+     */
+    public static Schema validateAndConvert(Schema avroSchema, ResourceSchema pigSchema) throws IOException {
+        return validateAndConvertRecord(avroSchema, pigSchema.getFields());
+    }
+
+    /**
+     * Validate whether a pig field schema is compatible with avroSchema and
+     * convert the pig field to the corresponding avro schema.
+     *
+     * For an avro union the pig field only has to validate against one of
+     * the union members; the union itself is returned. Pig maps cannot be
+     * validated, and primitive types are only checked by isCompatible(); in
+     * both cases the given avro schema is returned unchanged.
+     *
+     * @param avroSchema  avro schema given by the user
+     * @param pigSchema   pig field schema to validate
+     * @return the avro schema to use for the field
+     * @throws IOException if the schemas are not compatible, a tuple or bag
+     *                     has no usable inner schema, or the pig type is
+     *                     not supported
+     */
+    protected static Schema validateAndConvert(Schema avroSchema, ResourceFieldSchema pigSchema) throws IOException {
+
+        AvroStorageLog.details("Validate pig field schema:" + pigSchema);
+
+        /* compatibility check based on data types */
+        if (!isCompatible(avroSchema, pigSchema))
+            throw new IOException("Schemas are not compatible.\n Avro=" + avroSchema + "\n" + "Pig=" + pigSchema);
+
+        final byte pigType = pigSchema.getType();
+        if (avroSchema.getType().equals(Schema.Type.UNION)) {
+            AvroStorageLog.details("Validate Pig schema with Avro union:" + avroSchema);
+
+            List<Schema> unionSchemas = avroSchema.getTypes();
+            for (Schema schema : unionSchemas) {
+                try {
+                    /* only the outcome matters; the union is returned as is */
+                    validateAndConvert(schema, pigSchema);
+                    return avroSchema;
+                } catch (IOException ignored) {
+                    // this union member does not match; try the next one
+                }
+            }
+            throw new IOException("pig schema " + pigSchema  + " is not compatible with avro " + avroSchema);
+        } else if (pigType == DataType.TUPLE) {
+            AvroStorageLog.details("Validate a pig tuple: " + pigSchema);
+
+            /* a tuple declared without inner schema cannot be validated */
+            ResourceSchema tupleSchema = pigSchema.getSchema();
+            if (tupleSchema == null || tupleSchema.getFields() == null)
+                throw new IOException("Expect subfields in pig tuple " + pigSchema);
+
+            return validateAndConvertRecord(avroSchema, tupleSchema.getFields());
+
+        } else if (pigType == DataType.BAG) {
+            AvroStorageLog.details("Validate a pig bag:" + pigSchema);
+
+            /* get fields of containing tuples; a bag without inner schema is
+               rejected with the same error instead of a NullPointerException */
+            ResourceSchema bagSchema = pigSchema.getSchema();
+            ResourceFieldSchema[] fs = (bagSchema == null) ? null : bagSchema.getFields();
+            if (fs == null || fs.length != 1 || fs[0].getType() != DataType.TUPLE)
+                throw new IOException("Expect one tuple field in a bag");
+
+            Schema inElemSchema = avroSchema.getElementType();
+            return Schema.createArray(validateAndConvert(inElemSchema, fs[0]));
+        } else if (pigType == DataType.MAP) {
+            AvroStorageLog.details("Cannot validate a pig map. Will use user defined Avro schema.");
+            return avroSchema;
+
+        } else if (pigType == DataType.UNKNOWN  || pigType == DataType.CHARARRAY
+                                                || pigType == DataType.BIGCHARARRAY
+                                                || pigType == DataType.BOOLEAN
+                                                || pigType == DataType.BYTE
+                                                || pigType == DataType.BYTEARRAY
+                                                || pigType == DataType.DOUBLE
+                                                || pigType == DataType.FLOAT
+                                                || pigType == DataType.INTEGER
+                                                || pigType == DataType.LONG) {
+
+            AvroStorageLog.details("Validate a pig primitive type:" + pigSchema);
+            return avroSchema;
+
+        } else
+            throw new IOException("Unsupported pig type:" + DataType.findTypeName(pigType));
+    }
+
+    /**
+     * Validate a Pig tuple is compatible with Avro record. If the Avro schema 
+     * is not complete (with uncovered fields), then convert those fields using 
+     * methods in set 1. 
+     * 
+     * Notice that users can get rid of Pig tuple wrappers, e.g. an Avro schema
+     * "int" is compatible with a Pig schema "T:(int)"
+     * 
+     */
+    protected static Schema validateAndConvertRecord(Schema avroSchema, ResourceFieldSchema[] pigFields) throws IOException {
+
+        /* Get rid of Pig tuple wrappers. */
+        if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
+            if (pigFields.length != 1)
+                throw new IOException("Expect only one field in Pig tuple schema. Avro schema is " + avroSchema.getType());
+
+            return validateAndConvert(avroSchema, pigFields[0]);
+        }
+
+        /* validate and convert a pig tuple with avro record */
+        boolean isPartialSchema = AvroStorageUtils.isUDPartialRecordSchema(avroSchema);
+        AvroStorageLog.details("isPartialSchema=" + isPartialSchema);
+
+        String typeName = isPartialSchema ? getRecordName() : avroSchema.getName();
+        Schema outSchema = Schema.createRecord(typeName, avroSchema.getDoc(), avroSchema.getNamespace(), false);
+
+        List<Schema.Field> inFields = avroSchema.getFields();
+        if (!isPartialSchema && inFields.size() != pigFields.length) {
+            throw new IOException("Expect " + inFields.size() + " fields in pig schema." + " But there are " + pigFields.length);
+        }
+
+        List<Schema.Field> outFields = new ArrayList<Schema.Field>();
+
+        for (int i = 0; i < pigFields.length; i++) {
+            /* get user defined avro field schema */
+            Field inputField = isPartialSchema ? AvroStorageUtils.getUDField(avroSchema, i) : inFields.get(i);
+
+            /* get schema */
+            Schema fieldSchema = null;
+            if (inputField == null) { 
+                /* convert pig schema (nullable) */
+                fieldSchema = convert(pigFields[i], true);
+            } else if (inputField.schema() == null) { 
+                /* convert pig schema (not-null) */
+                fieldSchema = convert(pigFields[i], false);
+            } else { 
+                /* validate pigFields[i] with given avro schema */
+                fieldSchema = validateAndConvert(inputField.schema(),
+                                                pigFields[i]);
+            }
+
+            /* get field name of output */
+            String outname = (isPartialSchema) ? pigFields[i].getName() : inputField.name();
+            if (outname == null)
+                outname = FIELD_NAME + "_" + i; // field name cannot be null
+
+            /* get doc of output */
+            String doc = (isPartialSchema) ? pigFields[i].getDescription() : inputField.doc();
+
+            outFields.add(new Field(outname, fieldSchema, doc, null));
+        }
+
+        outSchema.setFields(outFields);
+        return outSchema;
+
+    }
+
+    /**
+     * Check whether Avro type is compatible with Pig type.
+     *
+     * An avro union and a pig tuple are always accepted here. Otherwise the
+     * accepted pairs are:
+     *
+     *   ARRAY   - BAG
+     *   MAP     - MAP
+     *   STRING  - CHARARRAY, BIGCHARARRAY
+     *   ENUM    - CHARARRAY
+     *   BOOLEAN - BOOLEAN, INTEGER
+     *   BYTES   - BYTEARRAY
+     *   FIXED   - BYTEARRAY
+     *   INT     - INTEGER
+     *   LONG    - LONG, INTEGER
+     *   FLOAT   - FLOAT, INTEGER, LONG
+     *   DOUBLE  - DOUBLE, FLOAT, INTEGER, LONG
+     *
+     * @param avroSchema  avro schema to check against
+     * @param pigSchema   pig field schema to check
+     * @return true if the pair of types is accepted
+     */
+    protected static boolean isCompatible(Schema avroSchema, ResourceFieldSchema pigSchema) {
+
+        Schema.Type avroType = avroSchema.getType();
+        byte pigType = pigSchema.getType();
+
+        if (avroType.equals(Schema.Type.UNION)) {
+            return true;
+        } else if (pigType == DataType.TUPLE) {
+            /* Tuple is compatible with any type; for users may want to
+               get rid of the tuple wrapper */
+            return true;
+        }
+
+        /* One case per avro type keeps the pig alternatives tied to their
+           avro type. In a flat "a && b || c" chain the "|| c" part matches
+           no matter what the avro type is. */
+        switch (avroType) {
+        case ARRAY:
+            return pigType == DataType.BAG;
+        case MAP:
+            return pigType == DataType.MAP;
+        case STRING:
+            return pigType == DataType.CHARARRAY
+                            || pigType == DataType.BIGCHARARRAY;
+        case ENUM:
+            return pigType == DataType.CHARARRAY;
+        case BOOLEAN:
+            return pigType == DataType.BOOLEAN
+                            || pigType == DataType.INTEGER;
+        case BYTES:
+        case FIXED:
+            return pigType == DataType.BYTEARRAY;
+        case DOUBLE:
+            return pigType == DataType.DOUBLE
+                            || pigType == DataType.FLOAT
+                            || pigType == DataType.INTEGER
+                            || pigType == DataType.LONG;
+        case FLOAT:
+            return pigType == DataType.FLOAT
+                            || pigType == DataType.INTEGER
+                            || pigType == DataType.LONG;
+        case INT:
+            return pigType == DataType.INTEGER;
+        case LONG:
+            return pigType == DataType.LONG
+                            || pigType == DataType.INTEGER;
+        default:
+            /* RECORD and NULL have no non-tuple pig counterpart */
+            return false;
+        }
+
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Tue Feb  1 01:40:58 2011
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage.avro;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestAvroStorage {
+
+    protected static final Log LOG = LogFactory.getLog(TestAvroStorage.class);
+
+    private static PigServer pigServerLocal = null;
+
+    final private static String basedir = "src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/";
+
+    final private static String outbasedir = "/tmp/TestAvroStorage/";
+    
+    public static final PathFilter hiddenPathFilter = new PathFilter() {
+        public boolean accept(Path p) {
+          String name = p.getName();
+          return !name.startsWith("_") && !name.startsWith(".");
+        }
+      };
+
+    private static String getInputFile(String file) {
+        return "file:///" + System.getProperty("user.dir") + "/" + basedir + file;
+    }
+
+    final private String testArrayFile = getInputFile("test_array.avro");
+    final private String testRecordFile = getInputFile("test_record.avro");
+
+    @BeforeClass
+    public static void setup() throws ExecException {
+        pigServerLocal = new PigServer(ExecType.LOCAL);
+        deleteDirectory(new File(outbasedir));
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if(pigServerLocal != null) pigServerLocal.shutdown();
+    }
+
+    @Test
+    public void testArrayDefault() throws IOException {
+        String output= outbasedir + "testArrayDefault";
+        String expected = basedir + "expected_testArrayDefault.avro";
+        
+        deleteDirectory(new File(output));
+        
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+
+
+    @Test
+    public void testArrayWithSchema() throws IOException {
+        String output= outbasedir + "testArrayWithSchema";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output +
+               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+               "   'schema', '{\"type\":\"array\",\"items\":\"float\"}'  );"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithNotNull() throws IOException {
+        String output= outbasedir + "testArrayWithNotNull";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output +
+               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+               "   '{\"nullable\": false }'  );"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithSame() throws IOException {
+        String output= outbasedir + "testArrayWithSame";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output +
+               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+               "   'same', '" + testArrayFile + "'  );"
+            };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecordWithSplit() throws IOException {
+        String output1= outbasedir + "testRecordSplit1";
+        String output2= outbasedir + "testRecordSplit2";
+        String expected1 = basedir + "expected_testRecordSplit1.avro";
+        String expected2 = basedir + "expected_testRecordSplit2.avro";
+        deleteDirectory(new File(output1));
+        deleteDirectory(new File(output2));
+        String [] queries = {
+           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " groups = GROUP avro BY member_id;",
+           " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+           " STORE sc INTO '" + output1 + "' " +
+                 " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+                 "'{\"index\": 1, " +
+                 "  \"schema\": {\"type\":\"record\", " +
+                                        " \"name\":\"result\", " +
+                                       "  \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
+                                                             "{\"name\":\"count\", \"type\":\"long\"} " +
+                                                          "]" +
+                                         "}" +
+                " }');",
+            " STORE sc INTO '" + output2 +
+                    " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
+            };
+        testAvroStorage( queries);
+        verifyResults(output1, expected1);
+        verifyResults(output2, expected2);
+    }
+    
+    @Test
+    public void testRecordWithFieldSchema() throws IOException {
+        String output= outbasedir + "testRecordWithFieldSchema";
+        String expected = basedir + "expected_testRecordWithFieldSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " avro1 = FILTER avro BY member_id > 1211;",
+           " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
+           " STORE avro2 INTO '" + output + "' " +
+                 " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+                 "'{\"data\":  \"" + testRecordFile + "\" ," +
+                 "  \"field0\": \"int\", " +
+                  " \"field1\":  \"def:browser_id\", " +
+                 "  \"field3\": \"def:act_content\" " +
+                " }');"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    private static void deleteDirectory (File path) {
+        if ( path.exists()) {
+            File [] files = path.listFiles();
+            for (File file: files) {
+                if (file.isDirectory()) 
+                    deleteDirectory(file);
+                file.delete();
+            }
+        }
+    }
+    
+    private void testAvroStorage(String ...queries) throws IOException {
+        pigServerLocal.setBatchOn();
+
+        for (String query: queries){
+            if (query != null && query.length() > 0)
+                pigServerLocal.registerQuery(query);
+        }
+        pigServerLocal.executeBatch();
+    }
+    
+    private void verifyResults(String outPath, String expectedOutpath) throws IOException {
+        FileSystem fs = FileSystem.getLocal(new Configuration()) ; 
+        
+        /* read in expected results*/
+        Set<Object> expected = getExpected (expectedOutpath);
+        
+        /* read in output results and compare */
+        Path output = new Path(outPath);
+        assertTrue("Output dir does not exists!", fs.exists(output)
+                && fs.getFileStatus(output).isDir());
+        
+        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+        assertTrue("Split field dirs not found!", paths != null);
+
+        for (Path path : paths) {
+          Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+          assertTrue("No files found for path: " + path.toUri().getPath(),
+                  files != null);
+          for (Path filePath : files) {
+            assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+            
+            GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+            
+            DataFileStream<Object> in = new DataFileStream<Object>(
+                                            fs.open(filePath), reader);
+            int count = 0;
+            while (in.hasNext()) {
+                Object obj = in.next();
+              //System.out.println("obj = " + (GenericData.Array<Float>)obj);
+              assertTrue(expected.contains(obj));
+              count++;
+            }        
+            in.close();
+            assertEquals(count, expected.size());
+          }
+        }
+      }
+    
+    private Set<Object> getExpected (String pathstr ) throws IOException {
+        
+        Set<Object> ret = new HashSet<Object>();
+        FileSystem fs = FileSystem.getLocal(new Configuration())  ; 
+                                    
+        /* read in output results and compare */
+        Path output = new Path(pathstr);
+        assertTrue("Expected output does not exists!", fs.exists(output));
+    
+        Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+        assertTrue("Split field dirs not found!", paths != null);
+
+        for (Path path : paths) {
+            Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+            assertTrue("No files found for path: " + path.toUri().getPath(), files != null);
+            for (Path filePath : files) {
+                assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+        
+                GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+        
+                DataFileStream<Object> in = new DataFileStream<Object>(fs.open(filePath), reader);
+        
+                while (in.hasNext()) {
+                    Object obj = in.next();
+                    ret.add(obj);
+                }        
+                in.close();
+            }
+        }
+        return ret;
+  }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Tue Feb  1 01:40:58 2011
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage.avro;
+
+import org.apache.avro.Schema;
+import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for the schema-inspection helpers in {@link AvroStorageUtils}:
+ * detection of recursive records and of "generic" unions, and resolution of
+ * the accepted type of a union.
+ */
+public class TestAvroStorageUtils {
+
+    // Common elements of test records
+    private static final String TYPE_RECORD = "{ \"type\" : \"record\", ";
+    private static final String NAME_NODE   =   "\"name\": \"Node\" , ";
+    private static final String FIELDS_VALUE = " \"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, ";
+
+    /** Opening of a record named "Node" with an int field "value"; callers append the last field and close it. */
+    public static final String RECORD_BEGINNING = TYPE_RECORD + NAME_NODE + FIELDS_VALUE;
+
+    /**
+     * A record that refers to itself must be reported as recursive, whether it
+     * is the top-level schema, the item type of an array, or nested in a union.
+     */
+    @Test
+    public void canIdentifyRecursiveRecords() throws IOException {
+        final String str1 = RECORD_BEGINNING +
+                                 "{ \"name\": \"next\", \"type\": [\"null\", \"Node\"] } ] }";
+        Schema s = Schema.parse(str1);
+        assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
+
+        final String str2 = "{\"type\": \"array\", \"items\": "  + str1 + "}";
+        s = Schema.parse(str2);
+        assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
+
+        final String str3 ="[\"null\", " + str2 + "]";
+        s = Schema.parse(str3);
+        assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
+    }
+
+    /** A record whose fields never refer back to it must not be reported as recursive. */
+    @Test
+    public void canIdentifyNonRecursiveRecords() throws IOException {
+        final String non = RECORD_BEGINNING + "{ \"name\": \"next\", \"type\": [\"null\", \"string\"] } ] }";
+        assertFalse(AvroStorageUtils.containsRecursiveRecord(Schema.parse(non)));
+    }
+
+    /**
+     * Exercises containsGenericUnion and getAcceptedType on unions of various
+     * shapes, both at the top level and nested in arrays and records.
+     */
+     @Test
+     public void testGenericUnion() throws IOException {
+
+        /* union of several non-null types */
+        final String str1 = "[ \"string\", \"int\", \"boolean\"  ]";
+        Schema s = Schema.parse(str1);
+        assertTrue(AvroStorageUtils.containsGenericUnion(s));
+
+        /* two non-null types plus null */
+        final String str2 = "[ \"string\", \"int\", \"null\"  ]";
+        s = Schema.parse(str2);
+        assertTrue(AvroStorageUtils.containsGenericUnion(s));
+
+        /* one type plus null: not generic, accepted type is the non-null one */
+        final String str3 = "[ \"string\", \"null\"  ]";
+        s = Schema.parse(str3);
+        assertFalse(AvroStorageUtils.containsGenericUnion(s));
+        Schema realSchema = AvroStorageUtils.getAcceptedType(s);
+        assertEquals(AvroStorageUtils.StringSchema, realSchema);
+
+        /* array whose items are a generic union */
+        final String str4 =  "{\"type\": \"array\", \"items\": "  + str2 + "}";
+        s = Schema.parse(str4);
+        assertTrue(AvroStorageUtils.containsGenericUnion(s));
+        try {
+            realSchema = AvroStorageUtils.getAcceptedType(s);
+            // fail() raises an AssertionError, which the catch clause below
+            // does not intercept, so a missing exception fails the test.
+            fail("Should throw a runtime exception when trying to get accepted type from an unacceptable union");
+        } catch (Exception e) {
+            String message = e.getMessage();
+            assertTrue("Unexpected exception: " + e,
+                    message != null
+                        && message.contains("Cannot call this function on a unacceptable union"));
+        }
+
+        /* record with a nullable int field */
+        final String str5 = RECORD_BEGINNING +
+                                 "{ \"name\": \"next\", \"type\": [\"null\", \"int\"] } ] }";
+        s = Schema.parse(str5);
+        assertFalse(AvroStorageUtils.containsGenericUnion(s));
+
+        /* record with a field that is a union of two non-null types */
+        final String str6 = RECORD_BEGINNING +
+                                 "{ \"name\": \"next\", \"type\": [\"string\", \"int\"] } ] }";
+        s = Schema.parse(str6);
+        assertTrue(AvroStorageUtils.containsGenericUnion(s));
+
+        final String str7 = "[ \"string\"  ]"; /*union with one type*/
+        s = Schema.parse(str7);
+        assertFalse(AvroStorageUtils.containsGenericUnion(s));
+        realSchema = AvroStorageUtils.getAcceptedType(s);
+        assertEquals(AvroStorageUtils.StringSchema, realSchema);
+
+        final String str8 = "[  ]"; /*union with no type*/
+        s = Schema.parse(str8);
+        assertFalse(AvroStorageUtils.containsGenericUnion(s));
+        realSchema = AvroStorageUtils.getAcceptedType(s);
+        assertNull(realSchema);
+    }
+
+}

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayDefault.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayDefault.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayDefault.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayWithSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayWithSchema.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayWithSchema.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit1.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit1.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit2.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit2.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordWithFieldSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordWithFieldSchema.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordWithFieldSchema.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_array.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_array.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_array.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_record.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_record.avro?rev=1065886&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_record.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1065886&r1=1065885&r2=1065886&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Tue Feb  1 01:40:58 2011
@@ -65,6 +65,11 @@
     <dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
       conf="checkstyle->master"/>
     
+    <dependency org="org.apache.avro" name="avro" rev="${avro.version}"
+      conf="compile->master;checkstyle->master"/>
+    <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
+      conf="compile->master;checkstyle->master"/>
+
     <dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
       conf="jdiff->default"/>
     <dependency org="xerces" name="xerces" rev="${xerces.version}"

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1065886&r1=1065885&r2=1065886&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Tue Feb  1 01:40:58 2011
@@ -18,6 +18,7 @@ apacheant.version=1.7.1
 antlr.version=2.7.6
 pig.version=0.8.0
 
+avro.version=1.4.1
 commons-beanutils.version=1.7.0
 commons-cli.version=1.0
 commons-el.version=1.0
@@ -30,13 +31,14 @@ hadoop-core.version=0.20.2
 hadoop-test.version=0.20.2
 hbase.version=0.20.6
 hsqldb.version=1.8.0.10
-jackson.version=1.0.1
+jackson.version=1.6.0
 javacc.version=4.2
 jdiff.version=1.0.9
 jetty-util.version=6.1.14
 jline.version=0.9.94
 joda-time.version=1.6
 jsch.version=0.1.38
+json-simple.version=1.1
 junit.version=4.5
 jython.version=2.5.0
 log4j.version=1.2.14
@@ -45,3 +47,4 @@ slf4j-api.version=1.4.3
 slf4j-log4j12.version=1.4.3
 xerces.version=1.4.4
 wagon-http.version=1.0-beta-2
+