You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/05/16 01:46:14 UTC

svn commit: r1483127 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/pi...

Author: rohini
Date: Wed May 15 23:46:14 2013
New Revision: 1483127

URL: http://svn.apache.org/r1483127
Log:
PIG-3321: AVRO: Support user specified schema on load (harveyc via rohini)

Added:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro   (with props)
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed May 15 23:46:14 2013
@@ -28,6 +28,8 @@ PIG-3174:  Remove rpm and deb artifacts 
 
 IMPROVEMENTS
 
+PIG-3321: AVRO: Support user specified schema on load (harveyc via rohini)
+
 PIG-2959: Add a pig.cmd for Pig to run under Windows (daijy)
 
 PIG-3311: add pig-withouthadoop-h2 to mvn-jar (julien)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Wed May 15 23:46:14 2013
@@ -89,6 +89,7 @@ public class AvroStorage extends FileInp
     /* loadFunc parameters */
     private PigAvroRecordReader reader = null;   /* avro record writer */
     private Schema inputAvroSchema = null;    /* input avro schema */
+    private Schema userSpecifiedAvroSchema = null;    /* avro schema specified in constructor args */
 
     /* if multiple avro record schemas are merged, this map associates each input
      * record with a remapping of its fields relative to the merged schema. please
@@ -171,8 +172,13 @@ public class AvroStorage extends FileInp
      * @throws IOException
      */
     protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
-        inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
-                                             : getAvroSchema(paths, conf);
+        if(userSpecifiedAvroSchema != null) {
+            inputAvroSchema = userSpecifiedAvroSchema;
+        }
+        else {
+            inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
+                                                 : getAvroSchema(paths, conf);
+        }
     }
 
     /**
@@ -280,17 +286,7 @@ public class AvroStorage extends FileInp
      * @throws IOException
      */
     protected Schema getSchema(Path path, FileSystem fs) throws IOException {
-        /* get path of the last file */
-        Path lastFile = AvroStorageUtils.getLast(path, fs);
-
-        /* read in file and obtain schema */
-        GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
-        InputStream hdfsInputStream = fs.open(lastFile);
-        DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
-        Schema ret = avroDataStream.getSchema();
-        avroDataStream.close();
-
-        return ret;
+        return AvroStorageUtils.getSchema(path, fs);
     }
 
     /**
@@ -538,16 +534,19 @@ public class AvroStorage extends FileInp
                 AvroStorageLog.details("data path=" + path.toUri().toString());
                 FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
                 outputAvroSchema = getAvroSchema(path, fs);
+                userSpecifiedAvroSchema = outputAvroSchema;
             } else if (name.equalsIgnoreCase("nullable")) {
                 nullable = (Boolean) value;
             } else if (name.equalsIgnoreCase("schema")) {
                 outputAvroSchema = Schema.parse((String) value);
+                userSpecifiedAvroSchema = outputAvroSchema;
             } else if (name.equalsIgnoreCase("schema_uri")) {
                 /* use the contents of the specified path as output schema */
                 Path path = new Path( ((String) value).trim());
                 AvroStorageLog.details("schema_uri path=" + path.toUri().toString());
                 FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
                 outputAvroSchema = getSchemaFromFile(path, fs);
+                userSpecifiedAvroSchema = outputAvroSchema;
             } else if (name.matches("field\\d+")) {
                 /*set schema of dth field */
                 if (fields == null)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Wed May 15 23:46:14 2013
@@ -19,6 +19,7 @@ package org.apache.pig.piggybank.storage
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,6 +33,8 @@ import java.util.Set;
 import java.net.URI;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.file.DataFileStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -672,4 +675,28 @@ public class AvroStorageUtils {
         }
     }
 
+    /**
+     * This method is called by {@link #getAvroSchema}. The default implementation
+     * returns the schema of an avro file; or the schema of the last file in a first-level
+     * directory (it does not contain sub-directories).
+     *
+     * @param path  path of a file or first level directory
+     * @param fs  file system
+     * @return avro schema
+     * @throws IOException
+     */
+    public static Schema getSchema(Path path, FileSystem fs) throws IOException {
+        /* get path of the last file */
+        Path lastFile = AvroStorageUtils.getLast(path, fs);
+
+        /* read in file and obtain schema */
+        GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+        InputStream hdfsInputStream = fs.open(lastFile);
+        DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
+        Schema ret = avroDataStream.getSchema();
+        avroDataStream.close();
+
+        return ret;
+    }
+
 }

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java Wed May 15 23:46:14 2013
@@ -60,11 +60,17 @@ public class PigAvroDatumReader extends 
     @Override
     protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
 
+        // find out the order in which we will receive fields from the ResolvingDecoder
+        Field[] readOrderedFields = in.readFieldOrder();
+
         /* create an empty tuple */
-        Tuple tuple = (Tuple) newRecord(old, expected);
+        Tuple tuple = TupleFactory.getInstance().newTuple(readOrderedFields.length);
 
-        for (Field f : in.readFieldOrder()) {
-            tuple.append(read(null, f.schema(), in));
+        /* read fields and put in output order in tuple
+         * The ResolvingDecoder figures out the writer schema to reader schema mapping for us
+         */
+        for (Field f : readOrderedFields) {
+            tuple.set(f.pos(), read(old, f.schema(), in));
         }
 
         return tuple;

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Wed May 15 23:46:14 2013
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.lib.i
  */
 public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
 
-    private Schema schema = null;  /* avro schema */
+    private Schema readerSchema = null;  /* avro schema */
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
     /* if multiple avro record schemas are merged, this map associates each input
@@ -56,14 +56,14 @@ public class PigAvroInputFormat extends 
 
     /**
      * constructor called by AvroStorage to pass in schema and ignoreBadFiles.
-     * @param schema input data schema
+     * @param readerSchema reader schema
      * @param ignoreBadFiles whether ignore corrupted files during load
      * @param schemaToMergedSchemaMap map that associates each input record
      * with a remapping of its fields relative to the merged schema
      */
-    public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles,
+    public PigAvroInputFormat(Schema readerSchema, boolean ignoreBadFiles,
             Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) {
-        this.schema = schema;
+        this.readerSchema = readerSchema;
         this.ignoreBadFiles = ignoreBadFiles;
         this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
     }
@@ -78,7 +78,7 @@ public class PigAvroInputFormat extends 
     createRecordReader(InputSplit split, TaskAttemptContext context)
     throws IOException,  InterruptedException {
         context.setStatus(split.toString());
-        return new PigAvroRecordReader(context, (FileSplit) split, schema,
+        return new PigAvroRecordReader(context, (FileSplit) split, readerSchema,
                 ignoreBadFiles, schemaToMergedSchemaMap);
     }
 

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Wed May 15 23:46:14 2013
@@ -24,6 +24,7 @@ import org.apache.avro.file.DataFileRead
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -68,16 +69,25 @@ public class PigAvroRecordReader extends
      * constructor to initialize input and avro data reader
      */
     public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
-            Schema schema, boolean ignoreBadFiles,
+            Schema readerSchema, boolean ignoreBadFiles,
             Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) throws IOException {
         this.path = split.getPath();
         this.in = new AvroStorageInputStream(path, context);
-        if(schema == null) {
+        if(readerSchema == null) {
             AvroStorageLog.details("No avro schema given; assuming the schema is embedded");
         }
 
+        Schema writerSchema;
         try {
-          this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(schema));
+            FileSystem fs = FileSystem.get(path.toUri(), context.getConfiguration());
+            writerSchema = AvroStorageUtils.getSchema(path, fs);
+        } catch (IOException e) {
+            AvroStorageLog.details("No avro writer schema found in '"+path+"'; assuming writer schema matches reader schema");
+            writerSchema = null;
+        }
+
+        try {
+            this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, readerSchema));
         } catch (IOException e) {
           throw new IOException("Error initializing data file reader for file (" +
               split.getPath() + ")", e);

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Wed May 15 23:46:14 2013
@@ -180,6 +180,7 @@ public class TestAvroStorage {
     final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
     final private String testMultipleSchemas1File = getInputFile("test_primitive_types/*");
     final private String testMultipleSchemas2File = getInputFile("test_complex_types/*");
+    final private String testUserDefinedLoadSchemaFile = getInputFile("test_user_defined_load_schema/*");
 
     @BeforeClass
     public static void setup() throws ExecException, IOException {
@@ -584,6 +585,41 @@ public class TestAvroStorage {
     }
 
     @Test
+    public void testUserDefinedLoadSchema() throws IOException {
+        // Verify that user specified schema correctly maps to input schemas
+        // Input Avro files have the following schemas:
+        //   name:"string", address:[customField1:"int", addressLine:"string"]
+        //   address:[addressLine:"string", customField2:"int"], name:"string"
+        // User Avro schema looks like this:
+        //   name:"string", address:[customField1:"int", customField2:"int", customField3:"int"]
+        // This test will confirm that AvroStorage correctly maps fields from writer to reader schema,
+        // dropping, adding, and reordering fields where needed.
+        String output= outbasedir + "testUserDefinedLoadSchema";
+        String expected = basedir + "expected_testUserDefinedLoadSchema.avro";
+        String customSchema = 
+                    "{\"type\": \"record\", \"name\": \"employee\", \"fields\": [ "
+                        +"{ \"default\": \"***\", \"type\": \"string\", \"name\": \"name\" }, "
+                        +"{ \"name\": \"address\", \"type\": { "
+                            +"\"type\": \"record\", \"name\": \"addressDetails\", \"fields\": [ "
+                                +"{ \"default\": 0, \"type\": \"int\", \"name\": \"customField1\" }, "
+                                +"{ \"default\": 0, \"type\": \"int\", \"name\": \"customField2\" }, "
+                                +"{ \"default\": 0, \"type\": \"int\", \"name\": \"customField3\" } "
+                            +"] "
+                        +"} } "
+                    +"] } ";
+
+        deleteDirectory(new File(output));
+        String [] queries = {
+            " in = LOAD '" + testUserDefinedLoadSchemaFile
+                + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '" + customSchema + "');",
+            " o = ORDER in BY name;",
+            " STORE o INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" 
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
     public void testDir() throws IOException {
         // Verify that all files in a directory including its sub-directories are loaded.
         String output= outbasedir + "testDir";

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro?rev=1483127&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro?rev=1483127&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro?rev=1483127&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream