You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/06/29 00:54:06 UTC
svn commit: r1497954 [1/3] - in /pig/trunk: ./ .eclipse.templates/ ivy/
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/builtin/
src/org/apache/pig/impl/util/avro/ test/ test/org/apache/pig/builtin/
test/org/apache/pig/builtin/avro/ test/o...
Author: cheolsoo
Date: Fri Jun 28 22:54:05 2013
New Revision: 1497954
URL: http://svn.apache.org/r1497954
Log:
PIG-3015: Rewrite of AvroStorage (jadler via cheolsoo)
Added:
pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
pig/trunk/src/org/apache/pig/impl/util/avro/
pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
pig/trunk/test/org/apache/pig/builtin/avro/
pig/trunk/test/org/apache/pig/builtin/avro/code/
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/directory_test.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ai1_ao2.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_ao2.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_blank_first_args.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_codec.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/identity_just_ao2.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/namesWithDoubleColons.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/projection_test.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/recursive_tests.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_avro.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/trevni_to_trevni.pig
pig/trunk/test/org/apache/pig/builtin/avro/code/pig/with_dates.pig
pig/trunk/test/org/apache/pig/builtin/avro/data/
pig/trunk/test/org/apache/pig/builtin/avro/data/json/
pig/trunk/test/org/apache/pig/builtin/avro/data/json/arrays.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/arraysAsOutputByPig.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/projectionTest.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordWithRepeatedSubRecords.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/records.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPig.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsAsOutputByPigWithDates.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsOfArrays.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsOfArraysOfRecords.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsSubSchema.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsSubSchemaNullable.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithDoubleUnderscores.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithEnums.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithFixed.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithMaps.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithMapsOfRecords.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithNullableUnions.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithSimpleUnion.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recordsWithSimpleUnionOutput.json
pig/trunk/test/org/apache/pig/builtin/avro/data/json/recursiveRecord.json
pig/trunk/test/org/apache/pig/builtin/avro/schema/
pig/trunk/test/org/apache/pig/builtin/avro/schema/arrays.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/arraysAsOutputByPig.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/projectionTest.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordWithRepeatedSubRecords.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/records.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsAsOutputByPig.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsAsOutputByPigWithDates.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsOfArrays.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsOfArraysOfRecords.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsSubSchema.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsSubSchemaNullable.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithDoubleUnderscores.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithEnums.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithFixed.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithMaps.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithMapsOfRecords.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithNullableUnions.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithSimpleUnion.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recordsWithSimpleUnionOutput.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/recursiveRecord.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/simpleRecordsTrevni.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/testDirectory.avsc
pig/trunk/test/org/apache/pig/builtin/avro/schema/testDirectoryCounts.avsc
Modified:
pig/trunk/.eclipse.templates/.classpath
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
pig/trunk/test/unit-tests
Modified: pig/trunk/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/pig/trunk/.eclipse.templates/.classpath?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/.eclipse.templates/.classpath (original)
+++ pig/trunk/.eclipse.templates/.classpath Fri Jun 28 22:54:05 2013
@@ -53,9 +53,11 @@
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/servlet-api-2.5-6.1.14.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/slf4j-api-1.6.1.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/slf4j-log4j12-1.6.1.jar"/>
- <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/xmlenc-0.52.jar"/>
- <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/avro-1.7.4.jar" />
- <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/json-simple-1.1.jar" />
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/xmlenc-0.52.jar"/>
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/avro-1.7.4.jar" />
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/nodeps-1.7.4.jar" />
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/json-simple-1.1.jar" />
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/snappy-java-1.0.5-M3.jar" />
<classpathentry exported="true" kind="lib" path="lib/automaton.jar"/>
<classpathentry kind="output" path="build/classes"/>
</classpath>
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jun 28 22:54:05 2013
@@ -24,10 +24,12 @@ INCOMPATIBLE CHANGES
PIG-3191: [piggybank] MultiStorage output filenames are not sortable (Danny Antonelli via jcoveney)
-PIG-3174: Remove rpm and deb artifacts from build.xml (gates)
+PIG-3174: Remove rpm and deb artifacts from build.xml (gates)
IMPROVEMENTS
+PIG-3015: Rewrite of AvroStorage (jadler via cheolsoo)
+
PIG-3361: Improve Hadoop version detection logic for Pig unit test (daijy)
PIG-3280: Document IN operator and CASE expression (cheolsoo)
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Jun 28 22:54:05 2013
@@ -331,6 +331,11 @@
<include name="guava-${guava.version}.jar"/>
<include name="automaton-${automaton.version}.jar"/>
<include name="jansi-${jansi.version}.jar"/>
+ <include name="avro-${avro.version}.jar"/>
+ <include name="avro-mapred-${avro.version}.jar"/>
+ <include name="trevni-core-${avro.version}.jar"/>
+ <include name="trevni-avro-${avro.version}.jar"/>
+ <include name="snappy-java-${snappy.version}.jar"/>
<include name="asm*.jar"/>
</patternset>
</fileset>
@@ -344,11 +349,15 @@
<include name="junit-${junit.version}.jar"/>
<include name="jsch-${jsch.version}.jar"/>
<include name="protobuf-java-${protobuf-java.version}.jar"/>
- <include name="avro-${avro.version}.jar"/>
<include name="commons*.jar"/>
<include name="log4j*.jar"/>
<include name="slf4j*.jar"/>
<include name="jsp-api*.jar"/>
+ <include name="avro-${avro.version}.jar"/>
+ <include name="avro-mapred-${avro.version}.jar"/>
+ <include name="trevni-core-${avro.version}.jar"/>
+ <include name="trevni-avro-${avro.version}.jar"/>
+ <include name="snappy-java-${snappy.version}.jar"/>
<include name="asm*.jar"/>
</patternset>
</fileset>
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Fri Jun 28 22:54:05 2013
@@ -172,16 +172,28 @@
conf="compile->master"/>
<dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}"
conf="compile->master;checkstyle->master"/>
- <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}"
+ <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}"
conf="compile->master;test->master"/>
<dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
conf="compile->master;checkstyle->master"/>
-
<dependency org="org.apache.avro" name="avro" rev="${avro.version}"
conf="compile->default;checkstyle->master"/>
+ <dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
+ conf="compile->default;checkstyle->master"/>
+ <dependency org="org.apache.avro" name="trevni-core" rev="${avro.version}"
+ conf="compile->default;checkstyle->master"/>
+ <dependency org="org.apache.avro" name="trevni-avro" rev="${avro.version}"
+ conf="compile->default;checkstyle->master">
+ <exclude org="org.apache.hadoop" module="hadoop-core"/>
+ </dependency>
+ <dependency org="org.apache.avro" name="avro-tools" rev="${avro.version}"
+ conf="test->default">
+ <artifact name="nodeps" type="jar"/>
+ </dependency>
+ <dependency org="org.xerial.snappy" name="snappy-java" rev="${snappy.version}"
+ conf="compile->default;checkstyle->master"/>
<dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
conf="compile->master;checkstyle->master"/>
-
<dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
conf="jdiff->default"/>
<dependency org="xalan" name="xalan" rev="${xalan.version}"
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Fri Jun 28 22:54:05 2013
@@ -86,3 +86,4 @@ jsr311-api.version=1.1.1
mockito.version=1.8.4
jansi.version=1.9
asm.version=3.3.1
+snappy.version=1.0.5-M3
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/func.xml?rev=1497954&r1=1497953&r2=1497954&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/func.xml Fri Jun 28 22:54:05 2013
@@ -1755,6 +1755,342 @@ STORE A INTO 'hbase://users_table' USING
rowKey.</p>
</section>
</section>
+
+ <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+ <section id="AvroStorage">
+ <title>AvroStorage</title>
+ <p>Loads and stores data from Avro files.</p>
+
+ <section>
+ <title>Syntax</title>
+ <table>
+ <tr>
+ <td>
+ <p>AvroStorage(['schema|record name'], ['options'])</p>
+ </td>
+ </tr>
+ </table>
+ </section>
+
+ <section>
+ <title>Terms</title>
+ <table>
+ <tr>
+ <td>
+ <p>schema</p>
+ </td>
+ <td>
+ <p>A JSON string specifying the Avro schema for the input. You may specify an explicit schema
+ when storing data or when loading data. When you manually provide a schema, Pig
+ will use the provided schema for serialization and deserialization. This means that
+ you can provide an explicit schema when saving data to simplify the output (for example
+ by removing nullable unions), or rename fields. This also means that you can provide
+ an explicit schema when reading data to only read a subset of the fields in each record.</p>
+
+ <p>See
+ <a href="http://avro.apache.org/docs/current/spec.html"> the Apache Avro Documentation</a>
+ for more details on how to specify a valid schema.</p>
+ </td>
+ </tr>
+ <tr>
+ <td>
+ <p>record name</p>
+ </td>
+ <td>
+ <p>When storing a bag of tuples with AvroStorage, if you do not want to specify
+ the full schema, you may specify the avro record name instead. (AvroStorage will
+ determine that the argument isn't a valid schema definition and use it as a
+ variable name instead.)</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ <p>'options'</p>
+ </td>
+ <td>
+ <p>A string that contains space-separated options ('-optionA valueA -optionB valueB -optionC')</p>
+ <p>Currently supported options are:</p>
+ <ul>
+ <li>-namespace nameSpace or -n nameSpace Explicitly specify the namespace
+ field in Avro records when storing data</li>
+ <li>-schemafile schemaFile or -f schemaFile Specify the input (or output) schema from
+ an external file. Pig assumes that the file is located on the default filesystem,
+ but you may use an explicit URL to unambiguously specify the location. (For example, if
+ the data was on your local file system in /stuff/schemafile.avsc, you
+ could specify "-f file:///stuff/schemafile.avsc" to specify the location. If the
+ data was on HDFS under /yourdirectory/schemafile.avsc, you could specify
+ "-f hdfs:///yourdirectory/schemafile.avsc"). Pig expects this to be a
+ text file, containing a valid avro schema.</li>
+ <li>-examplefile exampleFile or -e exampleFile Specify the input (or output)
+ schema using another Avro file as an example. Pig assumes that the file is located on the default filesystem,
+ but you may use an explicit URL to specify the location. Pig
+ expects this to be an Avro data file.</li>
+ <li>-allowrecursive or -r Specify whether to allow recursive schema definitions (the
+ default is to throw an exception if Pig encounters a recursive schema). When
+ reading objects with recursive definitions, Pig will translate Avro records to
+ schema-less tuples; the Pig Schema for the object may not match the data exactly.</li>
+ <li>-doublecolons or -d Specify how to handle Pig schemas that contain double-colons
+ when writing data in Avro format. (When you join two bags in Pig, Pig will automatically
+ label the fields in the output Tuples with names that contain double-colons). If
+ you select this option, AvroStorage will translate names with double colons into
+ names with double underscores. </li>
+ </ul>
+ </td>
+ </tr>
+ </table>
+ </section>
+
+ <section>
+ <title>Usage</title>
+ <p>AvroStorage stores and loads data from Avro files. Often, you can load and
+ store data using AvroStorage without knowing much about the Avro serialization format.
+ AvroStorage will attempt to automatically translate a pig schema and pig data to avro data,
+ or avro data to pig data.</p>
+ <p>By default, when you use AvroStorage to load data, AvroStorage will use depth first search to
+ find a valid Avro file on the input path, then use the schema from that file to load the
+ data. When you use AvroStorage to store data, AvroStorage will attempt to translate the
+ Pig schema to an equivalent Avro schema. You can manually specify the schema by providing
+ an explicit schema in Pig, loading a schema from an external schema file, or explicitly telling
+ Pig to read the schema from a specific avro file.</p>
+ <p>To compress your output with AvroStorage, you need to use the correct Avro properties for compression.
+ For example, to enable compression using deflate level 5, you would specify</p>
+<source>
+SET avro.output.codec 'deflate'
+SET avro.mapred.deflate.level 5
+</source>
+ <p>Valid values for avro.output.codec include deflate, snappy, and null.</p>
+ <p>There are a few key differences between Avro and Pig data, and in some cases
+ it helps to understand the differences between the Avro and Pig data models.
+ Before writing Pig data to Avro (or creating Avro files to use in Pig), keep in
+ mind that there might not be an equivalent Avro Schema for every Pig Schema (and
+ vice versa):</p>
+ <ul>
+ <li><strong>Recursive schema definitions</strong> You cannot define schemas recursively in Pig,
+ but you can define schemas recursively in Avro.</li>
+ <li><strong>Allowed characters</strong> Pig schemas may sometimes contain characters like colons (":")
+ that are illegal in Avro names.</li>
+ <li><strong>Unions</strong> In Avro, you can define an object that may be one of several different
+ types (including complex types such as records). In Pig, you cannot.</li>
+ <li><strong>Enums</strong> Avro allows you to define enums to efficiently and abstractly
+ represent categorical variables, but Pig does not.</li>
+ <li><strong>Fixed Length Byte Arrays</strong> Avro allows you to define fixed length byte arrays,
+ but Pig does not.</li>
+ <li><strong>Nullable values</strong> In Pig, all types are nullable. In Avro, they are not. </li>
+ </ul>
+ <p>Here is how AvroStorage translates Pig values to Avro:</p>
+ <table>
+ <tr>
+ <td></td>
+ <td>Original Pig Type</td>
+ <td>Translated Avro Type</td>
+ </tr>
+ <tr>
+ <td>Integers</td>
+ <td>int</td>
+ <td>["int","null"]</td>
+ </tr>
+ <tr>
+ <td>Longs</td>
+ <td>long</td>
+ <td>["long","null"]</td>
+ </tr>
+ <tr>
+ <td>Floats</td>
+ <td>float</td>
+ <td>["float","null"]</td>
+ </tr>
+ <tr>
+ <td>Doubles</td>
+ <td>double</td>
+ <td>["double","null"]</td>
+ </tr>
+ <tr>
+ <td>Strings</td>
+ <td>chararray</td>
+ <td>["string","null"]</td>
+ </tr>
+ <tr>
+ <td>Bytes</td>
+ <td>bytearray</td>
+ <td>["bytes","null"]</td>
+ </tr>
+ <tr>
+ <td>Booleans</td>
+ <td>boolean</td>
+ <td>["boolean","null"]</td>
+ </tr>
+ <tr>
+ <td>Tuples</td>
+ <td>tuple</td>
+ <td>The Pig Tuple schema will be translated to a union of an Avro record with an equivalent
+ schema and null.</td>
+ </tr>
+ <tr>
+ <td>Bags of Tuples</td>
+ <td>bag</td>
+ <td>The Pig Tuple schema will be translated to a union of an array of records with an equivalent
+ schema and null.</td>
+ </tr>
+ <tr>
+ <td>Maps</td>
+ <td>map</td>
+ <td>The Pig Tuple schema will be translated to a union of a map of records with an equivalent
+ schema and null.</td>
+ </tr>
+ </table>
+
+ <p>Here is how AvroStorage translates Avro values to Pig:</p>
+ <table>
+ <tr>
+ <td></td>
+ <td>Original Avro Types</td>
+ <td>Translated Pig Type</td>
+ </tr>
+ <tr>
+ <td>Integers</td>
+ <td>["int","null"] or "int"</td>
+ <td>int</td>
+ </tr>
+ <tr>
+ <td>Longs</td>
+ <td>["long","null"] or "long"</td>
+ <td>long</td>
+ </tr>
+ <tr>
+ <td>Floats</td>
+ <td>["float","null"] or "float"</td>
+ <td>float</td>
+ </tr>
+ <tr>
+ <td>Doubles</td>
+ <td>["double","null"] or "double"</td>
+ <td>double</td>
+ </tr>
+ <tr>
+ <td>Strings</td>
+ <td>["string","null"] or "string"</td>
+ <td>chararray</td>
+ </tr>
+ <tr>
+ <td>Enums</td>
+ <td>Either an enum or a union of an enum and null</td>
+ <td>chararray</td>
+ </tr>
+ <tr>
+ <td>Bytes</td>
+ <td>["bytes","null"] or "bytes"</td>
+ <td>bytearray</td>
+ </tr>
+ <tr>
+ <td>Fixed</td>
+ <td>Either a fixed length byte array, or a union of a fixed length array and null</td>
+ <td>bytearray</td>
+ </tr>
+ <tr>
+ <td>Booleans</td>
+ <td>["boolean","null"] or "boolean"</td>
+ <td>boolean</td>
+ </tr>
+ <tr>
+ <td>Tuples</td>
+ <td>Either a record type, or a union of a record and null</td>
+ <td>tuple</td>
+ </tr>
+ <tr>
+ <td>Bags of Tuples</td>
+ <td>Either an array, or a union of an array and null</td>
+ <td>bag</td>
+ </tr>
+ <tr>
+ <td>Maps</td>
+ <td>Either a map, or a union of a map and null</td>
+ <td>map</td>
+ </tr>
+ </table>
+
+ <p> In many cases, AvroStorage will automatically translate your data correctly and you will not
+ need to provide any more information to AvroStorage. But sometimes, it may be convenient to
+ manually provide a schema to AvroStorage. See the example sections below for examples
+ on manually specifying a schema with AvroStorage.
+ </p>
+ </section>
+ <section>
+ <title>Load Examples</title>
+ <p>Suppose that you were provided with a file of avro data (located in 'stuff')
+ with the following schema:</p>
+<source>
+{"type" : "record",
+ "name" : "stuff",
+ "fields" : [
+ {"name" : "label", "type" : "string"},
+ {"name" : "value", "type" : "int"},
+ {"name" : "marketingPlans", "type" : ["string", "bytes", "null"]}
+ ]
+}
+</source>
+ <p>Additionally, suppose that you don't need the value of the field "marketingPlans."
+ (That's a good thing, because AvroStorage doesn't know how to translate that Avro schema
+ to a Pig schema). To load only the fields "label" and "value" into Pig, you can
+ manually specify the schema passed to AvroStorage:</p>
+<source>
+measurements = LOAD 'stuff' USING AvroStorage(
+ '{"type":"record","name":"measurement","fields":[{"name":"label","type":"string"},{"name":"value","type":"int"}]}'
+ );
+</source>
+ </section>
+
+ <section>
+ <title>Store Examples</title>
+ <p>Suppose that you are saving a bag called measurements with the schema:</p>
+<source>
+measurements:{measurement:(label:chararray,value:int)}
+</source>
+ <p>To store this bag into a file called "measurements", you can use a statement like:</p>
+<source>
+STORE measurements INTO 'measurements' USING AvroStorage('measurement');
+</source>
+ <p>AvroStorage will translate this to the Avro schema</p>
+<source>
+{"type":"record",
+ "name":"measurement",
+ "fields" : [
+ {"name" : "label", "type" : ["string", "null"]},
+ {"name" : "value", "type" : ["int", "null"]}
+ ]
+}
+</source>
+ <p>But suppose that you knew that the label and value fields would never be null. You could
+ define a more precise schema manually using a statement like:</p>
+<source>
+STORE measurements INTO 'measurements' USING AvroStorage(
+ '{"type":"record","name":"measurement","fields":[{"name":"label","type":"string"},{"name":"value","type":"int"}]}'
+ );
+</source>
+ </section>
+ </section>
+
+ <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+ <section id="TrevniStorage">
+ <title>TrevniStorage</title>
+ <p>Loads and stores data from Trevni files.</p>
+
+ <section>
+ <title>Syntax</title>
+ <table>
+ <tr>
+ <td>
+ <p>TrevniStorage(['schema|record name'], ['options'])</p>
+ </td>
+ </tr>
+ </table>
+ </section>
+ <p>Trevni is a column-oriented storage format that is part of the Apache Avro project. Trevni is
+ closely related to Avro.</p>
+ <p>Likewise, TrevniStorage is very closely related to AvroStorage, and shares the same options as
+ AvroStorage. See <a href="#AvroStorage">AvroStorage</a> for a detailed description of the
+ arguments for TrevniStorage.</p>
+ </section>
</section>
<!-- ======================================================== -->
Added: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,721 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.avro.AvroArrayReader;
+import org.apache.pig.impl.util.avro.AvroRecordReader;
+import org.apache.pig.impl.util.avro.AvroRecordWriter;
+import org.apache.pig.impl.util.avro.AvroStorageSchemaConversionUtilities;
+import org.apache.pig.impl.util.avro.AvroTupleWrapper;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+
+/**
+ * Pig UDF for reading and writing Avro data.
+ *
+ */
+public class AvroStorage extends LoadFunc
+ implements StoreFuncInterface, LoadMetadata, LoadPushDown {
+
+ /**
+ * Creates new instance of Pig Storage function, without specifying
+ * the schema. Useful for just loading in data.
+ */
+ public AvroStorage() {
+ this(null, null);
+ }
+
+ /**
+ * Creates new instance of Pig Storage function.
+ * @param sn Specifies the input/output schema or record name.
+ */
+ public AvroStorage(final String sn) {
+ this(sn, null);
+ }
+
+ // Record name used when an output schema is generated automatically.
+ private String schemaName = "record";
+ // Namespace for an automatically generated output schema (-namespace option).
+ private String schemaNameSpace = null;
+ // Whether recursive schema definitions are permitted (-allowrecursive option).
+ protected boolean allowRecursive = false;
+ // Whether "::" in Pig field names is rewritten to "__" (-doublecolons option).
+ protected boolean doubleColonsToDoubleUnderscores = false;
+ // Cached input or output Avro schema for this UDF instance.
+ protected Schema schema;
+ protected final Log log = LogFactory.getLog(getClass());
+
+ /**
+ * Creates new instance of AvroStorage function, specifying output schema
+ * properties.
+ * @param sn Specifies the input/output schema or record name. If the
+ * string parses as an Avro schema it is used directly; otherwise it is
+ * treated as a record name for the generated output schema.
+ * @param opts Options for AvroStorage:
+ * <li><code>-namespace</code> Namespace for an automatically generated
+ * output schema.</li>
+ * <li><code>-schemafile</code> Specifies URL for avro schema file
+ * from which to read the input schema (can be local file, hdfs,
+ * url, etc).</li>
+ * <li><code>-examplefile</code> Specifies URL for avro data file from
+ * which to copy the input schema (can be local file, hdfs, url, etc).</li>
+ * <li><code>-allowrecursive</code> Option to allow recursive schema
+ * definitions (default is false).</li>
+ * <li><code>-doublecolons</code> Option to translate Pig schema names
+ * with double colons to names with double underscores (default is false).</li>
+ *
+ */
+ public AvroStorage(final String sn, final String opts) {
+ super();
+
+ if (sn != null) {
+ try {
+ Schema s = (new Schema.Parser()).parse(sn);
+ // must be a valid schema
+ setInputAvroSchema(s);
+ setOutputAvroSchema(s);
+ } catch (SchemaParseException e) {
+ // not a valid schema, use as a record name
+ schemaName = sn;
+ }
+ }
+
+ if (opts != null) {
+ String[] optsArr = opts.split(" ");
+ Options validOptions = new Options();
+ try {
+ CommandLineParser parser = new GnuParser();
+ validOptions.addOption("n", "namespace", true,
+ "Namespace for an automatically generated output schema");
+ validOptions.addOption("f", "schemafile", true,
+ "Specifies URL for avro schema file from which to read "
+ + "the input or output schema");
+ validOptions.addOption("e", "examplefile", true,
+ "Specifies URL for avro data file from which to copy "
+ + "the output schema");
+ validOptions.addOption("r", "allowrecursive", false,
+ "Option to allow recursive schema definitions (default is false)");
+ validOptions.addOption("d", "doublecolons", false,
+ "Option to translate Pig schema names with double colons "
+ + "to names with double underscores (default is false)");
+ CommandLine configuredOptions = parser.parse(validOptions, optsArr);
+ schemaNameSpace = configuredOptions.getOptionValue("namespace", null);
+ allowRecursive = configuredOptions.hasOption('r');
+ doubleColonsToDoubleUnderscores = configuredOptions.hasOption('d');
+
+ if (configuredOptions.hasOption('f')) {
+ try {
+ Path p = new Path(configuredOptions.getOptionValue('f'));
+ Schema s = new Schema.Parser()
+ .parse((FileSystem.get(p.toUri(), new Configuration()).open(p)));
+ setInputAvroSchema(s);
+ setOutputAvroSchema(s);
+ } catch (FileNotFoundException fnfe) {
+ // Fixed: removed stray System.err.printf debug output; the log
+ // warning below already records this condition. (Also fixed the
+ // "mesasge" typo in the warning text.)
+ log.warn("Schema file not found when instantiating AvroStorage. (If the " +
+ "schema was described in a local file on the front end, and this message " +
+ "is in the back end log, you can ignore this message.)", fnfe);
+ }
+ } else if (configuredOptions.hasOption('e')) {
+ setOutputAvroSchema(
+ getAvroSchema(configuredOptions.getOptionValue('e'),
+ new Job(new Configuration())));
+ }
+
+ } catch (ParseException e) {
+ log.error("Exception in AvroStorage", e);
+ log.error("AvroStorage called with arguments " + sn + ", " + opts);
+ warn("ParseException in AvroStorage", PigWarning.UDF_WARNING_1);
+ HelpFormatter formatter = new HelpFormatter();
+ // Fixed: the usage string showed a PigStorage-style delimiter
+ // argument (','); AvroStorage takes a schema/record name and options.
+ formatter.printHelp("AvroStorage('schema', '[options]')", validOptions);
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ log.warn("Exception in AvroStorage", e);
+ log.warn("AvroStorage called with arguments " + sn + ", " + opts);
+ warn("IOException in AvroStorage", PigWarning.UDF_WARNING_1);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Context signature for this UDF instance. Used to key the UDFContext
+ * properties that carry the input/output schemas to the back end.
+ */
+ protected String udfContextSignature = null;
+
+ @Override
+ public final void setUDFContextSignature(final String signature) {
+ udfContextSignature = signature;
+ super.setUDFContextSignature(signature);
+ }
+
+ /**
+ * Internal function for getting the Properties object associated with
+ * this UDF instance, keyed by the context signature when one is set.
+ * @return The Properties object associated with this UDF instance
+ */
+ protected final Properties getProperties() {
+ // getProperties(Class, String) already branches on a null signature,
+ // so delegate unconditionally.
+ return getProperties(AvroStorage.class, udfContextSignature);
+ }
+
+ /**
+ * Internal function for getting the Properties object associated with
+ * this UDF instance.
+ * @param c Class of this UDF
+ * @param signature Signature string; may be null, in which case the
+ * properties are keyed by class alone
+ * @return The Properties object associated with this UDF instance
+ */
+ @SuppressWarnings("rawtypes")
+ protected final Properties getProperties(final Class c,
+ final String signature) {
+ final UDFContext udfc = UDFContext.getUDFContext();
+ return (signature == null)
+ ? udfc.getUDFProperties(c)
+ : udfc.getUDFProperties(c, new String[] {signature});
+ }
+
+ /*
+ * @see org.apache.pig.LoadMetadata#getSchema(java.lang.String,
+ * org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public final ResourceSchema getSchema(final String location,
+ final Job job) throws IOException {
+ // Lazily read and cache the schema from the files at the input
+ // location if one was not supplied via the constructor arguments.
+ if (schema == null) {
+ Schema s = getAvroSchema(location, job);
+ setInputAvroSchema(s);
+ }
+
+ ResourceSchema rs = AvroStorageSchemaConversionUtilities
+ .avroSchemaToResourceSchema(schema, allowRecursive);
+
+ return rs;
+ }
+
+ /**
+ * Reads the avro schema at the specified location.
+ * @param location Location of file
+ * @param job Hadoop job object
+ * @return an Avro Schema object derived from the specified file
+ * @throws IOException
+ *
+ */
+ protected final Schema getAvroSchema(final String location,
+ final Job job) throws IOException {
+ return getAvroSchema(new Path(location), job);
+ }
+
+ /**
+ * A PathFilter that accepts only visible files, i.e. files whose names
+ * do not begin with '_' or '.'.
+ */
+ protected static final PathFilter VISIBLE_FILES = new PathFilter() {
+ @Override
+ public boolean accept(final Path p) {
+ final String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Reads the avro schema at the specified location.
+ * @param p Location of file
+ * @param job Hadoop job object
+ * @return an Avro Schema object derived from the specified file
+ * @throws IOException if the path does not exist, matches no file, or
+ * the file cannot be read as an Avro data file
+ *
+ */
+ public Schema getAvroSchema(final Path p, final Job job)
+ throws IOException {
+ GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+ FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
+ FileStatus[] statusArray = fs.globStatus(p);
+
+ if (statusArray == null) {
+ throw new IOException("Path " + p.toString() + " does not exist.");
+ }
+
+ if (statusArray.length == 0) {
+ throw new IOException("No path matches pattern " + p.toString());
+ }
+
+ Path filePath = depthFirstSearchForFile(statusArray, fs);
+
+ if (filePath == null) {
+ throw new IOException("No path matches pattern " + p.toString());
+ }
+
+ // Fixed: the input stream leaked if the DataFileStream constructor or
+ // getSchema threw; close whichever resource was successfully opened.
+ InputStream hdfsInputStream = fs.open(filePath);
+ DataFileStream<Object> avroDataStream = null;
+ try {
+ avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
+ return avroDataStream.getSchema();
+ } finally {
+ if (avroDataStream != null) {
+ // closes the underlying input stream as well
+ avroDataStream.close();
+ } else {
+ // constructor failed before taking ownership of the stream
+ hdfsInputStream.close();
+ }
+ }
+ }
+
+ /**
+ * Finds a valid path for a file from a FileStatus object.
+ * @param fileStatus FileStatus object corresponding to a file,
+ * or a directory.
+ * @param fileSystem FileSystem in with the file should be found
+ * @return The first file found, or null if the directory tree under
+ * fileStatus contains no visible files
+ * @throws IOException
+ */
+
+ private Path depthFirstSearchForFile(final FileStatus fileStatus,
+ final FileSystem fileSystem) throws IOException {
+ if (fileSystem.isFile(fileStatus.getPath())) {
+ return fileStatus.getPath();
+ } else {
+ // Recurse into the directory, skipping hidden files/directories.
+ return depthFirstSearchForFile(
+ fileSystem.listStatus(fileStatus.getPath(), VISIBLE_FILES),
+ fileSystem);
+ }
+
+ }
+
+ /**
+ * Finds a valid path for a file from an array of FileStatus objects.
+ * Note: sorts the supplied array in place (most recently modified first)
+ * before searching.
+ * @param statusArray Array of FileStatus objects in which to search
+ * for the file.
+ * @param fileSystem FileSystem in which to search for the first file.
+ * @return The first file found, or null if none was found.
+ * @throws IOException
+ */
+ protected Path depthFirstSearchForFile(final FileStatus[] statusArray,
+ final FileSystem fileSystem) throws IOException {
+
+ // Most recent files first
+ Arrays.sort(statusArray,
+ new Comparator<FileStatus>() {
+ @Override
+ public int compare(final FileStatus fs1, final FileStatus fs2) {
+ return Longs.compare(fs2.getModificationTime(),fs1.getModificationTime());
+ }
+ }
+ );
+
+ for (FileStatus f : statusArray) {
+ Path p = depthFirstSearchForFile(f, fileSystem);
+ if (p != null) {
+ return p;
+ }
+ }
+
+ return null;
+
+ }
+
+ /*
+ * @see org.apache.pig.LoadMetadata#getStatistics(java.lang.String,
+ * org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public final ResourceStatistics getStatistics(final String location,
+ final Job job) throws IOException {
+ // No statistics are available for Avro input.
+ return null;
+ }
+
+ /*
+ * @see org.apache.pig.LoadMetadata#getPartitionKeys(java.lang.String,
+ * org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public final String[] getPartitionKeys(final String location,
+ final Job job) throws IOException {
+ // Partition pruning is not supported.
+ return null;
+ }
+
+ /*
+ * @see
+ * org.apache.pig.LoadMetadata#setPartitionFilter(org.apache.pig.Expression)
+ */
+ @Override
+ public void setPartitionFilter(final Expression partitionFilter)
+ throws IOException {
+ // Intentionally empty: no partition keys, so no filter to apply.
+ }
+
+ /*
+ * @see
+ * org.apache.pig.StoreFuncInterface#relToAbsPathForStoreLocation(java.lang
+ * .String, org.apache.hadoop.fs.Path)
+ */
+ @Override
+ public final String relToAbsPathForStoreLocation(final String location,
+ final Path curDir) throws IOException {
+ return LoadFunc.getAbsolutePath(location, curDir);
+ }
+
+ /*
+ * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
+ */
+ @Override
+ public OutputFormat<NullWritable, Object> getOutputFormat()
+ throws IOException {
+
+ /**
+ * Hadoop output format for AvroStorage. The output schema is not
+ * passed here; it is delivered to the AvroRecordWriter later via
+ * prepareToWrite(RecordWriter).
+ */
+ class AvroStorageOutputFormat extends
+ FileOutputFormat<NullWritable, Object> {
+
+ @Override
+ public RecordWriter<NullWritable, Object> getRecordWriter(
+ final TaskAttemptContext tc) throws IOException,
+ InterruptedException {
+
+ return new AvroRecordWriter(
+ getDefaultWorkFile(tc, AvroOutputFormat.EXT),
+ tc.getConfiguration());
+
+ }
+ }
+
+ return new AvroStorageOutputFormat();
+
+ }
+
+ /*
+ * @see org.apache.pig.StoreFuncInterface#setStoreLocation(java.lang.String,
+ * org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public final void setStoreLocation(final String location,
+ final Job job) throws IOException {
+ FileOutputFormat.setOutputPath(job, new Path(location));
+ }
+
+ /**
+ * Pig property name for the output avro schema.
+ */
+ public static final String OUTPUT_AVRO_SCHEMA =
+ "org.apache.pig.builtin.AvroStorage.output.schema";
+
+ /*
+ * @see
+ * org.apache.pig.StoreFuncInterface#checkSchema(org.apache.pig.ResourceSchema
+ * )
+ */
+ @Override
+ public final void checkSchema(final ResourceSchema rs) throws IOException {
+ if (rs == null) {
+ throw new IOException("checkSchema: called with null ResourceSchema");
+ }
+ // Translate the Pig schema to Avro, using the configured record name
+ // (or "pig_output" as a fallback) and optional namespace.
+ Schema avroSchema = AvroStorageSchemaConversionUtilities
+ .resourceSchemaToAvroSchema(rs,
+ (schemaName == null || schemaName.length() == 0)
+ ? "pig_output" : schemaName,
+ schemaNameSpace,
+ Maps.<String, List<Schema>> newHashMap(),
+ doubleColonsToDoubleUnderscores);
+ if (avroSchema == null) {
+ throw new IOException("checkSchema: could not translate ResourceSchema to Avro Schema");
+ }
+ // Stash the translated schema in the UDF properties for the back end.
+ setOutputAvroSchema(avroSchema);
+ }
+
+ /**
+ * Sets the output avro schema to {@code s}, caching it in this instance
+ * and in the UDF properties so the back end can retrieve it.
+ * @param s An Avro schema
+ */
+ protected final void setOutputAvroSchema(final Schema s) {
+ schema = s;
+ getProperties()
+ .setProperty(OUTPUT_AVRO_SCHEMA, s.toString());
+ }
+
+ /**
+ * Utility function that gets the output schema from the udf
+ * properties for this instance of the store function.
+ * @return the output schema associated with this UDF
+ */
+ protected final Schema getOutputAvroSchema() {
+ // Return the cached schema when present; otherwise recover it from
+ // the UDF properties (set on the front end by checkSchema).
+ if (schema != null) {
+ return schema;
+ }
+ final String schemaString =
+ getProperties().getProperty(OUTPUT_AVRO_SCHEMA);
+ if (schemaString != null) {
+ schema = new Schema.Parser().parse(schemaString);
+ }
+ return schema;
+ }
+
+ /**
+ * RecordWriter used by this UDF instance.
+ */
+ private RecordWriter<NullWritable, Object> writer;
+
+ /*
+ * @see
+ * org.apache.pig.StoreFuncInterface#prepareToWrite(org.apache.hadoop.mapreduce
+ * .RecordWriter)
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public final void prepareToWrite(final RecordWriter w) throws IOException {
+ // Without a context signature we cannot look up the output schema.
+ if (this.udfContextSignature == null) {
+ throw new IOException(this.getClass().toString()
+ + ".prepareToWrite called without setting udf context signature");
+ }
+ writer = (RecordWriter<NullWritable, Object>) w;
+ ((AvroRecordWriter) writer).prepareToWrite(getOutputAvroSchema());
+ }
+
+ /*
+ * @see org.apache.pig.StoreFuncInterface#putNext(org.apache.pig.data.Tuple)
+ */
+ @Override
+ public final void putNext(final Tuple t) throws IOException {
+ try {
+ // The AvroRecordWriter ignores the key; the tuple is the record.
+ writer.write(null, t);
+ } catch (InterruptedException e) {
+ log.error("InterruptedException in putNext");
+ throw new IOException(e);
+ }
+ }
+
+ /*
+ * @see
+ * org.apache.pig.StoreFuncInterface#setStoreFuncUDFContextSignature(java.
+ * lang.String)
+ */
+ @Override
+ public final void setStoreFuncUDFContextSignature(final String signature) {
+ udfContextSignature = signature;
+ super.setUDFContextSignature(signature);
+ }
+
+ /*
+ * @see org.apache.pig.StoreFuncInterface#cleanupOnFailure(java.lang.String,
+ * org.apache.hadoop.mapreduce.Job)
+ */
+ @Override
+ public final void cleanupOnFailure(final String location,
+ final Job job) throws IOException {
+ StoreFunc.cleanupOnFailureImpl(location, job);
+ }
+
+ /**
+ * Pig property name for the input avro schema.
+ */
+ public static final String INPUT_AVRO_SCHEMA =
+ "org.apache.pig.builtin.AvroStorage.input.schema";
+
+ /*
+ * @see org.apache.pig.LoadFunc#setLocation(java.lang.String,
+ * org.apache.hadoop.mapreduce.Job)
+ */
+
+ @Override
+ public void setLocation(final String location, final Job job)
+ throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ // Resolve the input schema in priority order: instance cache, UDF
+ // properties, then the data files at the input location.
+ if (schema == null) {
+ schema = getInputAvroSchema();
+ if (schema == null) {
+ schema = getAvroSchema(location, job);
+ if (schema == null) {
+ throw new IOException(
+ "Could not determine avro schema for location " + location);
+ }
+ setInputAvroSchema(schema);
+ }
+ }
+ }
+
+ /**
+ * Sets the input avro schema to {@code s}, caching it in this instance
+ * and in the UDF properties so the back end can retrieve it.
+ * @param s The specified schema
+ */
+ protected final void setInputAvroSchema(final Schema s) {
+ schema = s;
+ getProperties().setProperty(INPUT_AVRO_SCHEMA, s.toString());
+ }
+
+ /**
+ * Helper function that reads the input avro schema from the UDF
+ * Properties, caching it on this instance.
+ * @return The input avro schema, or null if none has been recorded
+ */
+ public final Schema getInputAvroSchema() {
+ if (schema == null) {
+ final String schemaString = getProperties().getProperty(INPUT_AVRO_SCHEMA);
+ if (schemaString != null) {
+ schema = new Schema.Parser().parse(schemaString);
+ }
+ }
+ return schema;
+ }
+
+ /*
+ * @see org.apache.pig.LoadFunc#getInputFormat()
+ */
+ @Override
+ public InputFormat<NullWritable, GenericData.Record> getInputFormat()
+ throws IOException {
+
+ return new org.apache.pig.backend.hadoop.executionengine.mapReduceLayer
+ .PigFileInputFormat<NullWritable, GenericData.Record>() {
+
+ @Override
+ public RecordReader<NullWritable, GenericData.Record>
+ createRecordReader(final InputSplit is, final TaskAttemptContext tc)
+ throws IOException, InterruptedException {
+ Schema s = getInputAvroSchema();
+ RecordReader<NullWritable, GenericData.Record> rr = null;
+ // Files whose top-level schema is an array get the array reader;
+ // everything else is read record-by-record.
+ if (s.getType() == Type.ARRAY) {
+ rr = new AvroArrayReader(s);
+ } else {
+ rr = new AvroRecordReader(s);
+ }
+ rr.initialize(is, tc);
+ tc.setStatus(is.toString());
+ return rr;
+ }
+ };
+
+ }
+
+ // RecordReader and split handed to this loader by the framework.
+ @SuppressWarnings("rawtypes") private RecordReader reader;
+ PigSplit split;
+
+ /*
+ * @see
+ * org.apache.pig.LoadFunc#prepareToRead(org.apache.hadoop.mapreduce.RecordReader
+ * , org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit)
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public final void prepareToRead(final RecordReader r, final PigSplit s)
+ throws IOException {
+ reader = r;
+ split = s;
+ }
+
+ /*
+ * @see org.apache.pig.LoadFunc#getNext()
+ */
+
+ @Override
+ public final Tuple getNext() throws IOException {
+ try {
+ // Wrap each Avro record in a lazy tuple view; null signals end of input.
+ if (reader.nextKeyValue()) {
+ return new AvroTupleWrapper<GenericData.Record>(
+ (GenericData.Record) reader.getCurrentValue());
+ } else {
+ return null;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Wrapped Interrupted Exception", e);
+ }
+ }
+
+ @Override
+ public void cleanupOnSuccess(final String location, final Job job)
+ throws IOException {
+ // Intentionally empty: nothing to clean up on success.
+ }
+
+ @Override
+ public List<OperatorSet> getFeatures() {
+ // Advertise support for push-down projection (see pushProjection).
+ return Lists.newArrayList(LoadPushDown.OperatorSet.PROJECTION);
+ }
+
+ /**
+ * List of required fields passed by pig in a push down projection.
+ */
+ protected RequiredFieldList requiredFieldList;
+
+ /*
+ * @see
+ * org.apache.pig.LoadPushDown#pushProjection(org.apache.pig.LoadPushDown.
+ * RequiredFieldList)
+ */
+ @Override
+ public RequiredFieldResponse pushProjection(final RequiredFieldList rfl)
+ throws FrontendException {
+ requiredFieldList = rfl;
+
+ // Narrow the input schema to the requested fields; on failure fall
+ // back to reading the full schema and warn.
+ Schema newSchema = AvroStorageSchemaConversionUtilities
+ .newSchemaFromRequiredFieldList(schema, rfl);
+ if (newSchema != null) {
+ schema = newSchema;
+ setInputAvroSchema(schema);
+ return new RequiredFieldResponse(true);
+ } else {
+ log.warn("could not select fields subset " + rfl + "\n");
+ warn("could not select fields subset", PigWarning.UDF_WARNING_2);
+ return new RequiredFieldResponse(false);
+ }
+
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.avro.AvroRecordWriter;
+import org.apache.pig.impl.util.avro.AvroStorageDataConversionUtilities;
+import org.apache.trevni.ColumnFileMetaData;
+import org.apache.trevni.MetaData;
+import org.apache.trevni.avro.AvroColumnReader;
+import org.apache.trevni.avro.AvroColumnWriter;
+import org.apache.trevni.avro.AvroTrevniOutputFormat;
+import org.apache.trevni.avro.HadoopInput;
+
+import com.google.common.collect.Lists;
+
+/**
+ *
+ * Pig Store/Load Function for Trevni.
+ *
+ */
+
+public class TrevniStorage extends AvroStorage implements LoadPushDown{
+
+ /**
+ * Create new instance of TrevniStorage with no arguments (useful
+ * for loading files without specifying parameters).
+ */
+ public TrevniStorage() {
+ super();
+ }
+
+ /**
+ * Create new instance of TrevniStorage.
+ * @param sn Specifies the input/output schema or record name.
+ * @param opts Options for AvroStorage:
+ * <li><code>-namespace</code> Namespace for an automatically generated
+ * output schema.</li>
+ * <li><code>-schemafile</code> Specifies URL for avro schema file
+ * from which to read the input schema (can be local file, hdfs,
+ * url, etc).</li>
+ * <li><code>-examplefile</code> Specifies URL for avro data file from
+ * which to copy the input schema (can be local file, hdfs, url, etc).</li>
+ * <li><code>-allowrecursive</code> Option to allow recursive schema
+ * definitions (default is false).</li>
+ * <li><code>-doublecolons</code> Option to translate Pig schema names
+ * with double colons to names with double underscores (default is false;
+ * inherited from AvroStorage).</li>
+ */
+ public TrevniStorage(final String sn, final String opts) {
+ super(sn, opts);
+ }
+
+ /*
+ * @see org.apache.pig.LoadFunc#getInputFormat()
+ */
+ @Override
+ public InputFormat<NullWritable, GenericData.Record> getInputFormat()
+ throws IOException {
+
+ class TrevniStorageInputFormat
+ extends PigFileInputFormat<NullWritable, GenericData.Record> {
+
+ // Trevni column files are read whole; splitting is disabled.
+ @Override protected boolean isSplitable(JobContext jc, Path p) {
+ return false;
+ }
+
+ // List input files recursively, skipping hidden files.
+ @Override protected List<FileStatus> listStatus(final JobContext job)
+ throws IOException {
+ List<FileStatus> results = Lists.newArrayList();
+ job.getConfiguration().setBoolean("mapred.input.dir.recursive", true);
+ for (FileStatus file : super.listStatus(job)) {
+ if (VISIBLE_FILES.accept(file.getPath())) {
+ results.add(file);
+ }
+ }
+ return results;
+ }
+
+ @Override
+ public RecordReader<NullWritable, GenericData.Record>
+ createRecordReader(final InputSplit is, final TaskAttemptContext tc)
+ throws IOException, InterruptedException {
+ RecordReader<NullWritable, GenericData.Record> rr =
+ new RecordReader<NullWritable, GenericData.Record>() {
+
+ private FileSplit fsplit;
+ private AvroColumnReader.Params params;
+ private AvroColumnReader<GenericData.Record> reader;
+ // rows is a float so getProgress divides without casting.
+ private float rows;
+ private long row = 0;
+ private GenericData.Record currentRecord = null;
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public NullWritable getCurrentKey()
+ throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public Record getCurrentValue()
+ throws IOException, InterruptedException {
+ return currentRecord;
+ }
+
+ @Override
+ public float getProgress()
+ throws IOException, InterruptedException {
+ // NOTE(review): yields NaN (0/0) for an empty file — confirm
+ // progress reporting tolerates that.
+ return row / rows;
+ }
+
+ @Override
+ public void initialize(final InputSplit isplit,
+ final TaskAttemptContext tac)
+ throws IOException, InterruptedException {
+ fsplit = (FileSplit) isplit;
+ params = new AvroColumnReader.Params(
+ new HadoopInput(fsplit.getPath(), tac.getConfiguration()));
+ Schema inputSchema = getInputAvroSchema();
+ params.setSchema(inputSchema);
+ reader = new AvroColumnReader<GenericData.Record>(params);
+ rows = reader.getRowCount();
+ }
+
+ @Override
+ public boolean nextKeyValue()
+ throws IOException, InterruptedException {
+ if (reader.hasNext()) {
+ currentRecord = reader.next();
+ row++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ // rr.initialize(is, tc);
+ // initialize() is deliberately not called here; presumably the
+ // MapReduce framework invokes it after this returns — verify.
+ tc.setStatus(is.toString());
+ return rr;
+ }
+
+ }
+
+ return new TrevniStorageInputFormat();
+
+ }
+
+ /*
+ * @see org.apache.pig.StoreFuncInterface#getOutputFormat()
+ */
+ @Override
+ public OutputFormat<NullWritable, Object> getOutputFormat()
+ throws IOException {
+ class TrevniStorageOutputFormat
+ extends FileOutputFormat<NullWritable, Object> {
+
+ private Schema schema;
+
+ TrevniStorageOutputFormat(final Schema s) {
+ schema = s;
+ // Fall back to the schema stashed in the UDF properties by
+ // checkSchema/setOutputAvroSchema.
+ if (s == null) {
+ String schemaString = getProperties(
+ AvroStorage.class, udfContextSignature)
+ .getProperty(OUTPUT_AVRO_SCHEMA);
+ if (schemaString != null) {
+ schema = (new Schema.Parser()).parse(schemaString);
+ }
+ }
+
+ }
+
+ @Override
+ public RecordWriter<NullWritable, Object>
+ getRecordWriter(final TaskAttemptContext tc)
+ throws IOException, InterruptedException {
+
+ // Last-chance schema lookup on the back end.
+ if (schema == null) {
+ String schemaString = getProperties(
+ AvroStorage.class, udfContextSignature)
+ .getProperty(OUTPUT_AVRO_SCHEMA);
+ if (schemaString != null) {
+ schema = (new Schema.Parser()).parse(schemaString);
+ }
+ if (schema == null) {
+ throw new IOException("Null output schema");
+ }
+ }
+
+ final ColumnFileMetaData meta = new ColumnFileMetaData();
+
+ // Copy Trevni metadata entries from the job configuration.
+ // NOTE(review): keys are matched against AvroTrevniOutputFormat
+ // .META_PREFIX but stripped using AvroJob.TEXT_PREFIX.length();
+ // if those two prefixes differ in length the stored key is
+ // corrupted — confirm the constants match.
+ for (Entry<String, String> e : tc.getConfiguration()) {
+ if (e.getKey().startsWith(
+ org.apache.trevni.avro.AvroTrevniOutputFormat.META_PREFIX)) {
+ meta.put(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
+ e.getValue().getBytes(MetaData.UTF8));
+ }
+ }
+
+ final Path dir = getOutputPath(tc);
+ // NOTE(review): uses the default FileSystem rather than
+ // dir.getFileSystem(conf) — verify the output path is always on
+ // the default FS.
+ final FileSystem fs = FileSystem.get(tc.getConfiguration());
+ final long blockSize = fs.getDefaultBlockSize();
+
+ if (!fs.mkdirs(dir)) {
+ throw new IOException("Failed to create directory: " + dir);
+ }
+
+ meta.setCodec("deflate");
+
+ return new AvroRecordWriter(dir, tc.getConfiguration()) {
+ private int part = 0;
+ private Schema avroRecordWriterSchema;
+ // writer is created in prepareToWrite; presumably that is always
+ // called before write() — confirm, otherwise write() NPEs.
+ private AvroColumnWriter<GenericData.Record> writer;
+
+ // Flush the current column buffer to a new part file.
+ private void flush() throws IOException {
+ Integer taskAttemptId = tc.getTaskAttemptID().getTaskID().getId();
+ String partName = String.format("%05d_%03d", taskAttemptId, part++);
+ OutputStream out = fs.create(
+ new Path(dir, "part-" + partName + AvroTrevniOutputFormat.EXT));
+ try {
+ writer.writeTo(out);
+ } finally {
+ out.flush();
+ out.close();
+ }
+ }
+
+ @Override
+ public void close(final TaskAttemptContext arg0)
+ throws IOException, InterruptedException {
+ flush();
+ }
+
+ @Override
+ public void write(final NullWritable n, final Object o)
+ throws IOException, InterruptedException {
+ GenericData.Record r =
+ AvroStorageDataConversionUtilities
+ .packIntoAvro((Tuple) o, schema);
+ writer.write(r);
+ // Roll to a new part file once the buffer reaches one block.
+ if (writer.sizeEstimate() >= blockSize) {
+ flush();
+ writer = new AvroColumnWriter<GenericData.Record>(
+ avroRecordWriterSchema, meta);
+ }
+ }
+
+ @Override
+ public void prepareToWrite(Schema s) throws IOException {
+ avroRecordWriterSchema = s;
+ writer = new AvroColumnWriter<GenericData.Record>(
+ avroRecordWriterSchema, meta);
+ }
+ };
+ }
+ }
+
+ return new TrevniStorageOutputFormat(schema);
+ }
+
+ /**
+ * Reads the Avro schema embedded in the first visible Trevni file
+ * matching {@code p}.
+ * @param p Location (may be a glob) of the Trevni data
+ * @param job Hadoop job object
+ * @return the Avro schema stored in the Trevni file
+ * @throws IOException if no matching file exists or it cannot be read
+ */
+ @Override
+ public Schema getAvroSchema(Path p, final Job job) throws IOException {
+
+ FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
+ FileStatus[] statusArray = fs.globStatus(p, VISIBLE_FILES);
+
+ if (statusArray == null) {
+ throw new IOException("Path " + p.toString() + " does not exist.");
+ }
+
+ if (statusArray.length == 0) {
+ throw new IOException("No path matches pattern " + p.toString());
+ }
+
+ Path filePath = depthFirstSearchForFile(statusArray, fs);
+
+ if (filePath == null) {
+ throw new IOException("No path matches pattern " + p.toString());
+ }
+
+ AvroColumnReader.Params params =
+ new AvroColumnReader.Params(
+ new HadoopInput(filePath, job.getConfiguration()));
+ AvroColumnReader<GenericData.Record> reader =
+ new AvroColumnReader<GenericData.Record>(params);
+ Schema s = reader.getFileSchema();
+ reader.close();
+ return s;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroArrayReader.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * RecordReader for Avro files
+ */
+public final class AvroArrayReader
+ extends RecordReader<NullWritable, GenericData.Record> {
+
+ private FileReader<GenericData.Array<Object>> reader;
+ private long start;
+ private long end;
+ private Schema schema;
+ private GenericData.Array<Object> currentArray;
+
+ /**
+ * Creates new instance of AvroRecordReader.
+ * @param s The input schema.
+ */
+ public AvroArrayReader(final Schema s) {
+ schema = s;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ /**
+ * Returns current value.
+ * @return the current value
+ * @throws IOException when an IO error occurs
+ * @throws InterruptedException when interrupted
+ */
+ @Override
+ public GenericData.Record getCurrentValue()
+ throws IOException, InterruptedException {
+ if (currentArray != null) {
+ GenericData.Record r = new GenericData.Record(
+ Schema.createRecord(
+ Lists.newArrayList(
+ new Schema.Field(schema.getName(), schema, null, null))
+ )
+ );
+ r.put(0, currentArray);
+ return r;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey()
+ throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f,
+ ((float) (reader.tell() - start)) / ((float) (end - start)));
+ }
+ }
+
+ @Override
+ public void initialize(final InputSplit isplit, final TaskAttemptContext tc)
+ throws IOException, InterruptedException {
+
+ FileSplit fsplit = (FileSplit) isplit;
+ start = fsplit.getStart();
+ end = fsplit.getStart() + fsplit.getLength();
+ DatumReader<GenericData.Array<Object>> datumReader
+ = new GenericDatumReader<GenericData.Array<Object>>(schema);
+ reader = DataFileReader.openReader(
+ new FsInput(fsplit.getPath(), tc.getConfiguration()),
+ datumReader);
+ reader.sync(start);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+
+ if (reader.pastSync(end)) {
+ return false;
+ }
+ try {
+ currentArray = reader.next();
+ } catch (NoSuchElementException e) {
+ return false;
+ }
+ return true;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Class that implements the Pig bag interface, wrapping an Avro array.
+ * Designed to reduce data copying.
+ * @param <T> Type of objects in Avro array
+ */
+public final class AvroBagWrapper<T> implements DataBag {
+
+ /**
+ * The array object wrapped in this AvroBagWrapper object.
+ */
+ private GenericArray<T> theArray;
+
+ /**
+ * Create new AvroBagWrapper instance.
+ * @param a Avro array to wrap in bag
+ */
+ public AvroBagWrapper(final GenericArray<T> a) { theArray = a; }
+
+ @Override
+ public long spill() { return 0; }
+
+ @Override
+ public long getMemorySize() {
+ return 0;
+ }
+
+ @Override
+ public void readFields(final DataInput d) throws IOException {
+ throw new IOException(
+ this.getClass().toString() + ".readFields not implemented yet");
+ }
+
+ @Override
+ public void write(final DataOutput d) throws IOException {
+ throw new IOException(
+ this.getClass().toString() + ".write not implemented yet");
+ }
+
+ @Override
+ public int compareTo(final Object o) {
+ return GenericData.get().compare(theArray, o, theArray.getSchema());
+ }
+
+ @Override public long size() { return theArray.size(); }
+ @Override public boolean isSorted() { return false; }
+ @Override public boolean isDistinct() { return false; }
+
+ @Override
+ public Iterator<Tuple> iterator() {
+ return Iterators.transform(theArray.iterator(),
+ new Function<T, Tuple>() {
+ @Override
+ public Tuple apply(final T arg) {
+ if (arg instanceof IndexedRecord) {
+ return new AvroTupleWrapper<IndexedRecord>((IndexedRecord) arg);
+ } else {
+ return TupleFactory.getInstance().newTuple(arg);
+ }
+ }
+ }
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override public void add(final Tuple t) { theArray.add((T) t); }
+
+ @Override
+ public void addAll(final DataBag b) {
+ for (Tuple t : b) {
+ add(t);
+ }
+ }
+
+ @Override public void clear() { theArray.clear(); }
+ @Override public void markStale(final boolean stale) { }
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Sets;
+import com.google.common.base.Function;
+
+/**
+ * Wrapper for map objects, so we can translate UTF8 objects to
+ * Strings if we encounter them.
+ */
+public final class AvroMapWrapper implements Map<CharSequence, Object> {
+
+ /**
+ * The map contained in the wrapper object.
+ */
+ private Map<CharSequence, Object> innerMap;
+
+ /**
+ * Creates a new AvroMapWrapper object from the map object {@m}.
+ * @param m The map to wrap.
+ */
+ public AvroMapWrapper(final Map<CharSequence, Object> m) {
+ innerMap = m;
+ }
+
+ @Override
+ public int size() {
+ return innerMap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return innerMap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(final Object key) {
+ return innerMap.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(final Object value) {
+ return innerMap.containsValue(value);
+ }
+
+ @Override
+ public Object get(final Object key) {
+ Object v = innerMap.get(key);
+ if (v instanceof Utf8) {
+ return v.toString();
+ } else {
+ return v;
+ }
+ }
+
+ @Override
+ public Object put(final CharSequence key, final Object value) {
+ return innerMap.put(key, value);
+ }
+
+ @Override
+ public Object remove(final Object key) {
+ return innerMap.remove(key);
+ }
+
+ @Override
+ public void putAll(
+ final Map<? extends CharSequence, ? extends Object> m) {
+ innerMap.putAll(m);
+ }
+
+ @Override
+ public void clear() {
+ innerMap.clear();
+ }
+
+ @Override
+ public Set<CharSequence> keySet() {
+ return innerMap.keySet();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public Collection<Object> values() {
+ return Collections2.transform(innerMap.values(),
+ new Function() {
+ @Override
+ public Object apply(final Object v) {
+ if (v instanceof Utf8) {
+ return v.toString();
+ } else {
+ return v;
+ }
+ }
+ }
+ );
+ }
+
+ @Override
+ public Set<java.util.Map.Entry<CharSequence, Object>> entrySet() {
+ Set<java.util.Map.Entry<CharSequence, Object>> theSet =
+ Sets.newHashSetWithExpectedSize(innerMap.size());
+ for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) {
+ CharSequence k = e.getKey();
+ Object v = e.getValue();
+ if (k instanceof Utf8) {
+ k = k.toString();
+ }
+ if (v instanceof Utf8) {
+ v = v.toString();
+ }
+ theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v));
+ }
+
+ return theSet;
+
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordReader.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+
+/**
+ * RecordReader for Avro files
+ */
+public final class AvroRecordReader
+ extends RecordReader<NullWritable, GenericData.Record> {
+
+ private FileReader<GenericData.Record> reader;
+ private long start;
+ private long end;
+ private Schema schema;
+ private GenericData.Record currentRecord;
+
+ /**
+ * Creates new instance of AvroRecordReader.
+ * @param s The input schema.
+ */
+ public AvroRecordReader(final Schema s) {
+ schema = s;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ /**
+ * Returns current value.
+ * @return the current value
+ * @throws IOException when an IO error occurs
+ * @throws InterruptedException when interrupted
+ */
+ @Override
+ public GenericData.Record getCurrentValue()
+ throws IOException, InterruptedException {
+ if (currentRecord != null) {
+ return currentRecord;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey()
+ throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f,
+ ((float) (reader.tell() - start)) / ((float) (end - start)));
+ }
+ }
+
+ @Override
+ public void initialize(final InputSplit isplit, final TaskAttemptContext tc)
+ throws IOException, InterruptedException {
+
+ FileSplit fsplit = (FileSplit) isplit;
+ start = fsplit.getStart();
+ end = fsplit.getStart() + fsplit.getLength();
+ DatumReader<GenericData.Record> datumReader
+ = new GenericDatumReader<GenericData.Record>(schema);
+ reader = DataFileReader.openReader(
+ new FsInput(fsplit.getPath(), tc.getConfiguration()),
+ datumReader);
+ reader.sync(start);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+
+ if (reader.pastSync(end)) {
+ return false;
+ }
+
+ try {
+ currentRecord = reader.next(new GenericData.Record(schema));
+ } catch (NoSuchElementException e) {
+ return false;
+ } catch (IOException ioe) {
+ reader.sync(reader.tell()+1);
+ throw ioe;
+ }
+
+ return true;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java?rev=1497954&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java Fri Jun 28 22:54:05 2013
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util.avro;
+
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+import static org.apache.avro.mapred.AvroOutputFormat.SYNC_INTERVAL_KEY;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.Map;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.data.Tuple;
+
+/**
+ * RecordWriter for Avro objects.
+ */
+public class AvroRecordWriter extends RecordWriter<NullWritable, Object> {
+
+ private Schema schema = null;
+ private DataFileWriter<GenericData.Record> writer;
+ private Path out;
+ private Configuration conf;
+
+ /**
+ * Creates new AvroRecordWriter.
+ * @param s Schema for the files on this output path
+ * @param o Output path
+ * @param c Hadoop configuration
+ * @throws IOException
+ */
+ public AvroRecordWriter(/*final Schema s, */final Path o, final Configuration c)
+ throws IOException {
+ out = o;
+ conf = c;
+ }
+
+ // copied from org.apache.avro.mapred.AvroOutputFormat
+ static void configureDataFileWriter(DataFileWriter<GenericData.Record> writer,
+ JobConf job) throws UnsupportedEncodingException {
+ if (FileOutputFormat.getCompressOutput(job)) {
+ int level = job.getInt(DEFLATE_LEVEL_KEY,
+ DEFAULT_DEFLATE_LEVEL);
+ String codecName = job.get(AvroJob.OUTPUT_CODEC, DEFLATE_CODEC);
+ CodecFactory factory = codecName.equals(DEFLATE_CODEC)
+ ? CodecFactory.deflateCodec(level)
+ : CodecFactory.fromString(codecName);
+ writer.setCodec(factory);
+ }
+
+ writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY,
+ DEFAULT_SYNC_INTERVAL));
+
+ // copy metadata from job
+ for (Map.Entry<String,String> e : job) {
+ if (e.getKey().startsWith(AvroJob.TEXT_PREFIX))
+ writer.setMeta(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
+ e.getValue());
+ if (e.getKey().startsWith(AvroJob.BINARY_PREFIX))
+ writer.setMeta(e.getKey().substring(AvroJob.BINARY_PREFIX.length()),
+ URLDecoder.decode(e.getValue(), "ISO-8859-1")
+ .getBytes("ISO-8859-1"));
+ }
+ }
+
+ @Override
+ public void close(final TaskAttemptContext arg0)
+ throws IOException, InterruptedException {
+ writer.close();
+ }
+
+ @Override
+ public void write(final NullWritable key, final Object value)
+ throws IOException, InterruptedException {
+
+ if (value instanceof GenericData.Record) {
+ // whoo-hoo! already avro
+ writer.append((GenericData.Record) value);
+ } else if (value instanceof Tuple) {
+ // pack the object into an Avro record
+ writer.append(AvroStorageDataConversionUtilities
+ .packIntoAvro((Tuple) value, schema));
+ }
+ }
+
+ public void prepareToWrite(Schema s) throws IOException {
+ if (s == null) {
+ throw new IOException(
+ this.getClass().getName() + ".prepareToWrite called with null schema");
+ }
+ schema = s;
+ DatumWriter<GenericData.Record> datumWriter =
+ new GenericDatumWriter<GenericData.Record>(s);
+ writer = new DataFileWriter<GenericData.Record>(datumWriter);
+ configureDataFileWriter(writer, new JobConf(conf));
+ writer.create(s, out.getFileSystem(conf).create(out));
+
+ }
+
+}