You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2012/10/12 00:06:33 UTC

svn commit: r1397333 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/pi...

Author: sms
Date: Thu Oct 11 22:06:32 2012
New Revision: 1397333

URL: http://svn.apache.org/viewvc?rev=1397333&view=rev
Log:
PIG-2579: Support for multiple input schemas in AvroStorage (cheolsoo via sms)

Added:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro   (with props)
Removed:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union_schema.avro
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_schema.avro
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 11 22:06:32 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
 
 IMPROVEMENTS
 
+PIG-2579: Support for multiple input schemas in AvroStorage (cheolsoo via sms)
+
 PIG-2946: Documentation of "history" and "clear" commands (xalan via azaroth)
 
 PIG-2877: Make SchemaTuple work in foreach (and thus, in loads) (jcoveney)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Oct 11 22:06:32 2012
@@ -71,6 +71,9 @@ public class AvroStorage extends FileInp
     private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
     private static final String SCHEMA_DELIM = "#";
     private static final String SCHEMA_KEYVALUE_DELIM = "@";
+    private static final String NO_SCHEMA_CHECK = "no_schema_check";
+    private static final String IGNORE_BAD_FILES = "ignore_bad_files";
+    private static final String MULTIPLE_SCHEMAS = "multiple_schemas";
 
     /* FIXME: we use this variable to distinguish schemas specified
      * by different AvroStorage calls and will remove this once
@@ -87,6 +90,18 @@ public class AvroStorage extends FileInp
     private PigAvroRecordReader reader = null;   /* avro record writer */
     private Schema inputAvroSchema = null;    /* input avro schema */
 
+    /* if multiple avro record schemas are merged, this map associates each input
+     * record with a remapping of its fields relative to the merged schema. please
+     * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
+     */
+    private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap = null;
+
+    /* whether input avro files have the same schema or not. If input avro files
+     * do not have the same schema, we merge these schemas into a single schema
+     * and derive the pig schema from it.
+     */
+    private boolean useMultipleSchemas = false;
+
     private boolean checkSchema = true; /*whether check schema of input directories*/
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
@@ -108,16 +123,17 @@ public class AvroStorage extends FileInp
      * @throws ParseException
      */
     public AvroStorage(String[] parts) throws IOException, ParseException {
-
         outputAvroSchema = null;
         nullable = true;
         checkSchema = true;
 
         if (parts.length == 1
-                && !parts[0].equalsIgnoreCase("no_schema_check")
-                && !parts[0].equalsIgnoreCase("ignore_bad_files")) {
-            /* If one parameter is given, and that is neither 'no_schema_check'
-             * nor 'ignore_bad_files', then it must be a json string.
+                && !parts[0].equalsIgnoreCase(NO_SCHEMA_CHECK)
+                && !parts[0].equalsIgnoreCase(IGNORE_BAD_FILES)
+                && !parts[0].equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
+            /* If one parameter is given, and that is not 'no_schema_check',
+             * 'ignore_bad_files', or 'multiple_schemas', then it must be a
+             * json string.
              */
             init(parseJsonString(parts[0]));
         } else {
@@ -126,12 +142,8 @@ public class AvroStorage extends FileInp
         }
     }
 
-
     /**
      * Set input location and obtain input schema.
-     *
-     * FIXME: currently we assume all avro files under the same "location"
-     * share the same schema and will throw exception if not.
      */
     @Override
     public void setLocation(String location, Job job) throws IOException {
@@ -139,20 +151,44 @@ public class AvroStorage extends FileInp
             return;
         }
         Set<Path> paths = new HashSet<Path>();
-        if (AvroStorageUtils.getAllSubDirs(new Path(location), job, paths)) {
+        Configuration conf = job.getConfiguration();
+        if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+            setInputAvroSchema(paths, conf);
             FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
-            inputAvroSchema = getAvroSchema(location, job);
         } else {
             throw new IOException("Input path \'" + location + "\' is not found");
         }
     }
 
-    protected Schema getAvroSchema(String location, Job job) throws IOException {
-        Path path = AvroStorageUtils.getConcretePathFromGlob(location, job);
-        if (path == null) {
+    /**
+     * Set input avro schema. If the 'multiple_schemas' option is enabled, we merge multiple
+     * schemas and use that merged schema; otherwise, we simply use the schema from the first
+     * file in the paths set.
+     *
+     * @param paths  set of input files
+     * @param conf  configuration
+     * @return avro schema
+     * @throws IOException
+     */
+    protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
+        inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
+                                             : getAvroSchema(paths, conf);
+    }
+
+    /**
+     * Get avro schema of first input file that matches the location pattern.
+     *
+     * @param paths  set of input files
+     * @param conf  configuration
+     * @return avro schema
+     * @throws IOException
+     */
+    protected Schema getAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
+        if (paths == null || paths.isEmpty()) {
             return null;
         }
-        FileSystem fs = FileSystem.get(path.toUri(), job.getConfiguration());
+        Path path = paths.iterator().next();
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
         return getAvroSchema(path, fs);
     }
 
@@ -205,11 +241,35 @@ public class AvroStorage extends FileInp
         if (schema == null)
             System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
 
-        System.err.println(schema.toString());
         return schema;
     }
 
     /**
+     * Merge multiple input avro schemas into one. Note that we can't merge arbitrary schemas.
+     * Please see AvroStorageUtils.mergeSchema() for what's allowed and what's not allowed.
+     *
+     * @param paths  set of input files
+     * @param conf  configuration
+     * @return avro schema
+     * @throws IOException
+     */
+    protected Schema getMergedSchema(Set<Path> paths, Configuration conf) throws IOException {
+        Schema result = null;
+        Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
+        for (Path path : paths) {
+            FileSystem fs = FileSystem.get(path.toUri(), conf);
+            Schema schema = getSchema(path, fs);
+            result = AvroStorageUtils.mergeSchema(result, schema);
+            mergedFiles.put(path, schema);
+        }
+        // schemaToMergedSchemaMap is only needed when merging multiple records.
+        if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
+            schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
+        }
+        return result;
+    }
+
+    /**
      * This method is called by {@link #getAvroSchema}. The default implementation
      * returns the schema of an avro file; or the schema of the last file in a first-level
      * directory (it does not contain sub-directories).
@@ -261,10 +321,21 @@ public class AvroStorage extends FileInp
     @Override
     public InputFormat getInputFormat() throws IOException {
         AvroStorageLog.funcCall("getInputFormat");
-        if(inputAvroSchema != null)
-            return new PigAvroInputFormat(inputAvroSchema, ignoreBadFiles);
-        else
-            return new TextInputFormat();
+        InputFormat result = null;
+        if(inputAvroSchema != null) {
+            if (useMultipleSchemas) {
+                // When merging multiple avro schemas, we use embedded schemas
+                // to load input files. So no input avro schema is passed.
+                result = new PigAvroInputFormat(
+                        null, ignoreBadFiles, schemaToMergedSchemaMap);
+            } else {
+                result = new PigAvroInputFormat(
+                        inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap);
+            }
+        } else {
+            result = new TextInputFormat();
+        }
+        return result;
     }
 
     @SuppressWarnings("rawtypes")
@@ -295,13 +366,16 @@ public class AvroStorage extends FileInp
      * PigSchema.
      */
     @Override
-    public ResourceSchema getSchema(String location, Job job)
-                                    throws IOException {
+    public ResourceSchema getSchema(String location, Job job) throws IOException {
 
         /* get avro schema */
         AvroStorageLog.funcCall("getSchema");
         if (inputAvroSchema == null) {
-            inputAvroSchema = getAvroSchema(location, job);
+            Set<Path> paths = new HashSet<Path>();
+            Configuration conf = job.getConfiguration();
+            if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+                setInputAvroSchema(paths, conf);
+            }
         }
         if(inputAvroSchema != null) {
             AvroStorageLog.details( "avro input schema:"  + inputAvroSchema);
@@ -310,11 +384,12 @@ public class AvroStorage extends FileInp
             ResourceSchema pigSchema = AvroSchema2Pig.convert(inputAvroSchema);
             AvroStorageLog.details("pig input schema:" + pigSchema);
             if (pigSchema.getFields().length == 1){
-            	pigSchema = pigSchema.getFields()[0].getSchema();
+                pigSchema = pigSchema.getFields()[0].getSchema();
             }
             return pigSchema;
-        } else
+        } else {
             return null;
+        }
     }
 
     @Override
@@ -380,14 +455,18 @@ public class AvroStorage extends FileInp
 
         for (int i = 0; i < parts.length; ) {
             String name = parts[i].trim();
-            if (name.equalsIgnoreCase("no_schema_check")) {
+            if (name.equalsIgnoreCase(NO_SCHEMA_CHECK)) {
                 checkSchema = false;
                 /* parameter only, so increase iteration counter by 1 */
                 i += 1;
-            } else if (name.equalsIgnoreCase("ignore_bad_files")) {
+            } else if (name.equalsIgnoreCase(IGNORE_BAD_FILES)) {
                 ignoreBadFiles = true;
                 /* parameter only, so increase iteration counter by 1 */
                 i += 1;
+            } else if (name.equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
+                useMultipleSchemas = true;
+                /* parameter only, so increase iteration counter by 1 */
+                i += 1;
             } else {
                 String value = parts[i+1].trim();
                 if (name.equalsIgnoreCase("debug")
@@ -572,7 +651,6 @@ public class AvroStorage extends FileInp
                                                 : append;
         property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
         AvroStorageLog.details("New schemas=" + newSchemaStr);
-
     }
 
     private String getSchemaKey() {

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Thu Oct 11 22:06:32 2012
@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.net.URI;
 import org.apache.avro.Schema;
@@ -95,20 +98,16 @@ public class AvroStorageUtils {
     /**
      * get input paths to job config
      */
-    public static boolean addInputPaths(String pathString, Job job)
-        throws IOException
-    {
+    public static boolean addInputPaths(String pathString, Job job) throws IOException {
       Configuration conf = job.getConfiguration();
       FileSystem fs = FileSystem.get(conf);
       HashSet<Path> paths = new  HashSet<Path>();
-      if (getAllSubDirs(new Path(pathString), job, paths))
-      {
+      if (getAllSubDirs(new Path(pathString), conf, paths)) {
         paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job)));
         FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
         return true;
       }
       return false;
-
     }
 
     /**
@@ -116,8 +115,8 @@ public class AvroStorageUtils {
      *
      * @throws IOException
      */
-    static boolean getAllSubDirs(Path path, Job job, Set<Path> paths) throws IOException {
-        FileSystem fs = FileSystem.get(path.toUri(), job.getConfiguration());
+    public static boolean getAllSubDirs(Path path, Configuration conf, Set<Path> paths) throws IOException {
+        FileSystem fs = FileSystem.get(path.toUri(), conf);
         FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
         if (matchedFiles == null || matchedFiles.length == 0) {
             return false;
@@ -125,7 +124,7 @@ public class AvroStorageUtils {
         for (FileStatus file : matchedFiles) {
             if (file.isDir()) {
                 for (FileStatus sub : fs.listStatus(file.getPath())) {
-                    getAllSubDirs(sub.getPath(), job, paths);
+                    getAllSubDirs(sub.getPath(), conf, paths);
                 }
             } else {
                 AvroStorageLog.details("Add input file:" + file);
@@ -135,21 +134,6 @@ public class AvroStorageUtils {
         return true;
     }
 
-    /**
-     * Return the first path that matches the glob pattern in the file system.
-     *
-     * @throws IOException
-     */
-    public static Path getConcretePathFromGlob(String pattern, Job job) throws IOException {
-        Path path = new Path(pattern);
-        FileSystem fs = FileSystem.get(path.toUri(), job.getConfiguration());
-        FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
-        if (matchedFiles == null || matchedFiles.length == 0) {
-            return null;
-        }
-        return matchedFiles[0].getPath();
-    }
-
     /** check whether there is NO directory in the input file (status) list*/
     public static boolean noDir(FileStatus [] ss) {
         for (FileStatus s : ss) {
@@ -182,6 +166,291 @@ public class AvroStorageUtils {
     }
 
     /**
+     * This method merges two primitive avro types into one. This method must
+     * be used only to merge two primitive types. For complex types, null will
+     * be returned unless they are both the same type. Also note that not every
+     * primitive type can be merged. For types that cannot be merged, null is
+     * returned.
+     *
+     * @param x first avro type to merge
+     * @param y second avro type to merge
+     * @return merged avro type
+     */
+    private static Schema.Type mergeType(Schema.Type x, Schema.Type y) {
+        if (x.equals(y)) {
+            return x;
+        }
+
+        switch(x) {
+            case INT:
+                switch (y) {
+                    case LONG:
+                        return Schema.Type.LONG;
+                    case FLOAT:
+                        return Schema.Type.FLOAT;
+                    case DOUBLE:
+                        return Schema.Type.DOUBLE;
+                    case ENUM:
+                    case STRING:
+                        return Schema.Type.STRING;
+                }
+
+            case LONG:
+                switch (y) {
+                    case INT:
+                        return Schema.Type.LONG;
+                    case FLOAT:
+                        return Schema.Type.FLOAT;
+                    case DOUBLE:
+                        return Schema.Type.DOUBLE;
+                    case ENUM:
+                    case STRING:
+                        return Schema.Type.STRING;
+                }
+
+            case FLOAT:
+                switch (y) {
+                    case INT:
+                    case LONG:
+                        return Schema.Type.FLOAT;
+                    case DOUBLE:
+                        return Schema.Type.DOUBLE;
+                    case ENUM:
+                    case STRING:
+                        return Schema.Type.STRING;
+                }
+
+            case DOUBLE:
+                switch (y) {
+                    case INT:
+                    case LONG:
+                    case FLOAT:
+                        return Schema.Type.DOUBLE;
+                    case ENUM:
+                    case STRING:
+                        return Schema.Type.STRING;
+                }
+
+            case ENUM:
+                switch (y) {
+                    case INT:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case STRING:
+                        return Schema.Type.STRING;
+                }
+
+            case STRING:
+                switch (y) {
+                    case INT:
+                    case LONG:
+                    case FLOAT:
+                    case DOUBLE:
+                    case ENUM:
+                        return Schema.Type.STRING;
+                }
+        }
+
+        // else return just null in particular, bytes and boolean
+        return null;
+    }
+
+    /**
+     * This method merges two avro schemas into one. Note that not every avro schema
+     * can be merged. For complex types to be merged, they must be the same type.
+     * For primitive types to be merged, they must meet certain conditions. For
+     * schemas that cannot be merged, an exception is thrown.
+     *
+     * @param x first avro schema to merge
+     * @param y second avro schema to merge
+     * @return merged avro schema
+     * @throws IOException
+     */
+    public static Schema mergeSchema(Schema x, Schema y) throws IOException {
+        if (x == null) {
+            return y;
+        }
+        if (y == null) {
+            return x;
+        }
+        if (x.equals(y)) {
+            return x;
+        }
+
+        Schema.Type xType = x.getType();
+        Schema.Type yType = y.getType();
+
+        switch (xType) {
+            case RECORD:
+                if (!yType.equals(Schema.Type.RECORD)) {
+                   throw new IOException("Cannot merge " + xType + " with " + yType);
+                }
+
+                List<Schema.Field> xFields = x.getFields();
+                List<Schema.Field> yFields = y.getFields();
+
+                // LinkedHashMap is used to keep fields in insertion order.
+                // It's convenient for testing to have determinitic behaviors.
+                Map<String, Schema> fieldName2Schema =
+                        new LinkedHashMap<String, Schema>(xFields.size() + yFields.size());
+
+                for (Schema.Field xField : xFields) {
+                    fieldName2Schema.put(xField.name(), xField.schema());
+                }
+                for (Schema.Field yField : yFields) {
+                    String name = yField.name();
+                    Schema currSchema = yField.schema();
+                    Schema prevSchema = fieldName2Schema.get(name);
+                    if (prevSchema == null) {
+                        fieldName2Schema.put(name, currSchema);
+                    } else {
+                        fieldName2Schema.put(name, mergeSchema(prevSchema, currSchema));
+                    }
+                }
+
+                List<Schema.Field> mergedFields = new ArrayList<Schema.Field>(fieldName2Schema.size());
+                for (Entry<String, Schema> entry : fieldName2Schema.entrySet()) {
+                    mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", null));
+                }
+                Schema result = Schema.createRecord(
+                        "merged", null, "merged schema (generated by AvroStorage)", false);
+                result.setFields(mergedFields);
+                return result;
+
+            case ARRAY:
+                if (!yType.equals(Schema.Type.ARRAY)) {
+                    throw new IOException("Cannot merge " + xType + " with " + yType);
+                }
+                return Schema.createArray(mergeSchema(x.getElementType(), y.getElementType()));
+
+            case MAP:
+                if (!yType.equals(Schema.Type.MAP)) {
+                    throw new IOException("Cannot merge " + xType + " with " + yType);
+                }
+                return Schema.createMap(mergeSchema(x.getValueType(), y.getValueType()));
+
+            case UNION:
+                if (!yType.equals(Schema.Type.UNION)) {
+                    throw new IOException("Cannot merge " + xType + " with " + yType);
+                }
+                List<Schema> xTypes = x.getTypes();
+                List<Schema> yTypes = y.getTypes();
+
+                List<Schema> unionTypes = new ArrayList<Schema>();
+                for (Schema xSchema : xTypes) {
+                    unionTypes.add(xSchema);
+                }
+                for (Schema ySchema : yTypes) {
+                    if (!unionTypes.contains(ySchema)) {
+                        unionTypes.add(ySchema);
+                    }
+                }
+                return Schema.createUnion(unionTypes);
+
+            case FIXED:
+                if (!yType.equals(Schema.Type.FIXED)) {
+                    throw new IOException("Cannot merge " + xType + " with " + yType);
+                }
+                int xSize = x.getFixedSize();
+                int ySize = y.getFixedSize();
+                if (xSize != ySize) {
+                    throw new IOException("Cannot merge FIXED types with different sizes: " + xSize + " and " + ySize);
+                }
+                return Schema.createFixed("merged", null, "merged schema (generated by AvroStorage)", xSize);
+
+            default: // primitive types
+                Schema.Type mergedType = mergeType(xType ,yType);
+                if (mergedType == null) {
+                    throw new IOException("Cannot merge " + xType + " with " + yType);
+                }
+                return Schema.create(mergedType);
+        }
+    }
+
+    /**
+     * When merging multiple avro record schemas, we build a map (schemaToMergedSchemaMap)
+     * to associate each input record with a remapping of its fields relative to the merged
+     * schema. Take the following two schemas for example:
+     *
+     * // path1
+     * { "type": "record",
+     *   "name": "x",
+     *   "fields": [ { "name": "xField", "type": "string" } ]
+     * }
+     *
+     * // path2
+     * { "type": "record",
+     *   "name": "y",
+     *   "fields": [ { "name": "yField", "type": "string" } ]
+     * }
+     *
+     * The merged schema will be something like this:
+     *
+     * // merged
+     * { "type": "record",
+     *   "name": "merged",
+     *   "fields": [ { "name": "xField", "type": "string" },
+     *               { "name": "yField", "type": "string" } ]
+     * }
+     *
+     * The schemaToMergedSchemaMap will look like this:
+     *
+     * // schemaToMergedSchemaMap
+     * { path1 : { 0 : 0 },
+     *   path2 : { 0 : 1 }
+     * }
+     *
+     * The meaning of the map is:
+     * - The field at index '0' of 'path1' is moved to index '0' in merged schema.
+     * - The field at index '0' of 'path2' is moved to index '1' in merged schema.
+     *
+     * With this map, we can now remap the field position of the original schema to
+     * that of the merged schema. This is necessary because in the backend, we don't
+     * use the merged avro schema but embedded avro schemas of input files to load
+     * them. Therefore, we must relocate each field from old positions in the original
+     * schema to new positions in the merged schema.
+     *
+     * @param mergedSchema new schema generated from multiple input schemas
+     * @param mergedFiles input avro files that are merged
+     * @return schemaToMergedSchemaMap that maps old position of each field in the
+     * original schema to new position in the new schema
+     * @throws IOException
+     */
+    public static Map<Path, Map<Integer, Integer>> getSchemaToMergedSchemaMap(
+            Schema mergedSchema, Map<Path, Schema> mergedFiles) throws IOException {
+
+        if (!mergedSchema.getType().equals(Schema.Type.RECORD)) {
+            throw new IOException("Remapping of non-record schemas is not supported");
+        }
+
+        Map<Path, Map<Integer, Integer>> result =
+                new HashMap<Path, Map<Integer, Integer>>(mergedFiles.size());
+
+        // map from field position in old schema to field position in new schema
+        for (Map.Entry<Path, Schema> entry : mergedFiles.entrySet()) {
+            Path path = entry.getKey();
+            Schema schema = entry.getValue();
+            if (!schema.getType().equals(Schema.Type.RECORD)) {
+                throw new IOException("Remapping of non-record schemas is not supported");
+            }
+            List<Field> fields = schema.getFields();
+            Map<Integer, Integer> oldPos2NewPos = result.get(path);
+            if (oldPos2NewPos == null) {
+                oldPos2NewPos = new HashMap<Integer, Integer>(fields.size());
+                result.put(path, oldPos2NewPos);
+            }
+            for (Field field : fields) {
+                String fieldName = field.name();
+                int oldPos = schema.getField(fieldName).pos();
+                int newPos = mergedSchema.getField(fieldName).pos();
+                oldPos2NewPos.put(oldPos, newPos);
+            }
+        }
+        return result;
+    }
+
+    /**
      * Wrap an avro schema as a nullable union if needed.
      * For instance, wrap schema "int" as ["null", "int"]
      */

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Thu Oct 11 22:06:32 2012
@@ -20,8 +20,10 @@ package org.apache.pig.piggybank.storage
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -33,13 +35,19 @@ import org.apache.hadoop.mapreduce.lib.i
 
 /**
  * The InputFormat for avro data.
- * 
+ *
  */
 public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
 
     private Schema schema = null;  /* avro schema */
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
+    /* if multiple avro record schemas are merged, this map associates each input
+     * record with a remapping of its fields relative to the merged schema. please
+     * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
+     */
+    private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap;
+
     /**
      * empty constructor
      */
@@ -50,14 +58,18 @@ public class PigAvroInputFormat extends 
      * constructor called by AvroStorage to pass in schema and ignoreBadFiles.
      * @param schema input data schema
      * @param ignoreBadFiles whether ignore corrupted files during load
+     * @param schemaToMergedSchemaMap map that associates each input record
+     * with a remapping of its fields relative to the merged schema
      */
-    public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles) {
+    public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles,
+            Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) {
         this.schema = schema;
         this.ignoreBadFiles = ignoreBadFiles;
+        this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
     }
 
     /**
-     * Create and return an avro record reader. 
+     * Create and return an avro record reader.
      * It uses the input schema passed in to the
      * constructor.
      */
@@ -66,7 +78,8 @@ public class PigAvroInputFormat extends 
     createRecordReader(InputSplit split, TaskAttemptContext context)
     throws IOException,  InterruptedException {
         context.setStatus(split.toString());
-        return new PigAvroRecordReader(context, (FileSplit) split, schema, ignoreBadFiles);
+        return new PigAvroRecordReader(context, (FileSplit) split, schema,
+                ignoreBadFiles, schemaToMergedSchemaMap);
     }
 
 }

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Thu Oct 11 22:06:32 2012
@@ -33,10 +33,12 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
+import java.util.ArrayList;
+import java.util.Map;
+
 /**
- * This is an implementation of record reader which reads in avro data and 
+ * This is an implementation of record reader which reads in avro data and
  * convert them into <NullWritable, Writable> pairs.
- * 
  */
 public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
 
@@ -47,17 +49,32 @@ public class PigAvroRecordReader extends
     private long start;
     private long end;
     private Path path;
-    private boolean ignoreBadFiles;
+    private boolean ignoreBadFiles; /* whether ignore corrupted files during load */
+
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    /* if multiple avro record schemas are merged, this list will hold field objects
+     * of the merged schema.
+     */
+    private ArrayList<Object> mProtoTuple;
+
+    /* if multiple avro record schemas are merged, this map associates each input
+     * record with a remapping of its fields relative to the merged schema. please
+     * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
+     */
+    private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap;
 
     /**
      * constructor to initialize input and avro data reader
      */
     public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
-                               Schema schema, boolean ignoreBadFiles) throws IOException {
+            Schema schema, boolean ignoreBadFiles,
+            Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) throws IOException {
         this.path = split.getPath();
         this.in = new AvroStorageInputStream(path, context);
-        if(schema == null)
-            throw new IOException("Need to provide input avro schema");
+        if(schema == null) {
+            AvroStorageLog.details("No avro schema given; assuming the schema is embedded");
+        }
 
         try {
           this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(schema));
@@ -69,6 +86,22 @@ public class PigAvroRecordReader extends
         this.start = in.tell();
         this.end = split.getStart() + split.getLength();
         this.ignoreBadFiles = ignoreBadFiles;
+        this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
+        if (schemaToMergedSchemaMap != null) {
+            // initialize mProtoTuple
+            int maxPos = 0;
+            for (Map<Integer, Integer> map : schemaToMergedSchemaMap.values()) {
+                for (Integer i : map.values()) {
+                    maxPos = Math.max(i, maxPos);
+                }
+            }
+            int tupleSize = maxPos + 1;
+            AvroStorageLog.details("Creating proto tuple of fixed size: " + tupleSize);
+            mProtoTuple = new ArrayList<Object>(tupleSize);
+            for (int i = 0; i < tupleSize; i++) {
+                mProtoTuple.add(i, null);
+            }
+        }
     }
 
     @Override
@@ -93,20 +126,45 @@ public class PigAvroRecordReader extends
     @Override
     public Writable getCurrentValue() throws IOException, InterruptedException {
         Object obj = reader.next();
+        Tuple result = null;
         if (obj instanceof Tuple) {
             AvroStorageLog.details("Class =" + obj.getClass());
-            return (Tuple) obj;
+            result = (Tuple) obj;
         } else {
             AvroStorageLog.details("Wrap calss " + obj.getClass() + " as a tuple.");
-            return wrapAsTuple(obj);
+            result = wrapAsTuple(obj);
+        }
+        if (schemaToMergedSchemaMap != null) {
+            // remap the position of fields to the merged schema
+            Map<Integer, Integer> map = schemaToMergedSchemaMap.get(path);
+            if (map == null) {
+                throw new IOException("The schema of '" + path + "' " +
+                                      "is not merged by AvroStorage.");
+            }
+            result = remap(result, map);
+        }
+        return result;
+    }
+
+    /**
+     * Remap the position of fields to the merged schema
+     */
+    private Tuple remap(Tuple tuple, Map<Integer, Integer> map) throws IOException {
+        try {
+            for (int pos = 0; pos < tuple.size(); pos++) {
+                mProtoTuple.set(map.get(pos), tuple.get(pos));
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
         }
+        return tupleFactory.newTuple(mProtoTuple);
     }
 
     /**
      * Wrap non-tuple value as a tuple
      */
     protected Tuple wrapAsTuple(Object in) {
-        Tuple tuple = TupleFactory.getInstance().newTuple();
+        Tuple tuple = tupleFactory.newTuple();
         tuple.append(in);
         return tuple;
     }

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Thu Oct 11 22:06:32 2012
@@ -159,6 +159,8 @@ public class TestAvroStorage {
         "   } ]" +
         " }";
     final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
+    final private String testMultipleSchemas1File = getInputFile("test_primitive_types/*");
+    final private String testMultipleSchemas2File = getInputFile("test_complex_types/*");
 
     @BeforeClass
     public static void setup() throws ExecException {
@@ -461,6 +463,105 @@ public class TestAvroStorage {
     }
 
     @Test
+    public void testMultipleSchemas1() throws IOException {
+        // Verify that multiple primitive types can be loaded.
+        // Input Avro files have the following schemas:
+        //  "int"
+        //  "long"
+        //  "float"
+        //  "double"
+        //  "string"
+        //  { "type" : "enum", "name" : "foo", "symbols" : [ "6" ] }
+        // Merged Avro schema looks like this:
+        //  "string"
+        // The relation 'in' looks like this: (order of rows can be different.)
+        //  (6)
+        //  (4.0)
+        //  (3.0)
+        //  (5)
+        //  (2)
+        //  (1)
+        // Avro file stored after processing looks like this:
+        //  "1"
+        //  "2"
+        //  "3.0"
+        //  "4.0"
+        //  "5"
+        //  "6"
+        String output= outbasedir + "testMultipleSchemas1";
+        String expected = basedir + "expected_testMultipleSchemas1.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testMultipleSchemas1File +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('multiple_schemas');",
+          " s = FOREACH in GENERATE StringConcat($0);",
+          " o = ORDER s BY $0;",
+          " STORE o INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '\"string\"');"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testMultipleSchemas2() throws IOException {
+        // Verify that multiple complex types (records) can be loaded.
+        // Input Avro files have the following schemas:
+        //  { "type" : "record", "name" : "r", "fields" : [ { "name" : "i", "type" : "int" } ] }
+        //  { "type" : "record", "name" : "r", "fields" : [ { "name" : "l", "type" : "long" } ] }
+        //  { "type" : "record", "name" : "r", "fields" : [ { "name" : "f", "type" : "float" } ] }
+        //  { "type" : "record", "name" : "r", "fields" : [ { "name" : "d", "type" : "double" } ] }
+        //  { "type" : "record", "name" : "r", "fields" : [ { "name" : "s", "type" : "string" } ] }
+        //  { "type" : "record", "name" : "r", "fields" : [ { "name" : "e", "type" : {
+        //      "type" : "enum", "name" : "foo", "symbols" : [ "6" ] } } ] }
+        // Merged Avro schema looks like this:
+        //  { "type" : "record",
+        //    "name" : "merged",
+        //    "fields" : [ { "name" : "i", "type" : "int" },
+        //                 { "name" : "l", "type" : "long" },
+        //                 { "name" : "f", "type" : "float" },
+        //                 { "name" : "d", "type" : "double" },
+        //                 { "name" : "s", "type" : "string" },
+        //                 { "name" : "e", "type" : {
+        //                      "type" : "enum", "name" : "foo", "symbols" : [ "6" ] } }
+        //               ]
+        //  }
+        // The relation 'in' looks like this: (order of rows can be different.)
+        //  (,,6,,,)
+        //  (,,,,4.0,)
+        //  (,,,,,3.0)
+        //  (,5,,,,)
+        //  (,,,2,,)
+        //  (1,,,,,)
+        // Avro file stored after processing looks like this:
+        //  "1"
+        //  "2"
+        //  "3.0"
+        //  "4.0"
+        //  "5"
+        //  "6"
+        String output= outbasedir + "testMultipleSchemas2";
+        String expected = basedir + "expected_testMultipleSchemas2.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + testMultipleSchemas2File +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('multiple_schemas');",
+          " f = FOREACH in GENERATE ($0 is not null ? (chararray)$0 : '')," +
+          "                         ($1 is not null ? (chararray)$1 : '')," +
+          "                         ($2 is not null ? (chararray)$2 : '')," +
+          "                         ($3 is not null ? (chararray)$3 : '')," +
+          "                         ($4 is not null ? (chararray)$4 : '')," +
+          "                         ($5 is not null ? (chararray)$5 : '');",
+          " c = FOREACH f GENERATE StringConcat( $0, $1, $2, $3, $4, $5 );",
+          " o = ORDER c BY $0;",
+          " STORE o INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '\"string\"');"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
     public void testDir() throws IOException {
         // Verify that all files in a directory including its sub-directories are loaded.
         String output= outbasedir + "testDir";

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Thu Oct 11 22:06:32 2012
@@ -26,6 +26,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.*;
 
@@ -89,39 +93,383 @@ public class TestAvroStorageUtils {
     }
 
     @Test
-    public void testGetConcretePathFromGlob() throws IOException {
+    public void testGetAllSubDirs() throws IOException {
         final String basedir = "file://" + System.getProperty("user.dir");
         final String tempdir = Long.toString(System.currentTimeMillis());
         final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
 
-        Path[] paths = null;
-        Path concretePath = null;
-        Job job = new Job(new Configuration());
+        String locationStr = null;
+        Set<Path> paths = new HashSet<Path>();
+        Configuration conf = new Configuration();
 
         // existent path
-        String locationStr = basedir;
-        concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
-        assertEquals(basedir, concretePath.toUri().toString());
+        locationStr = basedir;
+        assertTrue(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+        assertFalse(paths.isEmpty());
+        paths.clear();
 
         // non-existent path
         locationStr = nonexistentpath;
-        concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
-        assertEquals(null, concretePath);
+        assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+        assertTrue(paths.isEmpty());
+        paths.clear();
 
         // empty glob pattern
         locationStr = basedir + "/{}";
-        concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
-        assertEquals(null, concretePath);
+        assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+        assertTrue(paths.isEmpty());
+        paths.clear();
 
         // bad glob pattern
         locationStr = basedir + "/{1,";
         try {
-            concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
-            Assert.fail();
+            AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths);
+            Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
         } catch (IOException e) {
             // The message of the exception for illegal file pattern is rather long,
             // so we simply confirm if it contains 'illegal file pattern'.
             assertTrue(e.getMessage().contains("Illegal file pattern"));
         }
     }
+
+    // test merging null and non-null
+    @Test
+    public void testMergeSchema1() throws IOException {
+        Schema nonNull = Schema.create(Schema.Type.INT);
+        assertEquals(AvroStorageUtils.mergeSchema(null, null), null);
+        assertEquals(AvroStorageUtils.mergeSchema(null, nonNull), nonNull);
+        assertEquals(AvroStorageUtils.mergeSchema(nonNull, null), nonNull);
+    }
+
+    // test merging primitive types
+    @Test
+    public void testMergeSchema2() throws IOException {
+        Schema.Type primitiveType[] = {
+                Schema.Type.NULL,
+                Schema.Type.BOOLEAN,
+                Schema.Type.BYTES,
+                Schema.Type.INT,
+                Schema.Type.LONG,
+                Schema.Type.FLOAT,
+                Schema.Type.DOUBLE,
+                Schema.Type.ENUM,
+                Schema.Type.STRING,
+        };
+
+        // First index is type1, and second index is type2. Note that Schema.Type.NULL
+        // means the Avro null type while null means undefined.
+        Schema.Type expectedType[][] = {
+                {
+                    Schema.Type.NULL,    // = null + null
+                    null,                // = null + boolean
+                    null,                // = null + bytes
+                    null,                // = null + int
+                    null,                // = null + long
+                    null,                // = null + float
+                    null,                // = null + double
+                    null,                // = null + enum
+                    null,                // = null + string
+                },
+                {
+                    null,                // = boolean + null
+                    Schema.Type.BOOLEAN, // = boolean + boolean
+                    null,                // = boolean + bytes
+                    null,                // = boolean + int
+                    null,                // = boolean + long
+                    null,                // = boolean + float
+                    null,                // = boolean + double
+                    null,                // = boolean + enum
+                    null,                // = boolean + string
+                },
+                {
+                    null,                // = bytes + null
+                    null,                // = bytes + boolean
+                    Schema.Type.BYTES,   // = bytes + bytes
+                    null,                // = bytes + int
+                    null,                // = bytes + long
+                    null,                // = bytes + float
+                    null,                // = bytes + double
+                    null,                // = bytes + enum
+                    null,                // = bytes + string
+                },
+                {
+                    null,                // = int + null
+                    null,                // = int + boolean
+                    null,                // = int + bytes
+                    Schema.Type.INT,     // = int + int
+                    Schema.Type.LONG,    // = int + long
+                    Schema.Type.FLOAT,   // = int + float
+                    Schema.Type.DOUBLE,  // = int + double
+                    Schema.Type.STRING,  // = int + enum
+                    Schema.Type.STRING,  // = int + string
+                },
+                {
+                    null,                // = long + null
+                    null,                // = long + boolean
+                    null,                // = long + bytes
+                    Schema.Type.LONG,    // = long + int
+                    Schema.Type.LONG,    // = long + long
+                    Schema.Type.FLOAT,   // = long + float
+                    Schema.Type.DOUBLE,  // = long + double
+                    Schema.Type.STRING,  // = long + enum
+                    Schema.Type.STRING,  // = long + string
+                },
+                {
+                    null,                // = float + null
+                    null,                // = float + boolean
+                    null,                // = float + bytes
+                    Schema.Type.FLOAT,   // = float + int
+                    Schema.Type.FLOAT,   // = float + long
+                    Schema.Type.FLOAT,   // = float + float
+                    Schema.Type.DOUBLE,  // = float + double
+                    Schema.Type.STRING,  // = float + enum
+                    Schema.Type.STRING,  // = float + string
+                },
+                {
+                    null,                // = double + null
+                    null,                // = double + boolean
+                    null,                // = double + bytes
+                    Schema.Type.DOUBLE,  // = double + int
+                    Schema.Type.DOUBLE,  // = double + long
+                    Schema.Type.DOUBLE,  // = double + float
+                    Schema.Type.DOUBLE,  // = double + double
+                    Schema.Type.STRING,  // = double + enum
+                    Schema.Type.STRING,  // = double + string
+                },
+                {
+                    null,                // = enum + null
+                    null,                // = enum + boolean
+                    null,                // = enum + bytes
+                    Schema.Type.STRING,  // = enum + int
+                    Schema.Type.STRING,  // = enum + long
+                    Schema.Type.STRING,  // = enum + float
+                    Schema.Type.STRING,  // = enum + double
+                    Schema.Type.ENUM,    // = enum + enum
+                    Schema.Type.STRING,  // = enum + string
+                },
+                {
+                    null,                // = string + null
+                    null,                // = string + boolean
+                    null,                // = string + bytes
+                    Schema.Type.STRING,  // = string + int
+                    Schema.Type.STRING,  // = string + long
+                    Schema.Type.STRING,  // = string + float
+                    Schema.Type.STRING,  // = string + double
+                    Schema.Type.STRING,  // = string + enum
+                    Schema.Type.STRING,  // = string + string
+                },
+        };
+
+        List<String> enumSymbols = new ArrayList<String>();
+        enumSymbols.add("sym1");
+        enumSymbols.add("sym2");
+        Schema enumSchema = Schema.createEnum("enum", null, null, enumSymbols);
+
+        for (int i = 0; i < primitiveType.length; i++) {
+            Schema.Type x = primitiveType[i];
+            for (int j = 0; j < primitiveType.length; j++) {
+                Schema.Type y = primitiveType[j];
+                if (expectedType[i][j] == null) {
+                    try {
+                        Schema z = AvroStorageUtils.mergeSchema(
+                                x.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(x),
+                                y.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(y));
+                        Assert.fail("exception is expected, but " + z.getType() + " is returned");
+                    } catch (IOException e) {
+                        assertEquals("Cannot merge "+ x +" with " +y, e.getMessage());
+                    }
+                } else {
+                    Schema z = AvroStorageUtils.mergeSchema(
+                            x.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(x),
+                            y.equals(Schema.Type.ENUM) ? enumSchema : Schema.create(y));
+                    assertEquals(expectedType[i][j], z.getType());
+                }
+            }
+        }
+    }
+
+    // test merging different kinds of complex types (negative test)
+    // different kinds of complex types cannot be merged
+    @Test
+    public void testMergeSchema3() throws IOException {
+        Schema complexType[] = {
+            Schema.createRecord(new ArrayList<Schema.Field>()),
+            Schema.createArray(Schema.create(Schema.Type.INT)),
+            Schema.createMap(Schema.create(Schema.Type.INT)),
+            Schema.createUnion(new ArrayList<Schema>()),
+            Schema.createFixed("fixed", null, null, 1),
+        };
+
+        for (int i = 0; i < complexType.length; i++) {
+            Schema x = complexType[i];
+            for (int j = 0; j < complexType.length; j++) {
+                Schema y = complexType[j];
+                if (i != j) {
+                    try {
+                        Schema z = AvroStorageUtils.mergeSchema(x, y);
+                        Assert.fail("exception is expected, but " + z.getType() + " is returned");
+                    } catch (IOException e) {
+                        assertEquals("Cannot merge "+ x.getType()+ " with "+ y.getType(), e.getMessage());
+                    }
+                }
+            }
+        }
+    }
+
+    // test merging two records
+    @Test
+    public void testMergeSchema4() throws IOException {
+        Schema x, y, z;
+
+        // fields have the same names and types
+        List<Schema.Field> fields1 = new ArrayList<Schema.Field>();
+        fields1.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
+        x = Schema.createRecord(fields1);
+
+        List<Schema.Field> fields2 = new ArrayList<Schema.Field>();
+        fields2.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
+        y = Schema.createRecord(fields2);
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(x.getFields().size(), z.getFields().size());
+        assertEquals(Schema.Type.INT, z.getField("f1").schema().getType());
+
+        // fields have the same names and mergeable types
+        List<Schema.Field> fields3 = new ArrayList<Schema.Field>();
+        fields3.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
+        x = Schema.createRecord(fields3);
+
+        List<Schema.Field> fields4 = new ArrayList<Schema.Field>();
+        fields4.add(new Schema.Field("f1", Schema.create(Schema.Type.DOUBLE), null, null));
+        y = Schema.createRecord(fields4);
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(x.getFields().size(), z.getFields().size());
+        assertEquals(Schema.Type.DOUBLE, z.getField("f1").schema().getType());
+
+        // fields have the same names but not mergeable types
+        List<Schema.Field> fields5 = new ArrayList<Schema.Field>();
+        fields5.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
+        x = Schema.createRecord(fields5);
+
+        List<Schema.Field> fields6 = new ArrayList<Schema.Field>();
+        fields6.add(new Schema.Field("f1", Schema.create(Schema.Type.BOOLEAN), null, null));
+        y = Schema.createRecord(fields6);
+
+        try {
+            z = AvroStorageUtils.mergeSchema(x, y);
+            Assert.fail("exception is expected, but " + z.getType() + " is returned");
+        } catch (IOException e) {
+            assertEquals("Cannot merge "+ x.getField("f1").schema().getType() +
+                         " with "+ y.getField("f1").schema().getType(), e.getMessage());
+        }
+
+        // fields have different names
+        List<Schema.Field> fields7 = new ArrayList<Schema.Field>();
+        fields7.add(new Schema.Field("f1", Schema.create(Schema.Type.INT), null, null));
+        x = Schema.createRecord(fields7);
+
+        List<Schema.Field> fields8 = new ArrayList<Schema.Field>();
+        fields8.add(new Schema.Field("f2", Schema.create(Schema.Type.DOUBLE), null, null));
+        y = Schema.createRecord(fields8);
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(x.getFields().size() + y.getFields().size(), z.getFields().size());
+        assertEquals(Schema.Type.INT, z.getField("f1").schema().getType());
+        assertEquals(Schema.Type.DOUBLE, z.getField("f2").schema().getType());
+    }
+
+    // test merging two maps
+    @Test
+    public void testMergeSchema5() throws IOException {
+        Schema x, y, z;
+
+        // element types are mergeable
+        x = Schema.createArray(Schema.create(Schema.Type.INT));
+        y = Schema.createArray(Schema.create(Schema.Type.DOUBLE));
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(Schema.Type.ARRAY, z.getType());
+        assertEquals(Schema.Type.DOUBLE, z.getElementType().getType());
+
+        // element types are not mergeable
+        x = Schema.createArray(Schema.create(Schema.Type.INT));
+        y = Schema.createArray(Schema.create(Schema.Type.BOOLEAN));
+
+        try {
+            z = AvroStorageUtils.mergeSchema(x, y);
+            Assert.fail("exception is expected, but " + z.getType() + " is returned");
+        } catch (IOException e) {
+            assertEquals("Cannot merge "+ x.getElementType().getType() +
+                         " with "+ y.getElementType().getType(), e.getMessage());
+        }
+    }
+
+    // test merging two maps
+    @Test
+    public void testMergeSchema6() throws IOException {
+        Schema x, y, z;
+
+        // value types are mergeable
+        x = Schema.createMap(Schema.create(Schema.Type.INT));
+        y = Schema.createMap(Schema.create(Schema.Type.DOUBLE));
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(Schema.Type.MAP, z.getType());
+        assertEquals(Schema.Type.DOUBLE, z.getValueType().getType());
+
+        // value types are not mergeable
+        x = Schema.createMap(Schema.create(Schema.Type.INT));
+        y = Schema.createMap(Schema.create(Schema.Type.BOOLEAN));
+
+        try {
+            z = AvroStorageUtils.mergeSchema(x, y);
+            Assert.fail("exception is expected, but " + z.getType() + " is returned");
+        } catch (IOException e) {
+            assertEquals("Cannot merge "+ x.getValueType().getType() +
+                         " with "+ y.getValueType().getType(), e.getMessage());
+        }
+    }
+
+    // test merging two unions
+    @Test
+    public void testMergeSchema7() throws IOException {
+        Schema x, y, z;
+
+        List<Schema> types1 = new ArrayList<Schema>();
+        types1.add(Schema.create(Schema.Type.INT));
+        List<Schema> types2 = new ArrayList<Schema>();
+        types2.add(Schema.create(Schema.Type.DOUBLE));
+
+        x = Schema.createUnion(types1);
+        y = Schema.createUnion(types2);
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(Schema.Type.UNION, z.getType());
+        assertTrue(z.getTypes().contains(types1.get(0)));
+        assertTrue(z.getTypes().contains(types2.get(0)));
+    }
+
+    // test merging two fixeds
+    @Test
+    public void testMergeSchema8() throws IOException {
+        Schema x, y, z;
+
+        x = Schema.createFixed("fixed1", null, null, 1);
+        y = Schema.createFixed("fixed2", null, null, 1);
+
+        z = AvroStorageUtils.mergeSchema(x, y);
+        assertEquals(Schema.Type.FIXED, z.getType());
+        assertEquals(x.getFixedSize(), z.getFixedSize());
+
+        x = Schema.createFixed("fixed1", null, null, 1);
+        y = Schema.createFixed("fixed2", null, null, 2);
+
+        try {
+            z = AvroStorageUtils.mergeSchema(x, y);
+            Assert.fail("exception is expected, but " + z.getType() + " is returned");
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("Cannot merge FIXED types with different sizes"));
+        }
+    }
 }

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream