Posted to commits@pig.apache.org by sm...@apache.org on 2012/08/20 09:14:55 UTC

svn commit: r1374925 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/pi...

Author: sms
Date: Mon Aug 20 07:14:54 2012
New Revision: 1374925

URL: http://svn.apache.org/viewvc?rev=1374925&view=rev
Log:
PIG-2875: Add recursive record support to AvroStorage (cheolsoo via sms)

Added:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
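
Before the per-file diffs, a short orientation: after this patch AvroStorage can load Avro files whose schemas reference themselves (the recursion point surfaces as a bytearray in the inferred Pig schema), and such data can be stored back as long as the output Avro schema is supplied explicitly. A minimal usage sketch in the style of the new tests below; the schema string and the 'no_schema_check'/'schema' options come from the patch, while the class name, input file, and output path are assumptions:

    import java.io.IOException;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class RecursiveAvroUsageSketch {
        public static void main(String[] args) throws IOException {
            // Schema string and AvroStorage options mirror the new tests;
            // the input file and output path are illustrative only.
            String recursiveRecordInUnion =
                " { \"type\" : \"record\", \"name\" : \"recursive_record\"," +
                "   \"fields\" : [ { \"name\" : \"value\", \"type\" : \"int\" }," +
                "                  { \"name\" : \"next\", \"type\" : [ \"null\", \"recursive_record\" ] } ] }";
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.setBatchOn();
            pig.registerQuery(
                " in = LOAD 'test_recursive_record_in_union.avro'" +
                " USING org.apache.pig.piggybank.storage.avro.AvroStorage ();");
            pig.registerQuery(
                " STORE in INTO '/tmp/TestAvroStorage/recursiveUsageSketch'" +
                " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
                " 'no_schema_check', 'schema', '" + recursiveRecordInUnion + "' );");
            pig.executeBatch();
        }
    }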

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Aug 20 07:14:54 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2875: Add recursive record support to AvroStorage (cheolsoo via sms)
+
 PIG-2662: skew join does not honor its config parameters (rajesh.balamohan via thejas)
 
 PIG-2871: Refactor signature for PigReducerEstimator (billgraham)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,7 +18,10 @@
 package org.apache.pig.piggybank.storage.avro;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -47,13 +50,11 @@ public class AvroSchema2Pig {
      */
     public static ResourceSchema convert(Schema schema) throws IOException {
 
-        if (AvroStorageUtils.containsRecursiveRecord(schema))
-            throw new IOException ("We don't accept schema containing recursive records.");
-        
         if (AvroStorageUtils.containsGenericUnion(schema))
             throw new IOException ("We don't accept schema containing generic unions.");
 
-        ResourceFieldSchema inSchema = inconvert(schema, FIELD);
+        Set<Schema> visitedRecords = new HashSet<Schema>();
+        ResourceFieldSchema inSchema = inconvert(schema, FIELD, visitedRecords);
 
         ResourceSchema tupleSchema;
         if (inSchema.getType() == DataType.TUPLE) {
@@ -73,7 +74,8 @@ public class AvroSchema2Pig {
     /**
      * Convert a schema with field name to a pig schema
      */
-     private static ResourceFieldSchema inconvert(Schema in, String fieldName) throws IOException {
+     private static ResourceFieldSchema inconvert(Schema in, String fieldName, Set<Schema> visitedRecords)
+             throws IOException {
 
         AvroStorageLog.details("InConvert avro schema with field name " + fieldName);
 
@@ -85,24 +87,29 @@ public class AvroSchema2Pig {
 
             AvroStorageLog.details("convert to a pig tuple");
 
-            fieldSchema.setType(DataType.TUPLE);
-            ResourceSchema tupleSchema = new ResourceSchema();
-            List<Schema.Field> fields = in.getFields();
-            ResourceFieldSchema[] childFields = new ResourceFieldSchema[fields.size()];
-            int index = 0;
-            for (Schema.Field field : fields) {
-                childFields[index++] = inconvert(field.schema(), field.name());
-            }
+            if (visitedRecords.contains(in)) {
+                fieldSchema.setType(DataType.BYTEARRAY);
+            } else {
+                visitedRecords.add(in);
+                fieldSchema.setType(DataType.TUPLE);
+                ResourceSchema tupleSchema = new ResourceSchema();
+                List<Schema.Field> fields = in.getFields();
+                ResourceFieldSchema[] childFields = new ResourceFieldSchema[fields.size()];
+                int index = 0;
+                for (Schema.Field field : fields) {
+                    childFields[index++] = inconvert(field.schema(), field.name(), visitedRecords);
+                }
 
-            tupleSchema.setFields(childFields);
-            fieldSchema.setSchema(tupleSchema);
+                tupleSchema.setFields(childFields);
+                fieldSchema.setSchema(tupleSchema);
+            }
 
         } else if (avroType.equals(Schema.Type.ARRAY)) {
 
             AvroStorageLog.details("convert array to a pig bag");
             fieldSchema.setType(DataType.BAG);
             Schema elemSchema = in.getElementType();
-            ResourceFieldSchema subFieldSchema = inconvert(elemSchema, ARRAY_FIELD);
+            ResourceFieldSchema subFieldSchema = inconvert(elemSchema, ARRAY_FIELD, visitedRecords);
             add2BagSchema(fieldSchema, subFieldSchema);
 
         } else if (avroType.equals(Schema.Type.MAP)) {
@@ -114,7 +121,7 @@ public class AvroSchema2Pig {
 
             if (AvroStorageUtils.isAcceptableUnion(in)) {
                 Schema acceptSchema = AvroStorageUtils.getAcceptedType(in);
-                ResourceFieldSchema realFieldSchema = inconvert(acceptSchema, null);
+                ResourceFieldSchema realFieldSchema = inconvert(acceptSchema, null, visitedRecords);
                 fieldSchema.setType(realFieldSchema.getType());
                 fieldSchema.setSchema(realFieldSchema.getSchema());
             } else
@@ -138,7 +145,7 @@ public class AvroSchema2Pig {
             fieldSchema.setType(DataType.LONG);
         } else if (avroType.equals(Schema.Type.STRING)) {
             fieldSchema.setType(DataType.CHARARRAY);
-        } else if (avroType.equals(Schema.Type.NULL)) { 
+        } else if (avroType.equals(Schema.Type.NULL)) {
             // value of NULL is always NULL
             fieldSchema.setType(DataType.INTEGER);
         } else {
@@ -154,7 +161,7 @@ public class AvroSchema2Pig {
                                     ResourceFieldSchema subFieldSchema)
                                     throws IOException {
 
-        ResourceFieldSchema wrapped = (subFieldSchema.getType() == DataType.TUPLE) 
+        ResourceFieldSchema wrapped = (subFieldSchema.getType() == DataType.TUPLE)
                                                               ? subFieldSchema
                                                               : AvroStorageUtils.wrapAsTuple(subFieldSchema);
 

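The core of the AvroSchema2Pig change is the visitedRecords set: when a record schema is encountered a second time during conversion, the field is mapped to a bytearray instead of recursing indefinitely. A standalone sketch of that guard follows (illustrative class and method names, not the Piggybank code itself; Schema.Parser assumes Avro 1.5 or later):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.avro.Schema;

    public class RecursionGuardSketch {

        /** Walk an Avro schema and report where conversion would stop recursing. */
        static void walk(Schema s, String field, Set<Schema> visited) {
            switch (s.getType()) {
            case RECORD:
                if (visited.contains(s)) {
                    // Recursion point: the patch maps this field to DataType.BYTEARRAY.
                    System.out.println(field + " -> bytearray (recursive record)");
                    return;
                }
                visited.add(s);
                System.out.println(field + " -> tuple");
                for (Schema.Field f : s.getFields()) {
                    walk(f.schema(), f.name(), visited);
                }
                return;
            case ARRAY:
                walk(s.getElementType(), field, visited);
                return;
            case MAP:
                walk(s.getValueType(), field, visited);
                return;
            case UNION:
                for (Schema branch : s.getTypes()) {
                    walk(branch, field, visited);
                }
                return;
            default:
                System.out.println(field + " -> " + s.getType());
            }
        }

        public static void main(String[] args) {
            String json =
                "{\"type\":\"record\",\"name\":\"recursive_record\",\"fields\":[" +
                "{\"name\":\"value\",\"type\":\"int\"}," +
                "{\"name\":\"next\",\"type\":[\"null\",\"recursive_record\"]}]}";
            walk(new Schema.Parser().parse(json), "TUPLE", new HashSet<Schema>());
        }
    }
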
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,6 +17,7 @@
 
 package org.apache.pig.piggybank.storage.avro;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,7 +28,7 @@ import org.apache.avro.Schema.Type;
 /**
  * This class creates two maps out of a given Avro schema. And it supports
  * looking up avro schemas using either type name or field name.
- * 
+ *
  * 1. map[type name] = > avro schema
  * 2. map[field name] => avro schema
  *
@@ -42,11 +43,18 @@ public class AvroSchemaManager {
     /**
      * Construct with a given schema
      */
-    public AvroSchemaManager(Schema schema) {
+    public AvroSchemaManager(Schema schema) throws IOException {
 
         name2Schema = new HashMap<String, Schema>();
         typeName2Schema = new HashMap<String, Schema>();
 
+        if (AvroStorageUtils.containsRecursiveRecord(schema)) {
+            throw new IOException ("Schema containing recursive records cannot be referred to"
+                + " by 'data' and 'schema_file'. Please instead use 'same' with a path that"
+                + " points to an avro file encoded by the same schema as what you want to use,"
+                + " or use 'schema' with a json string representation." );
+        }
+
         init(null, schema, false);
     }
 
@@ -58,7 +66,7 @@ public class AvroSchemaManager {
     /**
      * Initialize given a schema
      */
-    protected void init(String namespace, Schema schema, 
+    protected void init(String namespace, Schema schema,
                                     boolean ignoreNameMap) {
 
         /* put to map[type name]=>schema */

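Because of the new check above, AvroSchemaManager refuses recursive schemas up front, which is why the 'data' and 'schema_file' parameters cannot be used with them. A minimal sketch of what a caller sees; the constructor signature and error path are taken from the diff, while the class name and the parsing step are assumptions:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.pig.piggybank.storage.avro.AvroSchemaManager;

    public class SchemaManagerSketch {
        public static void main(String[] args) {
            String json =
                "{\"type\":\"record\",\"name\":\"recursive_record\",\"fields\":[" +
                "{\"name\":\"value\",\"type\":\"int\"}," +
                "{\"name\":\"next\",\"type\":[\"null\",\"recursive_record\"]}]}";
            try {
                new AvroSchemaManager(new Schema.Parser().parse(json));
            } catch (IOException e) {
                // Expected: the message directs users to 'same' or 'schema' instead.
                System.err.println(e.getMessage());
            }
        }
    }
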
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -67,22 +67,22 @@ import org.json.simple.parser.ParseExcep
 public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
 
     /* storeFunc parameters */
-    private static final String NOTNULL = "NOTNULL"; 
+    private static final String NOTNULL = "NOTNULL";
     private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
     private static final String SCHEMA_DELIM = "#";
     private static final String SCHEMA_KEYVALUE_DELIM = "@";
-    
+
     /* FIXME: we use this variable to distinguish schemas specified
-     * by different AvroStorage calls and will remove this once 
-     * StoreFunc provides access to Pig output schema in backend. 
+     * by different AvroStorage calls and will remove this once
+     * StoreFunc provides access to Pig output schema in backend.
      * Default value is 0.
      */
     private int storeFuncIndex = 0;
     private PigAvroRecordWriter writer = null;    /* avro record writer */
-    private Schema outputAvroSchema = null;  /* output avro schema */ 
+    private Schema outputAvroSchema = null;  /* output avro schema */
     /* indicate whether data is nullable */
-    private boolean nullable = true;                    
-    
+    private boolean nullable = true;
+
     /* loadFunc parameters */
     private PigAvroRecordReader reader = null;   /* avro record writer */
     private Schema inputAvroSchema = null;    /* input avro schema */
@@ -110,24 +110,25 @@ public class AvroStorage extends FileInp
 
         outputAvroSchema = null;
         nullable = true;
-
         checkSchema = true;
-        if (parts.length == 1 && parts[0].equalsIgnoreCase("no_schema_check")) {
-            checkSchema = false;
-        } else {
-            /*parse input parameters */
-            Map<String, Object> map = parts.length > 1 ? parseStringList(parts) : parseJsonString(parts[0]);
 
-            init(map);  /* initialize */
+        if (parts.length == 1 && !parts[0].equalsIgnoreCase("no_schema_check")) {
+            /* If one parameter is given, and that is not 'no_schema_check',
+             * then it must be a json string.
+             */
+            init(parseJsonString(parts[0]));
+        } else {
+            /* parse parameters */
+            init(parseStringList(parts));
         }
     }
 
-    
+
     /**
-     * Set input location and obtain input schema. 
-     * 
-     * FIXME: currently we assume all avro files under the same "location" 
-     * share the same schema and will throw exception if not. 
+     * Set input location and obtain input schema.
+     *
+     * FIXME: currently we assume all avro files under the same "location"
+     * share the same schema and will throw exception if not.
      */
     @Override
     public void setLocation(String location, Job job) throws IOException {
@@ -155,12 +156,12 @@ public class AvroStorage extends FileInp
     /**
      * Get avro schema of input path. There are three cases:
      * 1. if path is a file, then return its avro schema;
-     * 2. if path is a first-level directory (no sub-directories), then 
-     * return the avro schema of one underlying file; 
-     * 3. if path contains sub-directories, then recursively check 
-     * whether all of them share the same schema and return it 
+     * 2. if path is a first-level directory (no sub-directories), then
+     * return the avro schema of one underlying file;
+     * 3. if path contains sub-directories, then recursively check
+     * whether all of them share the same schema and return it
      * if so or throw an exception if not.
-     * 
+     *
      * @param path input path
      * @param fs file system
      * @return avro schema of data
@@ -175,13 +176,13 @@ public class AvroStorage extends FileInp
         if (!fs.isDirectory(path)) {
             return getSchema(path, fs);
         }
-        
+
         FileStatus[] ss = fs.listStatus(path, AvroStorageUtils.PATH_FILTER);
         Schema schema = null;
         if (ss.length > 0) {
             if (AvroStorageUtils.noDir(ss))
                 return getSchema(path, fs);
-            
+
             /*otherwise, check whether schemas of underlying directories are the same */
             for (FileStatus s : ss) {
                 Schema newSchema = getAvroSchema(s.getPath(), fs);
@@ -197,19 +198,19 @@ public class AvroStorage extends FileInp
                 }
             }
         }
-        
+
         if (schema == null)
             System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
-        
+
         System.err.println(schema.toString());
         return schema;
     }
 
     /**
-     * This method is called by {@link #getAvroSchema}. The default implementation 
-     * returns the schema of an avro file; or the schema of the last file in a first-level 
+     * This method is called by {@link #getAvroSchema}. The default implementation
+     * returns the schema of an avro file; or the schema of the last file in a first-level
      * directory (it does not contain sub-directories).
-     * 
+     *
      * @param path  path of a file or first level directory
      * @param fs  file system
      * @return avro schema
@@ -263,7 +264,7 @@ public class AvroStorage extends FileInp
             return new TextInputFormat();
     }
 
-    @SuppressWarnings("rawtypes") 
+    @SuppressWarnings("rawtypes")
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split)
     throws IOException {
@@ -283,9 +284,9 @@ public class AvroStorage extends FileInp
         }
     }
 
-    
+
     /* implement LoadMetadata */
-    
+
     /**
      * Get avro schema from "location" and return the converted
      * PigSchema.
@@ -329,8 +330,8 @@ public class AvroStorage extends FileInp
     }
 
     /**
-     * build a property map from a json object 
-     * 
+     * build a property map from a json object
+     *
      * @param jsonString  json object in string format
      * @return a property map
      * @throws ParseException
@@ -357,14 +358,14 @@ public class AvroStorage extends FileInp
                /* convert avro schema (as json object) to string */
                 obj.put(key, value.toString().trim());
             }
- 
+
         }
         return obj;
     }
 
     /**
-     * build a property map from a string list 
-     * 
+     * build a property map from a string list
+     *
      * @param parts  input string list
      * @return a property map
      * @throws IOException
@@ -374,25 +375,34 @@ public class AvroStorage extends FileInp
 
         Map<String, Object> map = new HashMap<String, Object>();
 
-        for (int i = 0; i < parts.length - 1; i += 2) {
+        for (int i = 0; i < parts.length; ) {
             String name = parts[i].trim();
-            String value = parts[i+1].trim();
-            if (name.equalsIgnoreCase("debug")
-                         || name.equalsIgnoreCase("index")) {
-                /* store value as integer */
-                map.put(name, Integer.parseInt(value));
-            } else if (name.equalsIgnoreCase("data")
-                         || name.equalsIgnoreCase("same")
-                         || name.equalsIgnoreCase("schema")
-                         || name.equalsIgnoreCase("schema_file")
-                         || name.matches("field\\d+")) {
-                /* store value as string */
-                map.put(name, value);
-            } else if (name.equalsIgnoreCase("nullable")) {
-                /* store value as boolean */
-                map.put(name, Boolean.getBoolean(value));
-            } else
-                throw new IOException("Invalid parameter:" + name);
+            if (name.equalsIgnoreCase("no_schema_check")) {
+                checkSchema = false;
+                /* parameter only, so increase iteration counter by 1 */
+                i += 1;
+            } else {
+                String value = parts[i+1].trim();
+                if (name.equalsIgnoreCase("debug")
+                             || name.equalsIgnoreCase("index")) {
+                    /* store value as integer */
+                    map.put(name, Integer.parseInt(value));
+                } else if (name.equalsIgnoreCase("data")
+                             || name.equalsIgnoreCase("same")
+                             || name.equalsIgnoreCase("schema")
+                             || name.equalsIgnoreCase("schema_file")
+                             || name.matches("field\\d+")) {
+                    /* store value as string */
+                    map.put(name, value);
+                } else if (name.equalsIgnoreCase("nullable")) {
+                    /* store value as boolean */
+                    map.put(name, Boolean.getBoolean(value));
+                } else {
+                    throw new IOException("Invalid parameter:" + name);
+                }
+                /* parameter/value pair, so increase iteration counter by 2 */
+                i += 2;
+            }
         }
         return map;
     }
@@ -432,7 +442,7 @@ public class AvroStorage extends FileInp
             String name = entry.getKey().trim();
             Object value = entry.getValue();
 
-            if (name.equalsIgnoreCase("index")) { 
+            if (name.equalsIgnoreCase("index")) {
                 /* set index of store function */
                 storeFuncIndex = (Integer) value;
             } else if (name.equalsIgnoreCase("same")) {
@@ -518,7 +528,7 @@ public class AvroStorage extends FileInp
     }
 
     /**
-     * Append newly specified schema 
+     * Append newly specified schema
      */
     @Override
     public void checkSchema(ResourceSchema s) throws IOException {
@@ -529,7 +539,7 @@ public class AvroStorage extends FileInp
         AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
 
         String key = getSchemaKey();
-        Map<String, String> schemaMap = (prevSchemaStr != null) 
+        Map<String, String> schemaMap = (prevSchemaStr != null)
                                                                 ? parseSchemaMap(prevSchemaStr)
                                                                 : null;
 
@@ -539,17 +549,19 @@ public class AvroStorage extends FileInp
         }
 
         /* validate and convert output schema */
-        Schema schema = outputAvroSchema != null 
-                                      ? PigSchema2Avro.validateAndConvert(outputAvroSchema, s)
-                                      : PigSchema2Avro.convert(s, nullable);
+        Schema schema = outputAvroSchema != null
+                                  ? (checkSchema
+                                          ? PigSchema2Avro.validateAndConvert(outputAvroSchema, s)
+                                          : outputAvroSchema)
+                                  : PigSchema2Avro.convert(s, nullable);
 
         AvroStorageLog.info("key=" + key + " outputSchema=" + schema);
 
         String schemaStr = schema.toString();
         String append = key + SCHEMA_KEYVALUE_DELIM + schemaStr;
 
-        String newSchemaStr = (schemaMap != null) 
-                                                ? prevSchemaStr + SCHEMA_DELIM + append 
+        String newSchemaStr = (schemaMap != null)
+                                                ? prevSchemaStr + SCHEMA_DELIM + append
                                                 : append;
         property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
         AvroStorageLog.details("New schemas=" + newSchemaStr);
@@ -598,7 +610,7 @@ public class AvroStorage extends FileInp
         return new PigAvroOutputFormat(schema);
     }
 
-    @SuppressWarnings("rawtypes") 
+    @SuppressWarnings("rawtypes")
     @Override
     public  void  prepareToWrite(RecordWriter writer) throws IOException {
         this.writer = (PigAvroRecordWriter) writer;

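Two behavioural changes in AvroStorage are worth calling out: a single constructor argument other than 'no_schema_check' is now parsed as a JSON property string, and 'no_schema_check' can be combined with other parameters because parseStringList treats it as a bare flag among key/value pairs. A simplified sketch of that parsing loop (it omits the per-parameter validation and type handling that the real method keeps; the class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class ParamParsingSketch {

        /** Parse a mix of a bare flag and key/value pairs, as parseStringList now does. */
        static Map<String, Object> parse(String... parts) {
            Map<String, Object> map = new HashMap<String, Object>();
            for (int i = 0; i < parts.length; ) {
                String name = parts[i].trim();
                if (name.equalsIgnoreCase("no_schema_check")) {
                    map.put(name, Boolean.TRUE);          // flag only: consume one token
                    i += 1;
                } else {
                    map.put(name, parts[i + 1].trim());   // key/value pair: consume two
                    i += 2;
                }
            }
            return map;
        }

        public static void main(String[] args) {
            // Mirrors AvroStorage('no_schema_check', 'schema', '"int"') from the new tests.
            System.out.println(parse("no_schema_check", "schema", "\"int\""));
        }
    }
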
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -113,7 +113,7 @@ public class AvroStorageUtils {
 
     /**
      * Adds all non-hidden directories and subdirectories to set param
-     * 
+     *
      * @throws IOException
      */
     static boolean getAllSubDirs(Path path, Job job, Set<Path> paths) throws IOException {
@@ -158,15 +158,15 @@ public class AvroStorageUtils {
         }
         return true;
     }
-    
-    /** get last file of a hdfs path if it is  a directory; 
+
+    /** get last file of a hdfs path if it is  a directory;
      *   or return the file itself if path is a file
      */
     public static Path getLast(String path, FileSystem fs) throws IOException {
         return getLast(new Path(path), fs);
     }
 
-    /** get last file of a hdfs path if it is  a directory; 
+    /** get last file of a hdfs path if it is  a directory;
      *   or return the file itself if path is a file
      */
     public static Path getLast(Path path, FileSystem fs) throws IOException {
@@ -182,7 +182,7 @@ public class AvroStorageUtils {
     }
 
     /**
-     * Wrap an avro schema as a nullable union if needed. 
+     * Wrap an avro schema as a nullable union if needed.
      * For instance, wrap schema "int" as ["null", "int"]
      */
     public static Schema wrapAsUnion(Schema schema, boolean nullable) {
@@ -204,21 +204,21 @@ public class AvroStorageUtils {
         Set<String> set = new HashSet<String> ();
         return containsRecursiveRecord(s, set);
     }
-  
+
     /**
      * Called by {@link #containsRecursiveRecord(Schema)} and it recursively checks
-     * whether the input schema contains recursive records. 
+     * whether the input schema contains recursive records.
      */
     protected static boolean containsRecursiveRecord(Schema s, Set<String> definedRecordNames) {
-        
+
         /* if it is a record, check itself and all fields*/
         if (s.getType().equals(Schema.Type.RECORD)) {
             String name = s.getName();
             if (definedRecordNames.contains(name)) return true;
-            
+
             /* add its own name into defined record set*/
             definedRecordNames.add(s.getName());
-            
+
             /* check all fields */
             List<Field> fields = s.getFields();
             for (Field field: fields) {
@@ -226,25 +226,25 @@ public class AvroStorageUtils {
                 if (containsRecursiveRecord(fs, definedRecordNames))
                     return true;
             }
-            
+
             /* remove its own name from the name set */
             definedRecordNames.remove(s.getName());
 
             return false;
         }
-        
+
         /* if it is an array, check its element type */
         else if (s.getType().equals(Schema.Type.ARRAY)) {
             Schema fs = s.getElementType();
             return containsRecursiveRecord(fs, definedRecordNames);
         }
-        
+
         /*if it is a map, check its value type */
         else if (s.getType().equals(Schema.Type.MAP)) {
             Schema vs = s.getValueType();
             return containsRecursiveRecord(vs, definedRecordNames);
         }
-        
+
         /* if it is a union, check all possible types */
         else if (s.getType().equals(Schema.Type.UNION)) {
             List<Schema> types = s.getTypes();
@@ -254,57 +254,84 @@ public class AvroStorageUtils {
             }
             return false;
         }
-        
+
         /* return false for other cases */
         else {
             return false;
         }
     }
-    
+
     /** determine whether the input schema contains generic unions */
     public static boolean containsGenericUnion(Schema s) {
-             
+        /* initialize empty set of visited records */
+        Set<Schema> set = new HashSet<Schema> ();
+        return containsGenericUnion(s, set);
+    }
+
+    /**
+     * Called by {@link #containsGenericUnion(Schema)} and it recursively checks
+     * whether the input schema contains generic unions.
+     */
+    protected static boolean containsGenericUnion(Schema s, Set<Schema> visitedRecords) {
+
         /* if it is a record, check all fields*/
         if (s.getType().equals(Schema.Type.RECORD)) {
+
+            /* add its own name into visited record set*/
+            visitedRecords.add(s);
+
+            /* check all fields */
             List<Field> fields = s.getFields();
             for (Field field: fields) {
                 Schema fs = field.schema();
-                if (containsGenericUnion(fs))
-                    return true;
+                if (!visitedRecords.contains(fs)) {
+                    if (containsGenericUnion(fs, visitedRecords)) {
+                        return true;
+                    }
+                }
             }
             return false;
         }
-        
+
         /* if it is an array, check its element type */
         else if (s.getType().equals(Schema.Type.ARRAY)) {
             Schema fs = s.getElementType();
-            return  containsGenericUnion(fs) ;
+            if (!visitedRecords.contains(fs)) {
+                return containsGenericUnion(fs, visitedRecords);
+            }
+            return false;
         }
-        
+
         /*if it is a map, check its value type */
         else if (s.getType().equals(Schema.Type.MAP)) {
             Schema vs = s.getValueType();
-            return containsGenericUnion(vs) ;
+            if (!visitedRecords.contains(vs)) {
+                return containsGenericUnion(vs, visitedRecords);
+            }
+            return false;
         }
-        
+
         /* if it is a union, check all possible types and itself */
         else if (s.getType().equals(Schema.Type.UNION)) {
             List<Schema> types = s.getTypes();
             for (Schema type: types) {
-                if (containsGenericUnion(type) )
-                    return true;
+                if (!visitedRecords.contains(type)) {
+                    if (containsGenericUnion(type, visitedRecords)) {
+                        return true;
+                    }
+                }
             }
             /* check whether itself is acceptable (null-union) */
-            return  ! isAcceptableUnion (s);
+            return !isAcceptableUnion(s);
         }
-        
+
         /* return false for other cases */
         else {
             return false;
         }
     }
-    
-    /** determine whether a union is a nullable union; 
+
+    /** determine whether a union is a nullable union;
      * note that this function doesn't check containing
      * types of the input union recursively. */
     public static boolean isAcceptableUnion(Schema in) {
@@ -347,22 +374,22 @@ public class AvroStorageUtils {
 
     /** extract schema from a nullable union */
     public static Schema getAcceptedType(Schema in) {
-        if (!isAcceptableUnion(in)) 
+        if (!isAcceptableUnion(in))
             throw new RuntimeException("Cannot call this function on a unacceptable union");
-    
+
         List<Schema> types = in.getTypes();
         switch (types.size()) {
-        case 0: 
+        case 0:
             return null; /*union with no type*/
-        case 1: 
+        case 1:
             return types.get(0); /*union with one type*/
         case 2:
             return  (types.get(0).getType().equals(Schema.Type.NULL))
-                        ? types.get(1) 
+                        ? types.get(1)
                         : types.get(0);
         default:
             return null;
-        } 
+        }
     }
 
 }

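containsGenericUnion now carries a set of visited schemas so that it terminates on recursive records, while the acceptance rule itself is unchanged: only nullable unions pass. A small sketch against the public AvroStorageUtils helpers; the class name and the constructed schemas are illustrative, and the expected results follow from the javadoc above:

    import java.util.Arrays;
    import org.apache.avro.Schema;
    import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;

    public class UnionCheckSketch {
        public static void main(String[] args) {
            Schema nullableInt = Schema.createUnion(Arrays.asList(
                    Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)));
            Schema intOrString = Schema.createUnion(Arrays.asList(
                    Schema.create(Schema.Type.INT), Schema.create(Schema.Type.STRING)));

            // ["null","int"] is a nullable union: accepted, not a generic union.
            System.out.println(AvroStorageUtils.isAcceptableUnion(nullableInt));    // expected: true
            System.out.println(AvroStorageUtils.containsGenericUnion(nullableInt)); // expected: false

            // ["int","string"] is a generic union and is still rejected.
            System.out.println(AvroStorageUtils.isAcceptableUnion(intOrString));    // expected: false
            System.out.println(AvroStorageUtils.containsGenericUnion(intOrString)); // expected: true
        }
    }
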
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -60,7 +60,7 @@ public class TestAvroStorage {
     final private static String basedir = "src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/";
 
     final private static String outbasedir = "/tmp/TestAvroStorage/";
-    
+
     public static final PathFilter hiddenPathFilter = new PathFilter() {
         public boolean accept(Path p) {
           String name = p.getName();
@@ -82,11 +82,82 @@ public class TestAvroStorage {
     final private String testArrayFile = getInputFile("test_array.avro");
     final private String testRecordFile = getInputFile("test_record.avro");
     final private String testRecordSchema = getInputFile("test_record.avsc");
-    final private String testRecursiveSchemaFile = getInputFile("test_recursive_schema.avro");
-    final private String testGenericUnionSchemaFile = getInputFile("test_generic_union_schema.avro");
+    final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
+    final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
+    final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
+    final private String testRecursiveRecordInUnion = getInputFile("test_recursive_record_in_union.avro");
+    final private String testRecursiveRecordInRecord = getInputFile("test_recursive_record_in_record.avro");
+    final private String testRecursiveRecordInUnionSchema = getInputFile("test_recursive_record_in_union.avsc");
     final private String testTextFile = getInputFile("test_record.txt");
     final private String testSingleTupleBagFile = getInputFile("messages.avro");
     final private String testNoExtensionFile = getInputFile("test_no_extension");
+    final private String recursiveRecordInMap =
+        " {" +
+        "   \"type\" : \"record\"," +
+        "   \"name\" : \"recursive_record\"," +
+        "   \"fields\" : [ {" +
+        "     \"name\" : \"id\"," +
+        "     \"type\" : \"int\"" +
+        "   }, {" +
+        "     \"name\" : \"nested\"," +
+        "     \"type\" : [ \"null\", {" +
+        "       \"type\" : \"map\"," +
+        "       \"values\" : \"recursive_record\"" +
+        "     } ]" +
+        "   } ]" +
+        " }";
+    final private String recursiveRecordInArray =
+        " {" +
+        "   \"type\" : \"record\"," +
+        "   \"name\" : \"recursive_record\"," +
+        "   \"fields\" : [ {" +
+        "     \"name\" : \"id\"," +
+        "     \"type\" : \"int\"" +
+        "   }, {" +
+        "     \"name\" : \"nested\"," +
+        "     \"type\" : [ \"null\", {" +
+        "       \"type\" : \"array\"," +
+        "       \"items\" : \"recursive_record\"" +
+        "     } ]" +
+        "   } ]" +
+        " }";
+    final private String recursiveRecordInUnion =
+        " {" +
+        "   \"type\" : \"record\"," +
+        "   \"name\" : \"recursive_record\"," +
+        "   \"fields\" : [ {" +
+        "     \"name\" : \"value\"," +
+        "     \"type\" : \"int\"" +
+        "   }, {" +
+        "     \"name\" : \"next\"," +
+        "     \"type\" : [ \"null\", \"recursive_record\" ]" +
+        "   } ]" +
+        " }";
+    final private String recursiveRecordInRecord =
+        " {" +
+        "   \"type\" : \"record\"," +
+        "   \"name\" : \"recursive_record\"," +
+        "   \"fields\" : [ {" +
+        "     \"name\" : \"id\"," +
+        "     \"type\" : \"int\"" +
+        "   }, {" +
+        "     \"name\" : \"nested\"," +
+        "     \"type\" : [ \"null\", {" +
+        "       \"type\" : \"record\"," +
+        "       \"name\" : \"nested_record\"," +
+        "       \"fields\" : [ {" +
+        "         \"name\" : \"value1\"," +
+        "         \"type\" : \"string\"" +
+        "       }, {" +
+        "         \"name\" : \"next\"," +
+        "         \"type\" : \"recursive_record\"" +
+        "       }, {" +
+        "         \"name\" : \"value2\"," +
+        "         \"type\" : \"string\"" +
+        "       } ]" +
+        "     } ]" +
+        "   } ]" +
+        " }";
 
     @BeforeClass
     public static void setup() throws ExecException {
@@ -100,40 +171,287 @@ public class TestAvroStorage {
     }
 
     @Test
-    public void testRecursiveSchema() throws IOException {
-        // Verify that a FrontendException is thrown if schema is recursive.
-        String output= outbasedir + "testRecursiveSchema";
+    public void testRecursiveRecordInMap() throws IOException {
+        // Verify that recursive records in map can be loaded/saved.
+        String output= outbasedir + "testRecursiveRecordInMap";
+        String expected = testRecursiveRecordInMap;
         deleteDirectory(new File(output));
         String [] queries = {
-          " in = LOAD '" + testRecursiveSchemaFile +
+          " in = LOAD '" + testRecursiveRecordInMap +
               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
           " STORE in INTO '" + output +
-              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '" + recursiveRecordInMap + "' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordInArray() throws IOException {
+        // Verify that recursive records in array can be loaded/saved.
+        String output= outbasedir + "testRecursiveRecordInArray";
+        String expected = testRecursiveRecordInArray;
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInArray +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '" + recursiveRecordInArray + "' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordInUnion() throws IOException {
+        // Verify that recursive records in union can be loaded/saved.
+        String output= outbasedir + "testRecursiveRecordInUnion";
+        String expected = testRecursiveRecordInUnion;
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '" + recursiveRecordInUnion + "' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordInRecord() throws IOException {
+        // Verify that recursive records in record can be loaded/saved.
+        String output= outbasedir + "testRecursiveRecordInRecord";
+        String expected = testRecursiveRecordInRecord;
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInRecord +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '" + recursiveRecordInRecord + "' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordWithSame() throws IOException {
+        // Verify that avro schema can be specified via an external avro file
+        // instead of a json string.
+        String output= outbasedir + "testRecursiveRecordWithSame";
+        String expected = testRecursiveRecordInUnion;
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'same', '" + testRecursiveRecordInUnion + "' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordReference1() throws IOException {
+        // The relation 'in' looks like this:
+        //  (1,(2,(3,)))
+        //  (2,(3,))
+        //  (3,)
+        // $0 looks like this:
+        //  (1)
+        //  (2)
+        //  (3)
+        // Avro file stored after filtering out nulls looks like this:
+        //  1
+        //  2
+        //  3
+        String output= outbasedir + "testRecursiveRecordReference1";
+        String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " first = FOREACH in GENERATE $0 AS value;",
+          " filtered = FILTER first BY value is not null;",
+          " STORE filtered INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '\"int\"' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordReference2() throws IOException {
+        // The relation 'in' looks like this:
+        //  (1,(2,(3,)))
+        //  (2,(3,))
+        //  (3,)
+        // $1.$0 looks like this:
+        //  (2)
+        //  (3)
+        //  ()
+        // Avro file stored after filtering out nulls looks like this:
+        //  2
+        //  3
+        String output= outbasedir + "testRecursiveRecordReference2";
+        String expected = basedir + "expected_testRecursiveRecordReference2.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " second = FOREACH in GENERATE $1.$0 AS value;",
+          " filtered = FILTER second BY value is not null;",
+          " STORE filtered INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '\"int\"' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordReference3() throws IOException {
+        // The relation 'in' looks like this:
+        //  (1,(2,(3,)))
+        //  (2,(3,))
+        //  (3,)
+        // $1.$1.$0 looks like this:
+        //  (3)
+        //  ()
+        //  ()
+        // Avro file stored after filtering out nulls looks like this:
+        //  3
+        String output= outbasedir + "testRecursiveRecordReference3";
+        String expected = basedir + "expected_testRecursiveRecordReference3.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " third = FOREACH in GENERATE $1.$1.$0 AS value;",
+          " filtered = FILTER third BY value is not null;",
+          " STORE filtered INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema', '\"int\"' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecursiveRecordWithNoAvroSchema() throws IOException {
+        // Verify that recursive records cannot be stored,
+        // if no avro schema is specified either via 'schema' or 'same'.
+        String output= outbasedir + "testRecursiveRecordWithNoAvroSchema";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check' );"
+           };
+        // Since Avro schema is not specified via the 'schema' parameter, it is
+        // derived from Pig schema. Job is expected to fail because this derived
+        // Avro schema (bytes) is not compatible with data (tuples).
+        testAvroStorage(true, queries);
+    }
+
+    @Test
+    public void testRecursiveRecordWithSchemaCheck() throws IOException {
+        // Verify that recursive records cannot be stored if schema check is enbled.
+        String output= outbasedir + "testRecursiveWithSchemaCheck";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'schema', '" + recursiveRecordInUnion + "' );"
            };
         try {
             testAvroStorage(queries);
-            Assert.fail();
+            Assert.fail("Negative test to test an exception. Should not be succeeding!");
+        } catch (IOException e) {
+            // An IOException is thrown by AvroStorage during schema check due to incompatible
+            // data types.
+            assertTrue(e.getMessage().contains("bytearray is not compatible with avro"));
+        }
+    }
+
+    @Test
+    public void testRecursiveRecordWithSchemaFile() throws IOException {
+        // Verify that recursive records cannot be stored if avro schema is specified by 'schema_file'.
+        String output= outbasedir + "testRecursiveWithSchemaFile";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'schema_file', '" + testRecursiveRecordInUnionSchema + "' );"
+           };
+        try {
+            testAvroStorage(queries);
+            Assert.fail("Negative test to test an exception. Should not be succeeding!");
         } catch (FrontendException e) {
-            // The IOException thrown by AvroStorage for recursive schema is caught
+            // The IOException thrown by AvroSchemaManager for recursive record is caught
             // by the Pig frontend, and FrontendException is re-thrown.
-            assertTrue(e.getMessage().contains("Cannot get schema"));
+            assertTrue(e.getMessage().contains("could not instantiate 'org.apache.pig.piggybank.storage.avro.AvroStorage'"));
+        }
+    }
+
+    @Test
+    public void testRecursiveRecordWithData() throws IOException {
+        // Verify that recursive records cannot be stored if avro schema is specified by 'data'.
+        String output= outbasedir + "testRecursiveWithData";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testRecursiveRecordInUnion +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'data', '" + testRecursiveRecordInUnion + "' );"
+           };
+        try {
+            testAvroStorage(queries);
+            Assert.fail("Negative test to test an exception. Should not be succeeding!");
+        } catch (FrontendException e) {
+            // The IOException thrown by AvroSchemaManager for recursive record is caught
+            // by the Pig frontend, and FrontendException is re-thrown.
+            assertTrue(e.getMessage().contains("could not instantiate 'org.apache.pig.piggybank.storage.avro.AvroStorage'"));
         }
     }
 
     @Test
-    public void testGenericUnionSchema() throws IOException {
+    public void testGenericUnion() throws IOException {
         // Verify that a FrontendException is thrown if schema has generic union.
-        String output= outbasedir + "testGenericUnionSchema";
+        String output= outbasedir + "testGenericUnion";
         deleteDirectory(new File(output));
         String [] queries = {
-          " in = LOAD '" + testGenericUnionSchemaFile +
+          " in = LOAD '" + testGenericUnionFile +
               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
           " STORE in INTO '" + output +
               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
            };
         try {
             testAvroStorage(queries);
-            Assert.fail();
+            Assert.fail("Negative test to test an exception. Should not be succeeding!");
         } catch (FrontendException e) {
             // The IOException thrown by AvroStorage for generic union is caught
             // by the Pig frontend, and FrontendException is re-thrown.
@@ -243,7 +561,7 @@ public class TestAvroStorage {
             };
         try {
             testAvroStorage(queries);
-            Assert.fail();
+            Assert.fail("Negative test to test an exception. Should not be succeeding!");
         } catch (JobCreationException e) {
             // The IOException thrown by AvroStorage for input file not found is catched
             // by the Pig backend, and JobCreationException (a subclass of IOException)
@@ -256,9 +574,9 @@ public class TestAvroStorage {
     public void testArrayDefault() throws IOException {
         String output= outbasedir + "testArrayDefault";
         String expected = basedir + "expected_testArrayDefault.avro";
-        
+
         deleteDirectory(new File(output));
-        
+
         String [] queries = {
            " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
            " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
@@ -282,7 +600,7 @@ public class TestAvroStorage {
         testAvroStorage( queries);
         verifyResults(output, expected);
     }
-    
+
     @Test
     public void testArrayWithNotNull() throws IOException {
         String output= outbasedir + "testArrayWithNotNull";
@@ -297,7 +615,7 @@ public class TestAvroStorage {
         testAvroStorage( queries);
         verifyResults(output, expected);
     }
-    
+
     @Test
     public void testArrayWithSame() throws IOException {
         String output= outbasedir + "testArrayWithSame";
@@ -317,9 +635,9 @@ public class TestAvroStorage {
     public void testArrayWithSnappyCompression() throws IOException {
         String output= outbasedir + "testArrayWithSnappyCompression";
         String expected = basedir + "expected_testArrayDefault.avro";
-      
+
         deleteDirectory(new File(output));
-      
+
         Properties properties = new Properties();
         properties.setProperty("mapred.output.compress", "true");
         properties.setProperty("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
@@ -464,7 +782,7 @@ public class TestAvroStorage {
         testAvroStorage( queries);
         verifyResults(output, expected);
     }
-    
+
     @Test
     public void testSingleFieldTuples() throws IOException {
         String output= outbasedir + "testSingleFieldTuples";
@@ -478,7 +796,7 @@ public class TestAvroStorage {
         };
         testAvroStorage( queries);
     }
-    
+
     @Test
     public void testFileWithNoExtension() throws IOException {
         String output= outbasedir + "testFileWithNoExtension";
@@ -527,25 +845,37 @@ public class TestAvroStorage {
         if ( path.exists()) {
             File [] files = path.listFiles();
             for (File file: files) {
-                if (file.isDirectory()) 
+                if (file.isDirectory())
                     deleteDirectory(file);
                 file.delete();
             }
         }
     }
-    
+
     private void testAvroStorage(String ...queries) throws IOException {
+        testAvroStorage(false, queries);
+    }
+
+    private void testAvroStorage(boolean expectedToFail, String ...queries) throws IOException {
         pigServerLocal.setBatchOn();
         for (String query: queries){
-            if (query != null && query.length() > 0)
+            if (query != null && query.length() > 0) {
                 pigServerLocal.registerQuery(query);
+            }
         }
-        List<ExecJob> jobs = pigServerLocal.executeBatch();
-        for (ExecJob job : jobs) {
-            assertEquals(JOB_STATUS.COMPLETED, job.getStatus());
+        int numOfFailedJobs = 0;
+        for (ExecJob job : pigServerLocal.executeBatch()) {
+            if (job.getStatus().equals(JOB_STATUS.FAILED)) {
+                numOfFailedJobs++;
+            }
+        }
+        if (expectedToFail) {
+            assertTrue("There was no failed job!", numOfFailedJobs > 0);
+        } else {
+            assertTrue("There was a failed job!", numOfFailedJobs == 0);
         }
     }
-    
+
     private void verifyResults(String outPath, String expectedOutpath) throws IOException {
         verifyResults(outPath, expectedOutpath, null);
     }
@@ -554,17 +884,17 @@ public class TestAvroStorage {
         // Compression for Avro seems to be broken in Hadoop 23. Skip this test and open Jira PIG-
         if (Util.isHadoop23())
             return;
-        
-        FileSystem fs = FileSystem.getLocal(new Configuration()) ; 
-        
+
+        FileSystem fs = FileSystem.getLocal(new Configuration()) ;
+
         /* read in expected results*/
         Set<Object> expected = getExpected (expectedOutpath);
-        
+
         /* read in output results and compare */
         Path output = new Path(outPath);
         assertTrue("Output dir does not exists!", fs.exists(output)
                 && fs.getFileStatus(output).isDir());
-        
+
         Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
         assertTrue("Split field dirs not found!", paths != null);
 
@@ -574,34 +904,34 @@ public class TestAvroStorage {
                   files != null);
           for (Path filePath : files) {
             assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-            
+
             GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
-            
+
             DataFileStream<Object> in = new DataFileStream<Object>(
                                             fs.open(filePath), reader);
             assertEquals("codec", expectedCodec, in.getMetaString("avro.codec"));
             int count = 0;
             while (in.hasNext()) {
                 Object obj = in.next();
-              //System.out.println("obj = " + (GenericData.Array<Float>)obj);
-              assertTrue("Avro result object found that's not expected: " + obj, expected.contains(obj));
-              count++;
-            }        
+                //System.out.println("obj = " + (GenericData.Array<Float>)obj);
+                assertTrue("Avro result object found that's not expected: " + obj, expected.contains(obj));
+                count++;
+            }
             in.close();
             assertEquals(expected.size(), count);
           }
         }
       }
-    
+
     private Set<Object> getExpected (String pathstr ) throws IOException {
-        
+
         Set<Object> ret = new HashSet<Object>();
-        FileSystem fs = FileSystem.getLocal(new Configuration())  ; 
-                                    
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+
         /* read in output results and compare */
         Path output = new Path(pathstr);
         assertTrue("Expected output does not exists!", fs.exists(output));
-    
+
         Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
         assertTrue("Split field dirs not found!", paths != null);
 
@@ -610,15 +940,15 @@ public class TestAvroStorage {
             assertTrue("No files found for path: " + path.toUri().getPath(), files != null);
             for (Path filePath : files) {
                 assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-        
+
                 GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
-        
+
                 DataFileStream<Object> in = new DataFileStream<Object>(fs.open(filePath), reader);
-        
+
                 while (in.hasNext()) {
                     Object obj = in.next();
                     ret.add(obj);
-                }        
+                }
                 in.close();
             }
         }

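Note on the harness change in TestAvroStorage.java above: testAvroStorage(String...) now delegates to a new overload, testAvroStorage(boolean expectedToFail, String... queries), which runs the batch, counts jobs reporting JOB_STATUS.FAILED, and asserts that at least one job failed when expectedToFail is true, or that none failed otherwise. A minimal sketch of how a negative test could call the new overload follows; the test name and the assumption that this particular script fails are hypothetical and not taken from this patch.

    @Test
    public void testRecursiveRecordExpectedToFail() throws IOException {
        // Hypothetical negative test illustrating the new overload. Whether this
        // exact script fails depends on AvroStorage options; that it does fail is
        // an assumption made only for the sake of the example.
        String output = outbasedir + "testRecursiveRecordExpectedToFail";
        deleteDirectory(new File(output));
        String [] queries = {
           " in = LOAD '" + basedir + "test_recursive_record_in_union.avro' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
           " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
        };
        // expectedToFail = true: the helper asserts that at least one job ended
        // up in JOB_STATUS.FAILED instead of requiring every job to complete.
        testAvroStorage(true, queries);
    }
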
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Mon Aug 20 07:14:54 2012
@@ -38,44 +38,6 @@ public class TestAvroStorageUtils {
 
     public final String RECORD_BEGINNING = TYPE_RECORD + NAME_NODE + FIELDS_VALUE;
 
-    @Test
-    public void canIdentifyRecursiveRecords() throws IOException {
-        final String str1 = RECORD_BEGINNING +
-        		                     "{ \"name\": \"next\", \"type\": [\"null\", \"Node\"] } ] }";
-        Schema s = Schema.parse(str1);
-        assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
-
-        final String str2 = "{\"type\": \"array\", \"items\": "  + str1 + "}";
-        s = Schema.parse(str2);
-        assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
-
-        final String str3 ="[\"null\", " + str2 + "]";
-        s = Schema.parse(str3);
-        assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
-        
-    }
-
-    @Test
-    public void canIdentifyNonRecursiveRecords() throws IOException {
-        final String non = RECORD_BEGINNING + "{ \"name\": \"next\", \"type\": [\"null\", \"string\"] } ] }";
-        assertFalse(AvroStorageUtils.containsRecursiveRecord(Schema.parse(non)));
-        
-        final String s1 =
-            "{ \"type\":\"record\", \"name\":\"Event\", " +
-               "\"fields\":[ " +
-                       "{\"name\":\"f1\", " +
-                         "\"type\":{ \"type\":\"record\",\"name\":\"Entity\", " +
-                                          "\"fields\":[{\"name\":\"type\", \"type\": \"string\"}," +
-                                                             "{\"name\":\"value\",\"type\": \"int\"}] " +
-                                         "} }, " +
-                       " {\"name\":\"f2\",\"type\": \"Entity\"}, " +
-                       " {\"name\":\"f3\",\"type\": \"Entity\"} " +
-                       "] }";
-            Schema schema1 = Schema.parse(s1);
-            assertFalse(AvroStorageUtils.containsRecursiveRecord(schema1));
-
-    }
-
      @Test
      public void testGenericUnion() throws IOException {
 

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc?rev=1374925&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc Mon Aug 20 07:14:54 2012
@@ -0,0 +1,11 @@
+{
+  "type" : "record",
+  "name" : "recursive_record",
+  "fields" : [ {
+    "name" : "value",
+    "type" : "int"
+  }, {
+    "name" : "next",
+    "type" : [ "null", "recursive_record" ]
+  } ]
+}
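
The schema above is the recursive shape this change adds support for: the "next" field refers back to the enclosing "recursive_record" type through a ["null", ...] union. As a rough illustration only (not code from this patch), the snippet below parses an equivalent schema with the Avro API and checks it with AvroStorageUtils.containsRecursiveRecord, the helper exercised by the unit tests removed from TestAvroStorageUtils.java above; it assumes that helper remains public after this change, and the wrapper class and main method exist only to keep the sketch self-contained.

    import org.apache.avro.Schema;
    import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;

    public class RecursiveSchemaCheck {
        public static void main(String[] args) {
            // Same shape as test_recursive_record_in_union.avsc: "next" refers
            // back to the enclosing record through a ["null", ...] union.
            String json =
                "{ \"type\": \"record\", \"name\": \"recursive_record\", \"fields\": ["
                + " { \"name\": \"value\", \"type\": \"int\" },"
                + " { \"name\": \"next\", \"type\": [ \"null\", \"recursive_record\" ] } ] }";
            Schema schema = Schema.parse(json);
            // Expected to report true for a self-referencing record.
            System.out.println(AvroStorageUtils.containsRecursiveRecord(schema));
        }
    }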