You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/06/14 15:34:40 UTC

svn commit: r1493062 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/pi...

Author: rohini
Date: Fri Jun 14 13:34:39 2013
New Revision: 1493062

URL: http://svn.apache.org/r1493062
Log:
PIG-3318: AVRO: 'default value' not honored when merging schemas on load with AvroStorage (viraj via rohini)

Added:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro   (with props)
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jun 14 13:34:39 2013
@@ -196,6 +196,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3318: AVRO: 'default value' not honored when merging schemas on load with AvroStorage (viraj via rohini)
+
 PIG-3250: Pig dryrun generates wrong output in .expanded file for 'SPLIT....OTHERWISE...' command (dreambird via cheolsoo)
 
 PIG-3331: Default values not stored in avro file when using specific schemas during store in AvroStorage (viraj via rohini)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Fri Jun 14 13:34:39 2013
@@ -319,15 +319,8 @@ public class AvroStorage extends FileInp
         AvroStorageLog.funcCall("getInputFormat");
         InputFormat result = null;
         if(inputAvroSchema != null) {
-            if (useMultipleSchemas) {
-                // When merging multiple avro schemas, we use embedded schemas
-                // to load input files. So no input avro schema is passed.
-                result = new PigAvroInputFormat(
-                        null, ignoreBadFiles, schemaToMergedSchemaMap);
-            } else {
-                result = new PigAvroInputFormat(
-                        inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap);
-            }
+            result = new PigAvroInputFormat(
+            inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
         } else {
             result = new TextInputFormat();
         }

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Fri Jun 14 13:34:39 2013
@@ -48,7 +48,7 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.data.DataType;
 import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
 
-
+import org.codehaus.jackson.JsonNode;
 /**
  * This is utility class for this package
  */
@@ -305,12 +305,15 @@ public class AvroStorageUtils {
                 List<Schema.Field> yFields = y.getFields();
 
                 // LinkedHashMap is used to keep fields in insertion order.
-                // It's convenient for testing to have determinitic behaviors.
+                // It's convenient for testing to have deterministic behaviors.
                 Map<String, Schema> fieldName2Schema =
                         new LinkedHashMap<String, Schema>(xFields.size() + yFields.size());
+                Map<String, JsonNode> fieldName2Default =
+                        new LinkedHashMap<String, JsonNode>(xFields.size() + yFields.size());
 
                 for (Schema.Field xField : xFields) {
                     fieldName2Schema.put(xField.name(), xField.schema());
+                    fieldName2Default.put(xField.name(),xField.defaultValue());
                 }
                 for (Schema.Field yField : yFields) {
                     String name = yField.name();
@@ -318,14 +321,28 @@ public class AvroStorageUtils {
                     Schema prevSchema = fieldName2Schema.get(name);
                     if (prevSchema == null) {
                         fieldName2Schema.put(name, currSchema);
+                        fieldName2Default.put(name, yField.defaultValue());
                     } else {
                         fieldName2Schema.put(name, mergeSchema(prevSchema, currSchema));
+                        // When merging record schemas it is okay for one side to have a default
+                        // and the other to have none; in that case the non-null default is kept.
+                        JsonNode xDefaultValue = fieldName2Default.get(name);
+                        JsonNode yDefaultValue = yField.defaultValue();
+                        if (xDefaultValue != null) {
+                            // need to check if the default values in the schemas are the same
+                            if (yDefaultValue != null && !xDefaultValue.equals(yDefaultValue)) {
+                                throw new IOException(
+                                     "Cannot merge schema's which have different default values - " + xDefaultValue +
+                                     " and " + yDefaultValue);
+                            }
+                        } else {
+                            fieldName2Default.put(name, yDefaultValue);
+                        }
                     }
                 }
-
                 List<Schema.Field> mergedFields = new ArrayList<Schema.Field>(fieldName2Schema.size());
                 for (Entry<String, Schema> entry : fieldName2Schema.entrySet()) {
-                    mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", null));
+                    mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", fieldName2Default.get(entry.getKey())));
                 }
                 Schema result = Schema.createRecord(
                         "merged", null, "merged schema (generated by AvroStorage)", false);

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Fri Jun 14 13:34:39 2013
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.lib.i
 public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
 
     private Schema readerSchema = null;  /* avro schema */
+    /* whether the 'multiple_schemas' load option is set; passed on to the RecordReader */
+    private boolean useMultipleSchemas = false;
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
     /* if multiple avro record schemas are merged, this map associates each input
@@ -62,10 +64,12 @@ public class PigAvroInputFormat extends 
      * with a remapping of its fields relative to the merged schema
      */
     public PigAvroInputFormat(Schema readerSchema, boolean ignoreBadFiles,
-            Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) {
+            Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap,
+            boolean useMultipleSchemas) {
         this.readerSchema = readerSchema;
         this.ignoreBadFiles = ignoreBadFiles;
         this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
+        this.useMultipleSchemas = useMultipleSchemas;
     }
 
     /**
@@ -79,7 +83,7 @@ public class PigAvroInputFormat extends 
     throws IOException,  InterruptedException {
         context.setStatus(split.toString());
         return new PigAvroRecordReader(context, (FileSplit) split, readerSchema,
-                ignoreBadFiles, schemaToMergedSchemaMap);
+                ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
     }
 
 }

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Fri Jun 14 13:34:39 2013
@@ -35,8 +35,11 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
+import org.codehaus.jackson.JsonNode;
+
 /**
  * This is an implementation of record reader which reads in avro data and
  * convert them into <NullWritable, Writable> pairs.
@@ -59,6 +62,9 @@ public class PigAvroRecordReader extends
      */
     private ArrayList<Object> mProtoTuple;
 
+    /* whether the 'multiple_schemas' load option is set; if so, files are decoded without the merged reader schema */
+    private boolean useMultipleSchemas = false;
+
     /* if multiple avro record schemas are merged, this map associates each input
      * record with a remapping of its fields relative to the merged schema. please
      * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
@@ -70,9 +76,11 @@ public class PigAvroRecordReader extends
      */
     public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
             Schema readerSchema, boolean ignoreBadFiles,
-            Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) throws IOException {
+            Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap,
+            boolean useMultipleSchemas) throws IOException {
         this.path = split.getPath();
         this.in = new AvroStorageInputStream(path, context);
+        this.useMultipleSchemas = useMultipleSchemas;
         if(readerSchema == null) {
             AvroStorageLog.details("No avro schema given; assuming the schema is embedded");
         }
@@ -87,7 +95,12 @@ public class PigAvroRecordReader extends
         }
 
         try {
-            this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, readerSchema));
+            if (useMultipleSchemas) {
+                this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, null));
+            }
+            else {
+                this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, readerSchema));
+            }
         } catch (IOException e) {
           throw new IOException("Error initializing data file reader for file (" +
               split.getPath() + ")", e);
@@ -98,7 +111,7 @@ public class PigAvroRecordReader extends
         this.ignoreBadFiles = ignoreBadFiles;
         this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
         if (schemaToMergedSchemaMap != null) {
-            // initialize mProtoTuple
+            // initialize mProtoTuple with the right default values
             int maxPos = 0;
             for (Map<Integer, Integer> map : schemaToMergedSchemaMap.values()) {
                 for (Integer i : map.values()) {
@@ -109,7 +122,44 @@ public class PigAvroRecordReader extends
             AvroStorageLog.details("Creating proto tuple of fixed size: " + tupleSize);
             mProtoTuple = new ArrayList<Object>(tupleSize);
             for (int i = 0; i < tupleSize; i++) {
-                mProtoTuple.add(i, null);
+                // Get the list of fields from the passed schema
+                List<Schema.Field> subFields = readerSchema.getFields();
+                JsonNode defValue = subFields.get(i).defaultValue();
+                if (defValue != null) {
+                    Schema.Type type = subFields.get(i).schema().getType();
+                    switch (type) {
+                        case BOOLEAN:
+                            mProtoTuple.add(i, defValue.getBooleanValue());
+                            break;
+                        case ENUM:
+                            mProtoTuple.add(i, defValue.getTextValue());
+                            break;
+                        case FIXED:
+                            mProtoTuple.add(i, defValue.getTextValue());
+                            break;
+                        case INT:
+                            mProtoTuple.add(i, defValue.getIntValue());
+                            break;
+                        case LONG:
+                            mProtoTuple.add(i, defValue.getIntValue());
+                            break;
+                        case FLOAT:
+                            mProtoTuple.add(i, defValue.getNumberValue().floatValue());
+                            break;
+                        case DOUBLE:
+                            mProtoTuple.add(i, defValue.getNumberValue().doubleValue());
+                            break;
+                        case STRING:
+                            mProtoTuple.add(i, defValue.getTextValue());
+                            break;
+                        default:
+                            mProtoTuple.add(i, null);
+                            break;
+                    }
+                }
+                else {
+                    mProtoTuple.add(i, null);
+                }
             }
         }
     }

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Fri Jun 14 13:34:39 2013
@@ -186,6 +186,7 @@ public class TestAvroStorage {
     final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
     final private String testMultipleSchemas1File = getInputFile("test_primitive_types/*");
     final private String testMultipleSchemas2File = getInputFile("test_complex_types/*");
+    final private String testMultipleSchemasWithDefaultValue = getInputFile("test_merge_schemas_default/{Employee{3,4,6}.avro}");
     final private String testUserDefinedLoadSchemaFile = getInputFile("test_user_defined_load_schema/*");
     final private String testLoadwithNullValues = getInputFile("test_loadavrowithnulls.avro");
 
@@ -627,6 +628,97 @@ public class TestAvroStorage {
     }
 
     @Test
+    public void testMultipleSchemasWithDefaultValue() throws IOException {
+        //        ==> Employee3.avro <==
+        //            {
+        //            "type" : "record",
+        //            "name" : "employee",
+        //            "fields":[
+        //                    {"name" : "name", "type" : "string", "default" : "NU"},
+        //                    {"name" : "age", "type" : "int", "default" : 0 },
+        //                    {"name" : "dept", "type": "string", "default" : "DU"} ] }
+        //
+        //            ==> Employee4.avro <==
+        //            {
+        //            "type" : "record",
+        //            "name" : "employee",
+        //            "fields":[
+        //                    {"name" : "name", "type" : "string", "default" : "NU"},
+        //                    {"name" : "age", "type" : "int", "default" : 0},
+        //                    {"name" : "dept", "type": "string", "default" : "DU"},
+        //                    {"name" : "office", "type": "string", "default" : "OU"} ] }
+        //
+        //            ==> Employee6.avro <==
+        //            {
+        //            "type" : "record",
+        //            "name" : "employee",
+        //            "fields":[
+        //                    {"name" : "name", "type" : "string", "default" : "NU"},
+        //                    {"name" : "lastname", "type": "string", "default" : "LNU"},
+        //                    {"name" : "age", "type" : "int","default" : 0},
+        //                    {"name" : "salary", "type": "int", "default" : 0},
+        //                    {"name" : "dept", "type": "string","default" : "DU"},
+        //                    {"name" : "office", "type": "string","default" : "OU"} ] }
+        // The input files contain the following rows
+        // (the order of rows can be different):
+        //
+        //      Employee3.avro
+        //        (Milo,30,DH)
+        //        (Asmya,34,PQ)
+        //        (Baljit,23,RS)
+        //
+        //      Employee4.avro
+        //        (Praj,54,RMX,Champaign)
+        //        (Buba,767,HD,Sunnyvale)
+        //        (Manku,375,MS,New York)
+        //
+        //      Employee6.avro
+        //        (Pune,Warriors,60,5466,Astrophysics,UTA)
+        //        (Rajsathan,Royals,20,1378,Biochemistry,Stanford)
+        //        (Chennai,Superkings,50,7338,Microbiology,Hopkins)
+        //        (Mumbai,Indians,20,4468,Applied Math,UAH)
+
+        // Before filtering, the merged relation has the following schema and
+        // data; fields missing from a file are filled with their schema defaults:
+        // {name: chararray,age: int,dept: chararray,office: chararray,
+        // lastname: chararray,salary: int}
+        //(Asmya,34,PQ,OU,LNU,0)
+        //(Baljit,23,RS,OU,LNU,0)
+        //(Buba,767,HD,Sunnyvale,LNU,0)
+        //(Chennai,50,Microbiology,Hopkins,Superkings,7338)
+        //(Manku,375,MS,New York,LNU,0)
+        //(Milo,30,DH,OU,LNU,0)
+        //(Mumbai,20,Applied Math,UAH,Indians,4468)
+        //(Praj,54,RMX,Champaign,LNU,0)
+        //(Pune,60,Astrophysics,UTA,Warriors,5466)
+        //(Rajsathan,20,Biochemistry,Stanford,Royals,1378)
+
+        Data data = resetData(pigServerLocal);
+        String output= outbasedir + "testMultipleSchemasWithDefaultValue";
+        deleteDirectory(new File(output));
+        String expected = basedir + "expected_testMultipleSchemasWithDefaultValue.avro";
+        String [] queries = {
+          " a = LOAD '" + testMultipleSchemasWithDefaultValue +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('multiple_schemas');",
+          " b = foreach a generate name,age,dept,office,lastname,salary;",
+          " c = filter b by age < 40 ;",
+          " d = order c by  name;",
+          " STORE d INTO '" + output+ "' using mock.Storage();"
+           };
+        testAvroStorage(queries);
+        List<Tuple> out = data.get(output);
+        assertEquals(out + " size", 5, out.size());
+        assertEquals(
+               schema("name: chararray,age: int,dept: chararray,office: chararray,lastname: chararray,salary: int"),
+                data.getSchema(output));
+        assertEquals(tuple("Asmya", 34, "PQ", "OU", "LNU", 0), out.get(0));
+        assertEquals(tuple("Baljit", 23, "RS", "OU", "LNU", 0), out.get(1));
+        assertEquals(tuple("Milo", 30, "DH", "OU", "LNU", 0), out.get(2));
+        assertEquals(tuple("Mumbai", 20, "Applied Math", "UAH", "Indians", 4468), out.get(3));
+        assertEquals(tuple("Rajsathan", 20, "Biochemistry", "Stanford", "Royals", 1378), out.get(4));
+    }
+
+    @Test
     // Verify the default values specified in the schema in AvroStorage
     // are actually written to the schema in the output avro file
     public void testDefaultValueSchemaWrite() throws IOException {
@@ -1185,9 +1277,6 @@ public class TestAvroStorage {
     }
 
     private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
-        // Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
-        if (Util.isHadoop23())
-            return;
 
         FileSystem fs = FileSystem.getLocal(new Configuration()) ;
 

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro?rev=1493062&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro?rev=1493062&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro?rev=1493062&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream