Posted to commits@pig.apache.org by ch...@apache.org on 2013/06/29 00:54:06 UTC

svn commit: r1497954 [1/3] - in /pig/trunk: ./ .eclipse.templates/ ivy/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/builtin/ src/org/apache/pig/impl/util/avro/ test/ test/org/apache/pig/builtin/ test/org/apache/pig/builtin/avro/ test/o...

Author: cheolsoo
Date: Fri Jun 28 22:54:05 2013
New Revision: 1497954

URL: http://svn.apache.org/r1497954
Log:
PIG-3015: Rewrite of AvroStorage (jadler via cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
    pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
    pig/trunk/src/org/apache/pig/impl/util/avro/
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
    pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
    pig/trunk/test/org/apache/pig/builtin/avro/
    pig/trunk/test/org/apache/pig/builtin/avro/code/
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig
    pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig
    pig/trunk/test/org/apache/pig/builtin/avro/data/
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPigWithDates.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsOfArrays.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsOfArraysOfRecords.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsSubSchema.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsSubSchemaNullable.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithDoubleUnderscores.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithEnums.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithFixed.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithMaps.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithMapsOfRecords.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithNullableUnions.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithSimpleUnion.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithSimpleUnionOutput.json
    pig/trunk/test/org/apache/pig/builtin/avro/data/json/recursiveRecord.json
    pig/trunk/test/org/apache/pig/builtin/avro/schema/
    pig/trunk/test/org/apache/pig/builtin/avro/schema/arrays.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/arraysAsOutputByPig.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/projectionTest.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordWithRepeatedSubRecords.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/records.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsAsOutputByPig.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsAsOutputByPigWithDates.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsOfArrays.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsOfArraysOfRecords.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsSubSchema.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsSubSchemaNullable.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithDoubleUnderscores.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithEnums.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithFixed.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithMaps.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithMapsOfRecords.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithNullableUnions.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithSimpleUnion.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithSimpleUnionOutput.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/recursiveRecord.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/simpleRecordsTrevni.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/testDirectory.avsc
    pig/trunk/test/org/apache/pig/builtin/avro/schema/testDirectoryCounts.avsc
Modified:
    pig/trunk/.eclipse.templates/.classpath
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
    pig/trunk/test/unit-tests

Modified: pig/trunk/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/pig/trunk/.eclipse.templates/.classpath?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/.eclipse.templates/.classpath (original)
+++ pig/trunk/.eclipse.templates/.classpath Fri Jun 28 22:54:05 2013
@@ -53,9 +53,11 @@
 	<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/servlet-api-2.5-6.1.14.jar"/>
 	<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/slf4j-api-1.6.1.jar"/>
 	<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/slf4j-log4j12-1.6.1.jar"/>
-  <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/xmlenc-0.52.jar"/>
-  <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/avro-1.7.4.jar" />
-  <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/json-simple-1.1.jar" />
+    <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/xmlenc-0.52.jar"/>
+    <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/avro-1.7.4.jar" />
+    <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/nodeps-1.7.4.jar" />
+    <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/json-simple-1.1.jar" />
+    <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/snappy-java-1.0.5-M3.jar" />
 	<classpathentry exported="true" kind="lib" path="lib/automaton.jar"/>
 	<classpathentry kind="output" path="build/classes"/>
 </classpath>

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jun 28 22:54:05 2013
@@ -24,10 +24,12 @@ INCOMPATIBLE CHANGES
 
 PIG-3191: [piggybank] MultiStorage output filenames are not sortable (Danny Antonelli via jcoveney)
 
-PIG-3174:  Remove rpm and deb artifacts from build.xml (gates)
+PIG-3174: Remove rpm and deb artifacts from build.xml (gates)
 
 IMPROVEMENTS
 
+PIG-3015: Rewrite of AvroStorage (jadler via cheolsoo)
+
 PIG-3361: Improve Hadoop version detection logic for Pig unit test (daijy)
 
 PIG-3280: Document IN operator and CASE expression (cheolsoo)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Jun 28 22:54:05 2013
@@ -331,6 +331,11 @@
             <include name="guava-${guava.version}.jar"/>
             <include name="automaton-${automaton.version}.jar"/>
             <include name="jansi-${jansi.version}.jar"/>
+            <include name="avro-${avro.version}.jar"/>     
+            <include name="avro-mapred-${avro.version}.jar"/>     
+            <include name="trevni-core-${avro.version}.jar"/>     
+            <include name="trevni-avro-${avro.version}.jar"/>
+            <include name="snappy-java-${snappy.version}.jar"/>
             <include name="asm*.jar"/>
         </patternset>
     </fileset>
@@ -344,11 +349,15 @@
             <include name="junit-${junit.version}.jar"/>
             <include name="jsch-${jsch.version}.jar"/>
             <include name="protobuf-java-${protobuf-java.version}.jar"/>
-            <include name="avro-${avro.version}.jar"/>
             <include name="commons*.jar"/>
             <include name="log4j*.jar"/>
             <include name="slf4j*.jar"/>
             <include name="jsp-api*.jar"/>
+            <include name="avro-${avro.version}.jar"/>
+            <include name="avro-mapred-${avro.version}.jar"/>    
+            <include name="trevni-core-${avro.version}.jar"/>    
+            <include name="trevni-avro-${avro.version}.jar"/>
+            <include name="snappy-java-${snappy.version}.jar"/>
             <include name="asm*.jar"/>
         </patternset>
     </fileset>

Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Fri Jun 28 22:54:05 2013
@@ -172,16 +172,28 @@
       conf="compile->master"/>
     <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}"
       conf="compile->master;checkstyle->master"/>
-     <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}"
+    <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}"
       conf="compile->master;test->master"/>
     <dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
       conf="compile->master;checkstyle->master"/>
-
     <dependency org="org.apache.avro" name="avro" rev="${avro.version}"
       conf="compile->default;checkstyle->master"/>
+    <dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
+      conf="compile->default;checkstyle->master"/>
+    <dependency org="org.apache.avro" name="trevni-core" rev="${avro.version}"
+      conf="compile->default;checkstyle->master"/>
+    <dependency org="org.apache.avro" name="trevni-avro" rev="${avro.version}"
+      conf="compile->default;checkstyle->master">
+      <exclude org="org.apache.hadoop" module="hadoop-core"/>
+    </dependency>
+    <dependency org="org.apache.avro" name="avro-tools" rev="${avro.version}"
+      conf="test->default">
+      <artifact name="nodeps" type="jar"/>
+    </dependency>
+    <dependency org="org.xerial.snappy" name="snappy-java" rev="${snappy.version}"
+      conf="compile->default;checkstyle->master"/>
     <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
       conf="compile->master;checkstyle->master"/>
-
     <dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
       conf="jdiff->default"/>
     <dependency org="xalan" name="xalan" rev="${xalan.version}"

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Fri Jun 28 22:54:05 2013
@@ -86,3 +86,4 @@ jsr311-api.version=1.1.1
 mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
+snappy.version=1.0.5-M3

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/func.xml?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/func.xml Fri Jun 28 22:54:05 2013
@@ -1755,6 +1755,342 @@ STORE A INTO 'hbase://users_table' USING
        rowKey.</p>
    </section>
    </section>
+
+  <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+   <section id="AvroStorage">
+   <title>AvroStorage</title>
+   <p>Loads and stores data from Avro files.</p>
+
+   <section>
+   <title>Syntax</title>
+   <table>
+       <tr>
+            <td>
+               <p>AvroStorage(['schema|record name'], ['options'])</p>
+            </td>
+         </tr>
+   </table>
+   </section>
+
+   <section>
+   <title>Terms</title>
+   <table>
+       <tr>
+            <td>
+               <p>schema</p>
+            </td>
+            <td>
+               <p>A JSON string specifying the Avro schema for the input. You may specify an explicit schema
+                  when storing data or when loading data. When you manually provide a schema, Pig
+                  will use the provided schema for serialization and deserialization. This means that
+                  you can provide an explicit schema when saving data to simplify the output (for example,
+                  by removing nullable unions) or to rename fields. It also means that you can provide
+                  an explicit schema when reading data to read only a subset of the fields in each record.</p>
+                  
+                  <p>See 
+                  <a href="http://avro.apache.org/docs/current/spec.html"> the Apache Avro Documentation</a>
+                  for more details on how to specify a valid schema.</p>
+            </td>
+         </tr>
+       <tr>
+            <td>
+               <p>record name</p>
+            </td>
+            <td>
+               <p>When storing a bag of tuples with AvroStorage, if you do not want to specify
+               the full schema, you may specify the Avro record name instead. (AvroStorage will
+               determine that the argument isn't a valid schema definition and use it as a
+               record name instead.)</p>
+            </td>
+         </tr>
+
+       <tr>
+            <td>
+               <p>'options'</p>
+            </td>
+            <td>
+               <p>A string that contains space-separated options (&lsquo;-optionA valueA -optionB valueB -optionC&rsquo;)</p>
+               <p>Currently supported options are listed below; a short example of passing options follows this table:</p>
+               <ul>
+                <li>-namespace nameSpace or -n nameSpace: Explicitly specify the namespace
+                    field in Avro records when storing data.</li>
+                <li>-schemafile schemaFile or -f schemaFile: Specify the input (or output) schema from
+                    an external file. Pig assumes that the file is located on the default filesystem,
+                    but you may use an explicit URL to unambiguously specify the location. (For example, if
+                    the data was on your local file system in /stuff/schemafile.avsc, you
+                    could specify "-f file:///stuff/schemafile.avsc" to specify the location. If the
+                    data was on HDFS under /yourdirectory/schemafile.avsc, you could specify
+                    "-f hdfs:///yourdirectory/schemafile.avsc".) Pig expects this to be a
+                    text file containing a valid Avro schema.</li>
+                <li>-examplefile exampleFile or -e exampleFile: Specify the input (or output)
+                    schema using another Avro file as an example. Pig assumes that the file is located on the default filesystem,
+                    but you may use an explicit URL to specify the location. Pig
+                    expects this to be an Avro data file.</li>
+                <li>-allowrecursive or -r: Specify whether to allow recursive schema definitions (the
+                    default is to throw an exception if Pig encounters a recursive schema). When
+                    reading objects with recursive definitions, Pig will translate Avro records to
+                    schema-less tuples; the Pig schema for the object may not match the data exactly.</li>
+                <li>-doublecolons or -d: Specify how to handle Pig schemas that contain double colons
+                    when writing data in Avro format. (When you join two relations in Pig, Pig will automatically
+                    label the fields in the output tuples with names that contain double colons.) If
+                    you select this option, AvroStorage will translate names with double colons into
+                    names with double underscores.</li>
+               </ul>
+            </td>
+         </tr>
+   </table>
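+   <p>For example, a sketch (with a hypothetical schema file path) that loads data using an
+      external schema file and allows recursive definitions might look like:</p>
+<source>
+in = LOAD 'stuff' USING AvroStorage('', '-f hdfs:///stuff/schemafile.avsc -r');
+</source>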
+   </section>
+
+   <section>
+   <title>Usage</title>
+   <p>AvroStorage stores and loads data from Avro files. Often, you can load and
+      store data using AvroStorage without knowing much about the Avro serialization format.
+      AvroStorage will attempt to automatically translate a Pig schema and Pig data to Avro data,
+      or Avro data to Pig data.</p>
+   <p>By default, when you use AvroStorage to load data, AvroStorage will use a depth-first search to
+      find a valid Avro file on the input path, then use the schema from that file to load the
+      data. When you use AvroStorage to store data, AvroStorage will attempt to translate the
+      Pig schema to an equivalent Avro schema. You can manually specify the schema by providing
+      an explicit schema in Pig, loading a schema from an external schema file, or explicitly telling
+      Pig to read the schema from a specific Avro file.</p>
+   <p>To compress your output with AvroStorage, you need to use the correct Avro properties for compression.
+       For example, to enable compression using deflate level 5, you would specify</p>
+<source>
+SET avro.output.codec 'deflate'
+SET avro.mapred.deflate.level 5
+</source>
+   <p>Valid values for avro.output.codec include deflate, snappy, and null.</p>
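+   <p>For example, a minimal sketch (assuming a relation named measurements) that stores
+      snappy-compressed Avro:</p>
+<source>
+SET avro.output.codec 'snappy'
+STORE measurements INTO 'measurements_snappy' USING AvroStorage();
+</source>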
+   <p>There are a few key differences between the Avro and Pig data models, and in some
+      cases it helps to understand them.
+      Before writing Pig data to Avro (or creating Avro files to use in Pig), keep in
+      mind that there might not be an equivalent Avro schema for every Pig schema (and
+      vice versa):</p>
+   <ul>
+    <li><strong>Recursive schema definitions</strong> You cannot define schemas recursively in Pig,
+      but you can define schemas recursively in Avro.</li>
+    <li><strong>Allowed characters</strong> Pig schemas may sometimes contain characters like colons (":")
+        that are illegal in Avro names.</li>
+    <li><strong>Unions</strong> In Avro, you can define an object that may be one of several different
+    types (including complex types such as records). In Pig, you cannot.</li>
+    <li><strong>Enums</strong> Avro allows you to define enums to efficiently and abstractly
+        represent categorical variables, but Pig does not.</li>
+    <li><strong>Fixed Length Byte Arrays</strong> Avro allows you to define fixed length byte arrays, 
+        but Pig does not.</li>
+    <li><strong>Nullable values</strong> In Pig, all types are nullable. In Avro, they are not
+        (see the snippet after this list).</li>
+   </ul>
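+   <p>For example, a Pig chararray field named label would appear in the generated Avro
+      schema as a union of string and null:</p>
+<source>
+{"name" : "label", "type" : ["string", "null"]}
+</source>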
+   <p>Here is how AvroStorage translates Pig values to Avro:</p>
+   <table>
+    <tr>
+     <td></td>
+     <td>Original Pig Type</td>
+     <td>Translated Avro Type</td>
+    </tr>
+    <tr>
+     <td>Integers</td>
+     <td>int</td>
+     <td>["int","null"]</td>
+    </tr>
+    <tr>
+     <td>Longs</td>
+     <td>long</td>
+     <td>["long","null"]</td>
+    </tr>
+    <tr>
+     <td>Floats</td>
+     <td>float</td>
+     <td>["float","null"]</td>
+    </tr>
+    <tr>
+     <td>Doubles</td>
+     <td>double</td>
+     <td>["double","null"]</td>
+    </tr>
+    <tr>
+     <td>Strings</td>
+     <td>chararray</td>
+     <td>["string","null"]</td>
+    </tr>
+    <tr>
+     <td>Bytes</td>
+     <td>bytearray</td>
+     <td>["bytes","null"]</td>
+    </tr>
+    <tr>
+     <td>Booleans</td>
+     <td>boolean</td>
+     <td>["boolean","null"]</td>
+    </tr>
+    <tr>
+     <td>Tuples</td>
+     <td>tuple</td>
+     <td>The Pig Tuple schema will be translated to a union of an Avro record with an equivalent
+         schema and null.</td>
+    </tr>
+    <tr>
+     <td>Bags of Tuples</td>
+     <td>bag</td>
+     <td>The Pig Bag schema will be translated to a union of an array of records with an equivalent
+         schema and null.</td>
+    </tr>
+    <tr>
+     <td>Maps</td>
+     <td>map</td>
+     <td>The Pig Map schema will be translated to a union of a map with an equivalent
+         value schema and null.</td>
+    </tr>
+   </table>
+
+   <p>Here is how AvroStorage translates Avro values to Pig:</p>
+   <table>
+    <tr>
+     <td></td>
+     <td>Original Avro Types</td>
+     <td>Translated Pig Type</td>
+    </tr>
+    <tr>
+     <td>Integers</td>
+     <td>["int","null"] or "int"</td>
+     <td>int</td>
+    </tr>
+    <tr>
+     <td>Longs</td>
+     <td>["long","null"] or "long"</td>
+     <td>long</td>
+    </tr>
+    <tr>
+     <td>Floats</td>
+     <td>["float","null"] or "float"</td>
+     <td>float</td>
+    </tr>
+    <tr>
+     <td>Doubles</td>
+     <td>["double","null"] or "double"</td>
+     <td>double</td>
+    </tr>
+    <tr>
+     <td>Strings</td>
+     <td>["string","null"] or "string"</td> 
+     <td>chararray</td>
+    </tr>
+    <tr>
+     <td>Enums</td>
+     <td>Either an enum or a union of an enum and null</td> 
+     <td>chararray</td>
+    </tr>
+    <tr>
+     <td>Bytes</td>
+     <td>["bytes","null"] or "bytes"</td>
+     <td>bytearray</td>
+    </tr>
+    <tr>
+     <td>Fixed</td>
+     <td>Either a fixed-length byte array, or a union of a fixed-length byte array and null</td>
+     <td>bytearray</td>
+    </tr>    
+    <tr>
+     <td>Booleans</td>
+     <td>["boolean","null"] or "boolean"</td>
+     <td>boolean</td>
+    </tr>
+    <tr>
+     <td>Tuples</td>
+     <td>Either a record type, or a union of a record and null</td>
+     <td>tuple</td>
+    </tr>
+    <tr>
+     <td>Bags of Tuples</td>
+     <td>Either an array, or a union of an array and null</td>
+     <td>bag</td>
+    </tr>
+    <tr>
+     <td>Maps</td>
+     <td>Either a map, or a union of a map and null</td>
+     <td>map</td>
+    </tr>
+   </table>
+
+   <p> In many cases, AvroStorage will automatically translate your data correctly and you will not
+       need to provide any more information to AvroStorage. But sometimes, it may be convenient to
+       manually provide a schema to AvroStorage. See the example sections below for examples
+       of manually specifying a schema with AvroStorage.
+   </p>
+   </section>
+   <section>
+   <title>Load Examples</title>
+    <p>Suppose that you were provided with a file of Avro data (located in 'stuff')
+       with the following schema:</p>
+<source>
+{"type" : "record",
+ "name" : "stuff",
+ "fields" : [
+   {"name" : "label", "type" : "string"}, 
+   {"name" : "value", "type" : "int"},
+   {"name" : "marketingPlans", "type" : ["string", "bytes", "null"]}
+  ]
+}
+</source>
+    <p>Additionally, suppose that you don't need the value of the field "marketingPlans."
+       (That's a good thing, because AvroStorage doesn't know how to translate that Avro schema
+       to a Pig schema.) To load only the fields "label" and "value" into Pig, you can
+       manually specify the schema passed to AvroStorage:</p>
+<source>
+measurements = LOAD 'stuff' USING AvroStorage(
+  '{"type":"record","name":"measurement","fields":[{"name":"label","type":"string"},{"name":"value","type":"int"}]}'
+  );
+</source>
+   </section>
+
+   <section>
+   <title>Store Examples</title>
+    <p>Suppose that you are saving a bag called measurements with the schema:</p>
+<source>
+measurements:{measurement:(label:chararray,value:int)}
+</source>
+    <p>To store this bag into a file called "measurements", you can use a statement like:</p>
+<source>
+STORE measurements INTO 'measurements' USING AvroStorage('measurement');
+</source>
+    <p>AvroStorage will translate this to the Avro schema</p>
+<source>
+{"type":"record", 
+ "name":"measurement",
+ "fields" : [
+   {"name" : "label", "type" : ["string", "null"]}, 
+   {"name" : "value", "type" : ["int", "null"]}
+  ]
+} 
+</source>
+    <p>But suppose that you knew that the label and value fields would never be null. You could
+       define a more precise schema manually using a statement like:</p>
+<source>
+STORE measurements INTO 'measurements' USING AvroStorage(
+  '{"type":"record","name":"measurement","fields":[{"name":"label","type":"string"},{"name":"value","type":"int"}]}'
+  );
+</source>
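+    <p>As a further sketch, suppose a relation produced by a join contains double-colon names
+       such as a::label (the relations here are hypothetical). Passing the -doublecolons option
+       tells AvroStorage to write those fields with double underscores, such as a__label:</p>
+<source>
+joined = JOIN a BY label, b BY label;
+STORE joined INTO 'joined' USING AvroStorage('joined_record', '-d');
+</source>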
+   </section>
+   </section>
+
+  <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+   <section id="TrevniStorage">
+   <title>TrevniStorage</title>
+   <p>Loads and stores data from Trevni files.</p>
+
+   <section>
+   <title>Syntax</title>
+   <table>
+    <tr>
+     <td>
+      <p>TrevniStorage(['schema|record name'], ['options'])</p>
+     </td>
+    </tr>
+   </table>
+   </section>
+   <p>Trevni is a column-oriented storage format that is part of the Apache Avro project. Trevni is
+   closely related to Avro.</p>
+   <p>Likewise, TrevniStorage is very closely related to AvroStorage, and shares the same options as
+   AvroStorage. See <a href="#AvroStorage">AvroStorage</a> for a detailed description of the
+   arguments for TrevniStorage.</p>
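+   <p>For example, a minimal sketch (with hypothetical paths) that converts Avro data to Trevni:</p>
+<source>
+in = LOAD 'inputData' USING AvroStorage();
+STORE in INTO 'outputData' USING TrevniStorage();
+</source>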
+   </section>
 </section>
 
 <!-- ======================================================== -->  

Added: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,721 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.avro.AvroArrayReader;
+import org.apache.pig.impl.util.avro.AvroRecordReader;
+import org.apache.pig.impl.util.avro.AvroRecordWriter;
+import org.apache.pig.impl.util.avro.AvroStorageSchemaConversionUtilities;
+import org.apache.pig.impl.util.avro.AvroTupleWrapper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+
+/**
+ * Pig UDF for reading and writing Avro data.
+ *
+ */
+public class AvroStorage extends LoadFunc
+    implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+
+  /**
+   *  Creates new instance of Pig Storage function, without specifying
+   *  the schema. Useful for just loading in data.
+   */
+  public AvroStorage() {
+    this(null, null);
+  }
+
+  /**
+   *  Creates new instance of Pig Storage function.
+   *  @param sn Specifies the input/output schema or record name.
+   */
+  public AvroStorage(final String sn) {
+    this(sn, null);
+  }
+
+  private String schemaName = "record";
+  private String schemaNameSpace = null;
+  protected boolean allowRecursive = false;
+  protected boolean doubleColonsToDoubleUnderscores = false;
+  protected Schema schema;
+  protected final Log log = LogFactory.getLog(getClass());
+
+  /**
+   *  Creates new instance of AvroStorage function, specifying output schema
+   *  properties.
+   *  @param sn Specifies the input/output schema or record name.
+   *  @param opts Options for AvroStorage:
+   *  <li><code>-namespace</code> Namespace for an automatically generated
+   *    output schema.</li>
+   *  <li><code>-schemafile</code> Specifies URL for avro schema file
+   *    from which to read the input schema (can be local file, hdfs,
+   *    url, etc).</li>
+   *  <li><code>-examplefile</code> Specifies URL for avro data file from
+   *    which to copy the input schema (can be local file, hdfs, url, etc).</li>
+   *  <li><code>-allowrecursive</code> Option to allow recursive schema
+   *    definitions (default is false).</li>
+   *  <li><code>-doublecolons</code> Option to translate Pig schema names
+   *    with double colons to names with double underscores (default is false).</li>
+   *
+   */
+  public AvroStorage(final String sn, final String opts) {
+    super();
+
+    if (sn != null) {
+      try {
+        Schema s = (new Schema.Parser()).parse(sn);
+        // must be a valid schema
+        setInputAvroSchema(s);
+        setOutputAvroSchema(s);
+      } catch (SchemaParseException e) {
+        // not a valid schema, use as a record name
+        schemaName = sn;
+      }
+    }
+
+    if (opts != null) {
+      String[] optsArr = opts.split(" ");
+      Options validOptions = new Options();
+      try {
+        CommandLineParser parser = new GnuParser();
+        validOptions.addOption("n", "namespace", true,
+            "Namespace for an automatically generated output schema");
+        validOptions.addOption("f", "schemafile", true,
+            "Specifies URL for avro schema file from which to read "
+            + "the input or output schema");
+        validOptions.addOption("e", "examplefile", true,
+            "Specifies URL for avro data file from which to copy "
+            + "the output schema");
+        validOptions.addOption("r", "allowrecursive", false,
+            "Option to allow recursive schema definitions (default is false)");
+        validOptions.addOption("d", "doublecolons", false,
+            "Option to translate Pig schema names with double colons "
+            + "to names with double underscores (default is false)");
+        CommandLine configuredOptions = parser.parse(validOptions, optsArr);
+        schemaNameSpace = configuredOptions.getOptionValue("namespace", null);
+        allowRecursive = configuredOptions.hasOption('r');
+        doubleColonsToDoubleUnderscores = configuredOptions.hasOption('d');
+
+        if (configuredOptions.hasOption('f')) {
+          try {
+            Path p = new Path(configuredOptions.getOptionValue('f'));
+            Schema s = new Schema.Parser()
+              .parse((FileSystem.get(p.toUri(), new Configuration()).open(p)));  
+            setInputAvroSchema(s);
+            setOutputAvroSchema(s);
+          } catch (FileNotFoundException fnfe) {
+            log.warn("Schema file not found when instantiating AvroStorage. (If the " + 
+                "schema was described in a local file on the front end, and this message " + 
+                "is in the back end log, you can ignore this message.)", fnfe);
+          }
+        } else if (configuredOptions.hasOption('e')) {
+          setOutputAvroSchema(
+              getAvroSchema(configuredOptions.getOptionValue('e'),
+                  new Job(new Configuration())));
+        }
+
+      } catch (ParseException e) {
+        log.error("Exception in AvroStorage", e);
+        log.error("AvroStorage called with arguments " + sn + ", " + opts);
+        warn("ParseException in AvroStorage", PigWarning.UDF_WARNING_1);
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("AvroStorage(',', '[options]')", validOptions);
+        throw new RuntimeException(e);
+      } catch (IOException e) {
+        log.warn("Exception in AvroStorage", e);
+        log.warn("AvroStorage called with arguments " + sn + ", " + opts);
+        warn("IOException in AvroStorage", PigWarning.UDF_WARNING_1);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Context signature for this UDF instance.
+   */
+  protected String udfContextSignature = null;
+
+  @Override
+  public final void setUDFContextSignature(final String signature) {
+    udfContextSignature = signature;
+    super.setUDFContextSignature(signature);
+  }
+
+  /**
+   * Internal function for getting the Properties object associated with
+   * this UDF instance.
+   * @return The Properties object associated with this UDF instance
+   */
+  protected final Properties getProperties() {
+    if (udfContextSignature == null) {
+      return getProperties(AvroStorage.class, null);
+    } else {
+      return getProperties(AvroStorage.class, udfContextSignature);
+    }
+  }
+
+  /**
+   * Internal function for getting the Properties object associated with
+   * this UDF instance.
+   * @param c Class of this UDF
+   * @param signature Signature string
+   * @return The Properties object associated with this UDF instance
+   */
+  @SuppressWarnings("rawtypes")
+  protected final Properties getProperties(final Class c,
+      final String signature) {
+    UDFContext context = UDFContext.getUDFContext();
+    if (signature == null) {
+      return context.getUDFProperties(c);
+    } else {
+      return context.getUDFProperties(c, new String[] {signature});
+    }
+
+  }
+
+  /*
+   * @see org.apache.pig.LoadMetadata#getSchema(java.lang.String,
+   * org.apache.hadoop.mapreduce.Job)
+   */
+  @Override
+  public final ResourceSchema getSchema(final String location,
+      final Job job) throws IOException {
+    if (schema == null) {
+      Schema s = getAvroSchema(location, job);
+      setInputAvroSchema(s);
+    }
+
+    ResourceSchema rs = AvroStorageSchemaConversionUtilities
+        .avroSchemaToResourceSchema(schema, allowRecursive);
+
+    return rs;
+  }
+
+  /**
+   * Reads the avro schema at the specified location.
+   * @param location Location of file
+   * @param job Hadoop job object
+   * @return an Avro Schema object derived from the specified file
+   * @throws IOException
+   *
+   */
+  protected final  Schema getAvroSchema(final String location,
+      final Job job) throws IOException {
+    return getAvroSchema(new Path(location), job);
+  }
+
+  /**
+   * A PathFilter that filters out invisible files.
+   */
+  protected static final PathFilter VISIBLE_FILES = new PathFilter() {
+    @Override
+    public boolean accept(final Path p) {
+      return (!(p.getName().startsWith("_") || p.getName().startsWith(".")));
+    }
+  };
+
+  /**
+   * Reads the avro schema at the specified location.
+   * @param p Location of file
+   * @param job Hadoop job object
+   * @return an Avro Schema object derived from the specified file
+   * @throws IOException
+   *
+   */
+  public Schema getAvroSchema(final Path p, final Job job)
+      throws IOException {
+    GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+    FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
+    FileStatus[] statusArray = fs.globStatus(p);
+
+    if (statusArray == null) {
+      throw new IOException("Path " + p.toString() + " does not exist.");
+    }
+
+    if (statusArray.length == 0) {
+      throw new IOException("No path matches pattern " + p.toString());
+    }
+
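+    // Search the matched paths depth-first, most recently modified first,
+    // and read the schema embedded in the first visible data file found.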
+    Path filePath = depthFirstSearchForFile(statusArray, fs);
+
+    if (filePath == null) {
+      throw new IOException("No path matches pattern " + p.toString());
+    }
+
+    InputStream hdfsInputStream = fs.open(filePath);
+    DataFileStream<Object> avroDataStream =
+        new DataFileStream<Object>(hdfsInputStream, avroReader);
+    Schema s = avroDataStream.getSchema();
+    avroDataStream.close();
+    return s;
+  }
+
+  /**
+   * Finds a valid path for a file from a FileStatus object.
+   * @param fileStatus FileStatus object corresponding to a file,
+   * or a directory.
+   * @param fileSystem FileSystem in which the file should be found
+   * @return The first file found
+   * @throws IOException
+   */
+
+  private Path depthFirstSearchForFile(final FileStatus fileStatus,
+      final FileSystem fileSystem) throws IOException {
+    if (fileSystem.isFile(fileStatus.getPath())) {
+      return fileStatus.getPath();
+    } else {
+      return depthFirstSearchForFile(
+          fileSystem.listStatus(fileStatus.getPath(), VISIBLE_FILES),
+          fileSystem);
+    }
+
+  }
+
+  /**
+   * Finds a valid path for a file from an array of FileStatus objects.
+   * @param statusArray Array of FileStatus objects in which to search
+   * for the file.
+   * @param fileSystem FileSystem in which to search for the first file.
+   * @return The first file found.
+   * @throws IOException
+   */
+  protected Path depthFirstSearchForFile(final FileStatus[] statusArray,
+      final FileSystem fileSystem) throws IOException {
+
+    // Most recent files first
+    Arrays.sort(statusArray,
+        new Comparator<FileStatus>() {
+          @Override
+          public int compare(final FileStatus fs1, final FileStatus fs2) {
+              return Longs.compare(fs2.getModificationTime(),fs1.getModificationTime());
+            }
+          }
+    );
+
+    for (FileStatus f : statusArray) {
+      Path p = depthFirstSearchForFile(f, fileSystem);
+      if (p != null) {
+        return p;
+      }
+    }
+
+    return null;
+
+  }
+
+  /*
+   * @see org.apache.pig.LoadMetadata#getStatistics(java.lang.String,
+   * org.apache.hadoop.mapreduce.Job)
+   */
+  @Override
+  public final ResourceStatistics getStatistics(final String location,
+      final Job job) throws IOException {
+    return null;
+  }
+
+  /*
+   * @see org.apache.pig.LoadMetadata#getPartitionKeys(java.lang.String,
+   * org.apache.hadoop.mapreduce.Job)
+   */
+  @Override
+  public final String[] getPartitionKeys(final String location,
+      final Job job) throws IOException {
+    return null;
+  }
+
+  /*
+   * @see
+   * org.apache.pig.LoadMetadata#setPartitionFilter(org.apache.pig.Expression)
+   */
+  @Override
+  public void setPartitionFilter(final Expression partitionFilter)
+      throws IOException {
+  }
+
+  /*
+   * @see
+   * org.apache.pig.StoreFuncInterface#relToAbsPathForStoreLocation(java.lang
+   * .String, org.apache.hadoop.fs.Path)
+   */
+  @Override
+  public final String relToAbsPathForStoreLocation(final String location,
+      final Path curDir) throws IOException {
+    return LoadFunc.getAbsolutePath(location, curDir);
+  }
+
+  /*
+   * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
+   */
+  @Override
+  public OutputFormat<NullWritable, Object> getOutputFormat()
+      throws IOException {
+
+    /**
+     * Hadoop output format for AvroStorage.
+     */
+    class AvroStorageOutputFormat extends
+      FileOutputFormat<NullWritable, Object> {
+
+      @Override
+      public RecordWriter<NullWritable, Object> getRecordWriter(
+          final TaskAttemptContext tc) throws IOException,
+      InterruptedException {
+
+        return new AvroRecordWriter(
+            // avroStorageOutputFormatSchema,
+            getDefaultWorkFile(tc, AvroOutputFormat.EXT),
+            tc.getConfiguration());
+
+      }
+    }
+
+    return new AvroStorageOutputFormat();
+
+  }
+
+  /*
+   * @see org.apache.pig.StoreFuncInterface#setStoreLocation(java.lang.String,
+   * org.apache.hadoop.mapreduce.Job)
+   */
+  @Override
+  public final void setStoreLocation(final String location,
+      final Job job) throws IOException {
+    FileOutputFormat.setOutputPath(job, new Path(location));
+  }
+
+  /**
+   * Pig property name for the output avro schema.
+   */
+  public static final String OUTPUT_AVRO_SCHEMA =
+      "org.apache.pig.builtin.AvroStorage.output.schema";
+
+  /*
+   * @see
+   * org.apache.pig.StoreFuncInterface#checkSchema(org.apache.pig.ResourceSchema
+   * )
+   */
+  @Override
+  public final void checkSchema(final ResourceSchema rs) throws IOException {
+    if (rs == null) {
+      throw new IOException("checkSchema: called with null ResourceSchema");
+    }
+    Schema avroSchema = AvroStorageSchemaConversionUtilities
+        .resourceSchemaToAvroSchema(rs,
+            (schemaName == null || schemaName.length() == 0)
+                ? "pig_output" : schemaName,
+                schemaNameSpace,
+                Maps.<String, List<Schema>> newHashMap(),
+                doubleColonsToDoubleUnderscores);
+    if (avroSchema == null) {
+      throw new IOException("checkSchema: could not translate ResourceSchema to Avro Schema");
+    }
+    setOutputAvroSchema(avroSchema);
+  }
+
+  /**
+   * Sets the output avro schema to {@code s}.
+   * @param s An Avro schema
+   */
+  protected final void setOutputAvroSchema(final Schema s) {
+    schema = s;
+    getProperties()
+      .setProperty(OUTPUT_AVRO_SCHEMA, s.toString());
+  }
+
+  /**
+   * Utility function that gets the output schema from the udf
+   * properties for this instance of the store function.
+   * @return the output schema associated with this UDF
+   */
+  protected final Schema getOutputAvroSchema() {
+    if (schema == null) {
+      String schemaString = 
+              getProperties()
+          .getProperty(OUTPUT_AVRO_SCHEMA);
+      if (schemaString != null) {
+        schema = (new Schema.Parser()).parse(schemaString);
+      }
+    }
+    return schema;
+
+  }
+  
+  /**
+   * RecordWriter used by this UDF instance.
+   */
+  private RecordWriter<NullWritable, Object> writer;
+
+  /*
+   * @see
+   * org.apache.pig.StoreFuncInterface#prepareToWrite(org.apache.hadoop.mapreduce
+   * .RecordWriter)
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public final void prepareToWrite(final RecordWriter w) throws IOException {
+    if (this.udfContextSignature == null)
+      throw new IOException(this.getClass().toString() + ".prepareToWrite called without setting udf context signature");
+    writer = (RecordWriter<NullWritable, Object>) w;
+    ((AvroRecordWriter) writer).prepareToWrite(getOutputAvroSchema());
+  }
+
+  /*
+   * @see org.apache.pig.StoreFuncInterface#putNext(org.apache.pig.data.Tuple)
+   */
+  @Override
+  public final void putNext(final Tuple t) throws IOException {
+    try {
+      writer.write(null, t);
+    } catch (InterruptedException e) {
+      log.error("InterruptedException in putNext");
+      throw new IOException(e);
+    }
+  }
+
+  /*
+   * @see
+   * org.apache.pig.StoreFuncInterface#setStoreFuncUDFContextSignature(java.
+   * lang.String)
+   */
+  @Override
+  public final void setStoreFuncUDFContextSignature(final String signature) {
+    udfContextSignature = signature;
+    super.setUDFContextSignature(signature);
+  }
+
+  /*
+   * @see org.apache.pig.StoreFuncInterface#cleanupOnFailure(java.lang.String,
+   * org.apache.hadoop.mapreduce.Job)
+   */
+  @Override
+  public final void cleanupOnFailure(final String location,
+      final Job job) throws IOException {
+    StoreFunc.cleanupOnFailureImpl(location, job);
+  }
+
+  /**
+   * Pig property name for the input avro schema.
+   */
+  public static final String INPUT_AVRO_SCHEMA =
+      "org.apache.pig.builtin.AvroStorage.input.schema";
+
+  /*
+   * @see org.apache.pig.LoadFunc#setLocation(java.lang.String,
+   * org.apache.hadoop.mapreduce.Job)
+   */
+
+  @Override
+  public void setLocation(final String location, final Job job)
+      throws IOException {
+    FileInputFormat.setInputPaths(job, location);
+    if (schema == null) {
+      schema = getInputAvroSchema();
+      if (schema == null) {
+        schema = getAvroSchema(location, job);
+        if (schema == null) {
+          throw new IOException(
+              "Could not determine avro schema for location " + location);
+        }
+        setInputAvroSchema(schema);
+      }
+    }
+  }
+
+  /**
+   * Sets the input avro schema to {@code s}.
+   * @param s The specified schema
+   */
+  protected final void setInputAvroSchema(final Schema s) {
+    schema = s;
+    getProperties().setProperty(INPUT_AVRO_SCHEMA, s.toString());
+  }
+
+  /**
+   * Helper function reads the input avro schema from the UDF
+   * Properties.
+   * @return The input avro schema
+   */
+  public final Schema getInputAvroSchema() {
+    if (schema == null) {
+      String schemaString = getProperties().getProperty(INPUT_AVRO_SCHEMA);
+      if (schemaString != null) {
+        Schema s = new Schema.Parser().parse(schemaString);
+        schema = s;
+      }
+    }
+    return schema;
+  }
+
+  /*
+   * @see org.apache.pig.LoadFunc#getInputFormat()
+   */
+  @Override
+  public InputFormat<NullWritable, GenericData.Record> getInputFormat()
+      throws IOException {
+
+    return new org.apache.pig.backend.hadoop.executionengine.mapReduceLayer
+        .PigFileInputFormat<NullWritable, GenericData.Record>() {
+
+      @Override
+      public RecordReader<NullWritable, GenericData.Record>
+        createRecordReader(final InputSplit is, final TaskAttemptContext tc)
+          throws IOException, InterruptedException {
+        Schema s = getInputAvroSchema();
+        RecordReader<NullWritable, GenericData.Record> rr = null;
+        if (s.getType() == Type.ARRAY) {
+          rr = new AvroArrayReader(s);
+        } else {
+          rr = new AvroRecordReader(s);
+        }
+        rr.initialize(is, tc);
+        tc.setStatus(is.toString());
+        return rr;
+      }
+    };
+
+  }
+
+  @SuppressWarnings("rawtypes") private RecordReader reader;
+  PigSplit split;
+
+  /*
+   * @see
+   * org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader
+   * , org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit)
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public final void prepareToRead(final RecordReader r, final PigSplit s)
+      throws IOException {
+    reader = r;
+    split = s;
+  }
+
+  /*
+   * @see org.apache.pig.LoadFunc#getNext()
+   */
+
+  @Override
+  public final Tuple getNext() throws IOException {
+    try {
+      if (reader.nextKeyValue()) {
+        return new AvroTupleWrapper<GenericData.Record>(
+            (GenericData.Record) reader.getCurrentValue());
+      } else {
+        return null;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Wrapped Interrupted Exception", e);
+    }
+  }
+
+  @Override
+  public void cleanupOnSuccess(final String location, final Job job)
+      throws IOException {
+  }
+
+  @Override
+  public List<OperatorSet> getFeatures() {
+      return Lists.newArrayList(LoadPushDown.OperatorSet.PROJECTION);
+  }
+
+  /**
+   * List of required fields passed by pig in a push down projection.
+   */
+  protected RequiredFieldList requiredFieldList;
+
+  /*
+   * @see
+   * org.apache.pig.LoadPushDown#pushProjection(org.apache.pig.LoadPushDown.
+   * RequiredFieldList)
+   */
+  @Override
+  public RequiredFieldResponse pushProjection(final RequiredFieldList rfl)
+          throws FrontendException {
+      requiredFieldList = rfl;
+      
+      Schema newSchema = AvroStorageSchemaConversionUtilities
+              .newSchemaFromRequiredFieldList(schema, rfl);
+      if (newSchema != null) {
+          schema = newSchema;
+          setInputAvroSchema(schema);
+          return new RequiredFieldResponse(true);
+      } else {
+          log.warn("could not select fields subset " + rfl + "\n");
+          warn("could not select fields subset", PigWarning.UDF_WARNING_2);
+          return new RequiredFieldResponse(false);
+      }
+
+  }
+
+}

Added: pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.avro.AvroRecordWriter;
+import org.apache.pig.impl.util.avro.AvroStorageDataConversionUtilities;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.MetaData;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.apache.trevni.avro.AvroColumnWriter;
+import org.apache.trevni.avro.AvroTrevniOutputFormat;
+import org.apache.trevni.avro.HadoopInput;
+
+import com.google.common.collect.Lists;
+
+/**
+ *
+ * Pig Store/Load Function for Trevni.
+ *
+ */
+
+public class TrevniStorage extends AvroStorage implements LoadPushDown {
+
+  /**
+   * Create new instance of TrevniStorage with no arguments (useful
+   * for loading files without specifying parameters).
+   */
+  public TrevniStorage() {
+    super();
+  }
+
+  /**
+   * Create new instance of TrevniStorage.
+   *  @param sn Specifies the input/output schema or record name.
+   *  @param opts Options for AvroStorage:
+   *  <li><code>-namespace</code> Namespace for an automatically generated
+   *    output schema.</li>
+   *  <li><code>-schemafile</code> Specifies URL for avro schema file
+   *    from which to read the input schema (can be local file, hdfs,
+   *    url, etc).</li>
+   *  <li><code>-examplefile</code> Specifies URL for avro data file from
+   *    which to copy the input schema (can be local file, hdfs, url, etc).</li>
+   *  <li><code>-allowrecursive</code> Option to allow recursive schema
+   *    definitions (default is false).</li>
+   */
+  public TrevniStorage(final String sn, final String opts) {
+    super(sn, opts);
+  }
+
+  /*
+   * @see org.apache.pig.LoadFunc#getInputFormat()
+   */
+  @Override
+  public InputFormat<NullWritable, GenericData.Record> getInputFormat()
+      throws IOException {
+
+    class TrevniStorageInputFormat
+    extends PigFileInputFormat<NullWritable, GenericData.Record> {
+
+      @Override protected boolean isSplitable(JobContext jc, Path p) {
+        return false;
+      }
+
+      @Override protected  List<FileStatus> listStatus(final JobContext job)
+          throws IOException {
+        List<FileStatus> results = Lists.newArrayList();
+        job.getConfiguration().setBoolean("mapred.input.dir.recursive", true);
+        for (FileStatus file : super.listStatus(job)) {
+          if (VISIBLE_FILES.accept(file.getPath())) {
+            results.add(file);
+          }
+        }
+        return results;
+      }
+
+      @Override
+      public RecordReader<NullWritable, GenericData.Record>
+      createRecordReader(final InputSplit is, final TaskAttemptContext tc)
+          throws IOException, InterruptedException {
+        RecordReader<NullWritable, GenericData.Record> rr =
+            new RecordReader<NullWritable, GenericData.Record>() {
+
+          private FileSplit fsplit;
+          private AvroColumnReader.Params params;
+          private AvroColumnReader<GenericData.Record> reader;
+          private float rows;
+          private long row = 0;
+          private GenericData.Record currentRecord = null;
+
+          @Override
+          public void close() throws IOException {
+            reader.close();
+          }
+
+          @Override
+          public NullWritable getCurrentKey()
+              throws IOException, InterruptedException {
+            return NullWritable.get();
+          }
+
+          @Override
+          public Record getCurrentValue()
+              throws IOException, InterruptedException {
+            return currentRecord;
+          }
+
+          @Override
+          public float getProgress()
+              throws IOException, InterruptedException {
+            return row / rows;
+          }
+
+          @Override
+          public void initialize(final InputSplit isplit,
+              final TaskAttemptContext tac)
+                  throws IOException, InterruptedException {
+            fsplit = (FileSplit) isplit;
+            params = new AvroColumnReader.Params(
+                new HadoopInput(fsplit.getPath(), tac.getConfiguration()));
+            Schema inputSchema = getInputAvroSchema();
+            params.setSchema(inputSchema);
+            reader = new AvroColumnReader<GenericData.Record>(params);
+            rows = reader.getRowCount();
+          }
+
+          @Override
+          public boolean nextKeyValue()
+              throws IOException, InterruptedException {
+            if (reader.hasNext()) {
+              currentRecord = reader.next();
+              row++;
+              return true;
+            } else {
+              return false;
+            }
+          }
+        };
+
+        // note: rr.initialize(is, tc) is deliberately not called here; the
+        // framework initializes the reader before the first nextKeyValue().
+        tc.setStatus(is.toString());
+        return rr;
+      }
+
+    }
+
+    return new TrevniStorageInputFormat();
+
+  }
+
+  /*
+   * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
+   */
+  @Override
+  public OutputFormat<NullWritable, Object> getOutputFormat()
+      throws IOException {
+    class TrevniStorageOutputFormat
+        extends FileOutputFormat<NullWritable, Object> {
+
+      private Schema schema;
+
+      TrevniStorageOutputFormat(final Schema s) {
+        schema = s;
+        if (s == null) {
+          String schemaString = getProperties(
+                AvroStorage.class, udfContextSignature)
+              .getProperty(OUTPUT_AVRO_SCHEMA);
+          if (schemaString != null) {
+            schema = (new Schema.Parser()).parse(schemaString);
+          }
+        }
+
+      }
+
+      @Override
+      public RecordWriter<NullWritable, Object>
+          getRecordWriter(final TaskAttemptContext tc)
+          throws IOException, InterruptedException {
+
+        if (schema == null) {
+          String schemaString = getProperties(
+                AvroStorage.class, udfContextSignature)
+              .getProperty(OUTPUT_AVRO_SCHEMA);
+          if (schemaString != null) {
+            schema = (new Schema.Parser()).parse(schemaString);
+          }
+          if (schema == null) {
+            throw new IOException("Null output schema");
+          }
+        }
+
+        final ColumnFileMetaData meta = new ColumnFileMetaData();
+
+        for (Entry<String, String> e : tc.getConfiguration()) {
+          if (e.getKey().startsWith(
+              org.apache.trevni.avro.AvroTrevniOutputFormat.META_PREFIX)) {
+            // strip the Trevni meta prefix from the key before storing it
+            meta.put(
+                e.getKey().substring(
+                    AvroTrevniOutputFormat.META_PREFIX.length()),
+                e.getValue().getBytes(MetaData.UTF8));
+          }
+        }
+
+        final Path dir = getOutputPath(tc);
+        final FileSystem fs = FileSystem.get(tc.getConfiguration());
+        final long blockSize = fs.getDefaultBlockSize();
+
+        if (!fs.mkdirs(dir)) {
+          throw new IOException("Failed to create directory: " + dir);
+        }
+
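+        // the codec is fixed to deflate here; it is not configurable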
+        meta.setCodec("deflate");
+
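+        // Extend AvroRecordWriter so the storage layer can hand over the
+        // output schema through prepareToWrite(), but buffer rows in an
+        // AvroColumnWriter and flush whole column files instead of using
+        // the parent's Avro data-file writer.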
+        return new AvroRecordWriter(dir, tc.getConfiguration()) {
+          private int part = 0;
+          private Schema avroRecordWriterSchema;
+          private AvroColumnWriter<GenericData.Record> writer;
+          
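+          // Write the buffered columns to a new part file; called when the
+          // in-memory buffer reaches the block size and again on close().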
+          private void flush() throws IOException {
+            Integer taskAttemptId = tc.getTaskAttemptID().getTaskID().getId();
+            String partName = String.format("%05d_%03d", taskAttemptId, part++);
+            OutputStream out = fs.create(
+                new Path(dir, "part-" + partName + AvroTrevniOutputFormat.EXT));
+            try {
+              writer.writeTo(out);
+            } finally {
+              out.flush();
+              out.close();
+            }
+          }
+
+          @Override
+          public void close(final TaskAttemptContext arg0)
+              throws IOException, InterruptedException {
+            flush();
+          }
+
+          @Override
+          public void write(final NullWritable n, final Object o)
+              throws IOException, InterruptedException {
+            GenericData.Record r =
+                AvroStorageDataConversionUtilities
+                .packIntoAvro((Tuple) o, schema);
+            writer.write(r);
+            if (writer.sizeEstimate() >= blockSize) {
+              flush();
+              writer = new AvroColumnWriter<GenericData.Record>(
+                  avroRecordWriterSchema, meta);
+            }
+          }
+
+          @Override
+          public void prepareToWrite(Schema s) throws IOException {
+            avroRecordWriterSchema = s;
+            writer = new AvroColumnWriter<GenericData.Record>(
+                avroRecordWriterSchema, meta);
+          }
+        };
+      }
+    }
+
+    return new TrevniStorageOutputFormat(schema);
+  }
+
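+  /**
+   * Reads the Avro schema from the first Trevni data file found by a
+   * depth-first search of the paths matching {@code p}.
+   */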
+  @Override
+  public Schema getAvroSchema(final Path p, final Job job) throws IOException {
+
+    FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
+    FileStatus[] statusArray = fs.globStatus(p, VISIBLE_FILES);
+
+    if (statusArray == null) {
+      throw new IOException("Path " + p.toString() + " does not exist.");
+    }
+    
+    if (statusArray.length == 0) {
+      throw new IOException("No path matches pattern " + p.toString());
+    }
+
+    Path filePath = depthFirstSearchForFile(statusArray, fs);
+    
+    if (filePath == null) {
+      throw new IOException("No path matches pattern " + p.toString());
+    }
+
+    AvroColumnReader.Params params =
+        new AvroColumnReader.Params(
+            new HadoopInput(filePath, job.getConfiguration()));
+    AvroColumnReader<GenericData.Record> reader =
+        new AvroColumnReader<GenericData.Record>(params);
+    Schema s = reader.getFileSchema();
+    reader.close();
+    return s;
+  }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * RecordReader for Avro data files whose top-level type is an array;
+ * each array is handed to Pig wrapped in a single-field record.
+ */
+public final class AvroArrayReader
+  extends RecordReader<NullWritable, GenericData.Record> {
+
+  private FileReader<GenericData.Array<Object>> reader;
+  private long start;
+  private long end;
+  private Schema schema;
+  private GenericData.Array<Object> currentArray;
+
+  /**
+   * Creates new instance of AvroArrayReader.
+   * @param s The input schema.
+   */
+  public AvroArrayReader(final Schema s) {
+    schema = s;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  /**
+   * Returns current value.
+   * @return the current value
+   * @throws IOException when an IO error occurs
+   * @throws InterruptedException when interrupted
+   */
+  @Override
+  public GenericData.Record getCurrentValue()
+      throws IOException, InterruptedException {
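+    // wrap the current array in an anonymous single-field record so that
+    // downstream code sees the same record interface as AvroRecordReader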
+    if (currentArray != null) {
+      GenericData.Record r = new GenericData.Record(
+          Schema.createRecord(
+              Lists.newArrayList(
+                  new Schema.Field(schema.getName(), schema, null, null))
+              )
+          );
+      r.put(0, currentArray);
+      return r;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public NullWritable getCurrentKey()
+      throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f,
+          ((float) (reader.tell() - start)) / ((float) (end - start)));
+    }
+  }
+
+  @Override
+  public void initialize(final InputSplit isplit, final TaskAttemptContext tc)
+      throws IOException, InterruptedException {
+
+    FileSplit fsplit = (FileSplit) isplit;
+    start  = fsplit.getStart();
+    end    = fsplit.getStart() + fsplit.getLength();
+    DatumReader<GenericData.Array<Object>> datumReader
+      = new GenericDatumReader<GenericData.Array<Object>>(schema);
+    reader = DataFileReader.openReader(
+        new FsInput(fsplit.getPath(), tc.getConfiguration()),
+        datumReader);
+    reader.sync(start);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+
+    if (reader.pastSync(end)) {
+      return false;
+    }
+    try {
+      currentArray = reader.next();
+    } catch (NoSuchElementException e) {
+      return false;
+    }
+    return true;
+  }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Class that implements the Pig bag interface, wrapping an Avro array.
+ * Designed to reduce data copying.
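+ *
+ * <p>A minimal illustrative sketch (the schema and values here are
+ * hypothetical, not taken from this patch):</p>
+ * <pre>
+ * Schema s = Schema.createArray(Schema.create(Schema.Type.INT));
+ * GenericArray&lt;Integer&gt; arr = new GenericData.Array&lt;Integer&gt;(1, s);
+ * arr.add(1);
+ * DataBag bag = new AvroBagWrapper&lt;Integer&gt;(arr); // wraps, no copying
+ * Tuple first = bag.iterator().next(); // non-records arrive as 1-field tuples
+ * </pre>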
+ * @param <T> Type of objects in Avro array
+ */
+public final class AvroBagWrapper<T> implements DataBag {
+
+  /**
+   * The array object wrapped in this AvroBagWrapper object.
+   */
+  private GenericArray<T> theArray;
+
+  /**
+   * Create new AvroBagWrapper instance.
+   * @param a Avro array to wrap in bag
+   */
+  public AvroBagWrapper(final GenericArray<T> a) { theArray = a; }
+
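+  // The wrapped Avro array lives in memory and is managed by Avro, so this
+  // bag neither spills to disk nor reports a memory footprint.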
+  @Override
+  public long spill() { return 0; }
+
+  @Override
+  public long getMemorySize() {
+    return 0;
+  }
+
+  @Override
+  public void readFields(final DataInput d) throws IOException {
+    throw new IOException(
+        this.getClass().toString() + ".readFields not implemented yet");
+  }
+
+  @Override
+  public void write(final DataOutput d) throws IOException {
+    throw new IOException(
+        this.getClass().toString() + ".write not implemented yet");
+  }
+
+  @Override
+  public int compareTo(final Object o) {
+    return GenericData.get().compare(theArray, o, theArray.getSchema());
+  }
+
+  @Override public long size() { return theArray.size(); }
+  @Override public boolean isSorted() { return false; }
+  @Override public boolean isDistinct() { return false; }
+
+  @Override
+  public Iterator<Tuple> iterator() {
+    return Iterators.transform(theArray.iterator(),
+        new Function<T, Tuple>() {
+            @Override
+            public Tuple apply(final T arg) {
+              if (arg instanceof IndexedRecord) {
+                return new AvroTupleWrapper<IndexedRecord>((IndexedRecord) arg);
+              } else {
+                return TupleFactory.getInstance().newTuple(arg);
+              }
+            }
+          }
+        );
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override public void add(final Tuple t) { theArray.add((T) t); }
+
+  @Override
+  public void addAll(final DataBag b) {
+    for (Tuple t : b) {
+      add(t);
+    }
+  }
+
+  @Override public void clear() { theArray.clear(); }
+  @Override public void markStale(final boolean stale) { }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Sets;
+import com.google.common.base.Function;
+
+/**
+ * Wrapper for map objects, so we can translate UTF8 objects to
+ * Strings if we encounter them.
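+ *
+ * <p>An illustrative sketch (the key and value are hypothetical):</p>
+ * <pre>
+ * Map&lt;CharSequence, Object&gt; m = new HashMap&lt;CharSequence, Object&gt;();
+ * m.put(new Utf8("name"), new Utf8("Alice"));
+ * AvroMapWrapper w = new AvroMapWrapper(m);
+ * String v = (String) w.get(new Utf8("name")); // Utf8 translated to String
+ * </pre>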
+ */
+public final class AvroMapWrapper implements Map<CharSequence, Object> {
+
+  /**
+   * The map contained in the wrapper object.
+   */
+  private Map<CharSequence, Object> innerMap;
+
+  /**
+   * Creates a new AvroMapWrapper object from the map object {@code m}.
+   * @param m The map to wrap.
+   */
+  public AvroMapWrapper(final Map<CharSequence, Object> m) {
+    innerMap = m;
+  }
+
+  @Override
+  public int size() {
+    return innerMap.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return innerMap.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(final Object key) {
+    return innerMap.containsKey(key);
+  }
+
+  @Override
+  public boolean containsValue(final Object value) {
+    return innerMap.containsValue(value);
+  }
+
+  @Override
+  public Object get(final Object key) {
+    Object v = innerMap.get(key);
+    if (v instanceof Utf8) {
+      return v.toString();
+    } else {
+      return v;
+    }
+  }
+
+  @Override
+  public Object put(final CharSequence key, final Object value) {
+    return innerMap.put(key, value);
+  }
+
+  @Override
+  public Object remove(final Object key) {
+    return innerMap.remove(key);
+  }
+
+  @Override
+  public void putAll(
+      final Map<? extends CharSequence, ? extends Object> m) {
+    innerMap.putAll(m);
+  }
+
+  @Override
+  public void clear() {
+    innerMap.clear();
+  }
+
+  @Override
+  public Set<CharSequence> keySet() {
+    return innerMap.keySet();
+  }
+
+  @Override
+  public Collection<Object> values() {
+    return Collections2.transform(innerMap.values(),
+        new Function<Object, Object>() {
+            @Override
+            public Object apply(final Object v) {
+              if (v instanceof Utf8) {
+                return v.toString();
+              } else {
+                return v;
+              }
+            }
+          }
+        );
+  }
+
+  @Override
+  public Set<java.util.Map.Entry<CharSequence, Object>> entrySet() {
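+    // copy into a new set, eagerly converting any Utf8 keys and values to
+    // Strings (unlike values(), which converts lazily)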
+    Set<java.util.Map.Entry<CharSequence, Object>> theSet =
+        Sets.newHashSetWithExpectedSize(innerMap.size());
+    for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) {
+      CharSequence k = e.getKey();
+      Object v = e.getValue();
+      if (k instanceof Utf8) {
+        k = k.toString();
+      }
+      if (v instanceof Utf8) {
+        v = v.toString();
+      }
+      theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v));
+    }
+
+    return theSet;
+
+  }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+
+/**
+ * RecordReader for Avro data files whose top-level type is a record
+ * (compare {@link AvroArrayReader}, which handles top-level arrays).
+ */
+public final class AvroRecordReader
+  extends RecordReader<NullWritable, GenericData.Record> {
+
+  private FileReader<GenericData.Record> reader;
+  private long start;
+  private long end;
+  private Schema schema;
+  private GenericData.Record currentRecord;
+
+  /**
+   * Creates new instance of AvroRecordReader.
+   * @param s The input schema.
+   */
+  public AvroRecordReader(final Schema s) {
+    schema = s;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  /**
+   * Returns current value.
+   * @return the current value
+   * @throws IOException when an IO error occurs
+   * @throws InterruptedException when interrupted
+   */
+  @Override
+  public GenericData.Record getCurrentValue()
+      throws IOException, InterruptedException {
+    return currentRecord;
+  }
+
+  @Override
+  public NullWritable getCurrentKey()
+      throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f,
+          ((float) (reader.tell() - start)) / ((float) (end - start)));
+    }
+  }
+
+  @Override
+  public void initialize(final InputSplit isplit, final TaskAttemptContext tc)
+      throws IOException, InterruptedException {
+
+    FileSplit fsplit = (FileSplit) isplit;
+    start  = fsplit.getStart();
+    end    = fsplit.getStart() + fsplit.getLength();
+    DatumReader<GenericData.Record> datumReader
+      = new GenericDatumReader<GenericData.Record>(schema);
+    reader = DataFileReader.openReader(
+        new FsInput(fsplit.getPath(), tc.getConfiguration()),
+        datumReader);
+    reader.sync(start);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+
+    if (reader.pastSync(end)) {
+      return false;
+    }
+
+    try {
+      currentRecord = reader.next(new GenericData.Record(schema));
+    } catch (NoSuchElementException e) {
+      return false;
+    } catch (IOException ioe) {
+      reader.sync(reader.tell()+1);
+      throw ioe;
+    }
+
+    return true;
+  }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Map;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.Tuple;
+
+/**
+ * RecordWriter for Avro objects.
+ */
+public class AvroRecordWriter extends RecordWriter<NullWritable, Object> {
+
+  private Schema schema = null;
+  private DataFileWriter<GenericData.Record> writer;
+  private Path out;
+  private Configuration conf;
+  
+  /**
+   * Creates new AvroRecordWriter. The output schema is not known at
+   * construction time; it is supplied later via
+   * {@link #prepareToWrite(Schema)}.
+   * @param o Output path
+   * @param c Hadoop configuration
+   * @throws IOException when an IO error occurs
+   */
+  public AvroRecordWriter(final Path o, final Configuration c)
+      throws IOException {
+    out = o;
+    conf = c;
+  }
+
+  // copied from org.apache.avro.mapred.AvroOutputFormat
+  static void configureDataFileWriter(DataFileWriter<GenericData.Record> writer,
+      JobConf job) throws UnsupportedEncodingException {
+    if (FileOutputFormat.getCompressOutput(job)) {
+      int level = job.getInt(DEFLATE_LEVEL_KEY,
+          DEFAULT_DEFLATE_LEVEL);
+      String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
+      CodecFactory factory = codecName.equals(DEFLATE_CODEC)
+        ? CodecFactory.deflateCodec(level)
+        : CodecFactory.fromString(codecName);
+      writer.setCodec(factory);
+    }
+
+    writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY,
+        DEFAULT_SYNC_INTERVAL));
+
+    // copy metadata from job
+    for (Map.Entry<String,String> e : job) {
+      if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
+        writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
+                       e.getValue());
+      if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
+        writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
+                       URLDecoder.decode(e.getValue(), "ISO-8859-1")
+                       .getBytes("ISO-8859-1"));
+    }
+  }
+  
+  @Override
+  public void close(final TaskAttemptContext arg0)
+      throws IOException, InterruptedException {
+    writer.close();
+  }
+
+  @Override
+  public void write(final NullWritable key, final Object value)
+      throws IOException, InterruptedException {
+
+    if (value instanceof GenericData.Record) {
+      // whoo-hoo! already avro
+      writer.append((GenericData.Record) value);
+    } else if (value instanceof Tuple) {
+      // pack the object into an Avro record
+      writer.append(AvroStorageDataConversionUtilities
+          .packIntoAvro((Tuple) value, schema));
+    }
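+    // values that are neither Avro records nor Pig tuples are silently
+    // dropped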
+  }
+  
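+  /**
+   * Sets the output schema and opens an Avro data file at the output path
+   * for writing; must be called before the first call to {@link #write}.
+   * @param s the Avro schema for the records to be written
+   * @throws IOException if the schema is null or the file cannot be created
+   */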
+  public void prepareToWrite(Schema s) throws IOException {
+    if (s == null) {
+      throw new IOException(
+          this.getClass().getName() + ".prepareToWrite called with null schema");
+    }
+    schema = s;
+    DatumWriter<GenericData.Record> datumWriter =
+        new GenericDatumWriter<GenericData.Record>(s);
+    writer = new DataFileWriter<GenericData.Record>(datumWriter);
+    configureDataFileWriter(writer, new JobConf(conf));
+    writer.create(s, out.getFileSystem(conf).create(out));
+
+  }
+  
+}