Posted to dev@avro.apache.org by Scott Carey <sc...@richrelevance.com> on 2010/05/14 07:35:35 UTC

AvroStorage pig adapter

I'm working on some prototypes for an org.apache.avro.pig package for java that contains avro <> pig storage adapters.

This is going fairly well.  I already have org.apache.avro.mapreduce done for InputFormat and OutputFormat (Pig requires the 0.20 API); once I get testing working I'll submit a patch and JIRA for that.  We also need to package these libraries in a different jar than the core Avro content.

However there are some difficulties that I could use a little help on.   All Pig datatypes map to Avro easily except for the Pig MAP datatype.

A Pig map, like an Avro map, must have a string as a key.  However, its value type is essentially Object and can be any pig type.  A single pig map might have the contents:
"city" > "San Francisco"
"elevation" > 25

Avro map values must all have the same type.  A pig schema does not define what type is inside the map.  Other pig serializations dynamically handle it.
In Avro, this seems straightforward at first -- the value of the map must be a Union of all possible pig types:
  [ null, boolean, int, long, float, double, chararray, bytearray, tuple, bag, map ]

The problem comes in with the last three.   I'm fairly sure there is no valid Avro schema to represent this situation.  The tuple in the union can be a tuple of any possible composition -- Avro requires defining its fields in advance.  Likewise, the bag can contain tuples of any possible composition.  The map has to self-reference, and there's a bit of a chicken-and-egg problem there: in order to create the map schema I have to have the union containing it already created.

If I support only maps that contain simple value types, then pig can only detect the failure at runtime when the output is written and a complex map value is encountered during serialization.
I can support these arbitrary types by serializing them to byte[] via their Writable API and storing these as an avro bytes type.  That is a hack that I'd rather avoid but looks to be the one way out.

Unless I'm missing something, we can't serialize Pig maps in pure Avro unless either:

* Avro adds some sort of 'dynamic' typing for records that have unknown fields at schema create time.  For example, {"name": "unknown", "type": "dynamic-record"} can signify an untyped collection of fields, each field in binary can be prefixed by a type byte and the field names auto-generated (perhaps "$0", "$1", etc).
* Pig makes their map values strictly typed, like Avro and Hive.  I'd also like to see Pig and Avro support maps that have integer and long keys like Hive but that is a separate concern.


On the other side -- reading an avro container file into a Pig schema, there are a few limitations:

An Avro schema cannot be translated cleanly to Pig if:
  There is a union that is more than a union of NULL and one type, unless it is a map value.

There are some hacks around the above -- the union can be 'expanded':

a field in avro:
 {"name": "foo", "type": ["int", "string"]}
 
becomes two fields in pig, one null:
 (fooInt: int, fooString: string)
  
The above is not a general-purpose solution but it can be useful.  It would be nice if Pig supported unions.  In many ways it already does at a lower level, but this is not exposed in the user-facing type system.
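
The 'expanded' union described above could be sketched roughly as follows. This is an illustration only, not code from the adapter: the function name expand_union_field and the naming rule (field name plus capitalized branch type) are assumptions inferred from the fooInt / fooString example, and only primitive branches given as plain type names are handled.

```python
def expand_union_field(field):
    """Expand an Avro field whose type is a union into one Pig field per branch.

    Returns a list of (pig_field_name, type_name) pairs. For any given
    record, at most one of the expanded fields would be non-null.
    """
    name, avro_type = field["name"], field["type"]
    if not isinstance(avro_type, list):
        # Not a union: a single Pig field with the same name.
        return [(name, avro_type)]
    branches = [t for t in avro_type if t != "null"]
    if len(branches) == 1:
        # A union of null and one type is simply a nullable field.
        return [(name, branches[0])]
    # Assumed naming rule: field name + capitalized branch type name.
    # Type names are passed through unchanged; mapping them to Pig's own
    # type names is left out of this sketch.
    return [(name + t.capitalize(), t) for t in branches]


foo = {"name": "foo", "type": ["int", "string"]}
print(expand_union_field(foo))  # [('fooInt', 'int'), ('fooString', 'string')]
```

A union of null and one type collapses to a single nullable field, which is the case that already translates cleanly.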


JIRAs to come when the code is in better shape.

-Scott

RE: AvroStorage pig adapter

Posted by "Thiruvalluvan M. G." <th...@yahoo.com>.
> Yes, it helps a lot.  One question remains, how can I construct a
> recursive schema programmatically?
> I have a couple options for the pig Tuple avro schema -- write it in JSON
> and put that in the source code or programmatically construct it.
> I'm currently programmatically constructing a schema specific to the Pig
> schema that is serialized, which is straightforward until I hit the map
> type and recursion.

For my benchmark, I put the schema in the jar (or wherever the class files are
located) and then used ClassLoader.getResourceAsStream(). This reduces the
chance that the schema file is not found at runtime.

Thanks

Thiru




Re: AvroStorage pig adapter

Posted by Doug Cutting <cu...@apache.org>.
On 05/15/2010 02:08 PM, Scott Carey wrote:
> On May 14, 2010, at 12:18 PM, Doug Cutting wrote:
>> To construct a recursive schema programmatically you need to do
>> what the schema parser does: create a record schema with
>> Schema.createRecord(), create its fields, including one or more that
>> reference the record schema, then call setFields() with the
>> fields.
>
> Excellent.  I'll do that.  This means that the recursion points that
> avro supports can only be records, right?

Yes, in the existing API.

> Only a record has the
> equivalent of setFields(), everything else that can contain another
> element (arrays, unions) must declare the inner element(s) at
> creation time.

A union-of-unions is illegal, but an array-of-arrays is not.  So it 
might make sense to add a method like setElementType() if you need to 
use arrays as the recursion point.

Doug

Re: AvroStorage pig adapter

Posted by Scott Carey <sc...@richrelevance.com>.
On May 14, 2010, at 12:18 PM, Doug Cutting wrote:

> On 05/14/2010 11:50 AM, Scott Carey wrote:
>> It doesn't cost on the serialization size but currently it costs a lot on the performance side.
> 
> Not necessarily. For Pig data in Java I think one might reasonably write 
> a custom reader and writer that reads and writes Pig data structures 
> directly.  This could look something like readSchema, writeSchema, 
> readJson and writeJson methods in the patch for AVRO-251.
> 
> https://issues.apache.org/jira/browse/AVRO-251
> 
> Start by looking at json.avsc (an Avro schema for arbitrary JSON data) 
> then see the readJson() method.  It directly reads Avro data 
> corresponding to that schema into a Jackson JsonNode.  ResolvingDecoder 
> enforces the schema.

This has to work through Hadoop via the 0.20 InputFormat / OutputFormat APIs, and the o.a.h.mapreduce API implementations I have made closely resemble those you made for the old o.a.h.mapred API.  These currently require using the Generic or Specific API, though one could make one that took a Pig Tuple directly instead of an Avro object.

Longer term, I think we can get the Specific and Reflect APIs to perform as well as the above by dynamically compiling a schema's serialization and deserialization via ASM into direct decoder/encoder API calls.  But that's the sort of research project I only wish I had time for :)

> 
>> Yes, it helps a lot.  One question remains, how can I construct a recursive schema programmatically?
>> I have a couple options for the pig Tuple avro schema -- write it in JSON and put that in the source code or programmatically construct it.
>> I'm currently programmatically constructing a schema specific to the Pig schema that is serialized, which is straightforward until I hit the map type and recursion.
> 
> If you're not using a universal Pig schema then the above strategy may 
> or may not work.  It might still work if the specific schema is always a 
> subset of the universal Pig schema, which I suspect it is.

One requirement I may not have communicated well is that I want to persist the Pig schema, not just the pig data.   So if someone in Pig does:

STORE FOO into 'directory' using AvroStorage();

and FOO has pig schema:
{ firstName: string, lastName: string, departmentId: int, salary: long }
The avro schema would ideally be:
{ "name": "FOO", "type": "record", "fields": [
  { "name": "firstName", "type": "string" },
  { "name": "lastName", "type": "string" },
  { "name": "departmentId", "type": "int" },
  { "name": "salary", "type": "long"}
] }

Then, when the avro container file is loaded back in pig (or in Java M/R, or Hive, or Cascading, or external applications, etc) the field names are preserved.  Without this feature much of the power of Avro goes away -- resolving schema evolution is manual and lock-step across all data consumers, etc.
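
As a rough illustration of the mapping described above, the sketch below builds the Avro record schema for FOO from a list of Pig field names and types. It is not the adapter's code: the function name pig_schema_to_avro is hypothetical, and it assumes every field is a primitive whose type name is also a valid Avro primitive type name, as in the FOO example.

```python
import json


def pig_schema_to_avro(record_name, pig_fields):
    """Build an Avro record schema (as a dict) from (name, type) pairs.

    Field order and field names are preserved, so a reader of the
    container file sees the same names the Pig script used.
    """
    return {
        "name": record_name,
        "type": "record",
        "fields": [{"name": n, "type": t} for n, t in pig_fields],
    }


foo_fields = [
    ("firstName", "string"),
    ("lastName", "string"),
    ("departmentId", "int"),
    ("salary", "long"),
]
avro_schema = pig_schema_to_avro("FOO", foo_fields)
print(json.dumps(avro_schema, indent=2))
```

Complex Pig types (tuple, bag, map) would need their own branches and are deliberately left out here.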

I will create a unique schema for each store, and when a Map is encountered the value will be an avro record that can contain generic, nameless pig data using something much like Thiru's schema.

> 
> To construct a recursive schema programmatically you need to do what the 
> schema parser does: create a record schema with Schema.createRecord(), create 
> its fields, including one or more that reference the record schema, 
> then call setFields() with the fields.

Excellent.  I'll do that.  This means that the recursion points that avro supports can only be records, right?  Only a record has the equivalent of setFields(), everything else that can contain another element (arrays, unions) must declare the inner element(s) at creation time.

> 
> Doug
> 
> 

Thanks Doug,

-Scott

Re: AvroStorage pig adapter

Posted by Doug Cutting <cu...@apache.org>.
On 05/14/2010 11:50 AM, Scott Carey wrote:
> It doesn't cost on the serialization size but currently it costs a lot on the performance side.

Not necessarily. For Pig data in Java I think one might reasonably write 
a custom reader and writer that reads and writes Pig data structures 
directly.  This could look something like readSchema, writeSchema, 
readJson and writeJson methods in the patch for AVRO-251.

https://issues.apache.org/jira/browse/AVRO-251

Start by looking at json.avsc (an Avro schema for arbitrary JSON data) 
then see the readJson() method.  It directly reads Avro data 
corresponding to that schema into a Jackson JsonNode.  ResolvingDecoder 
enforces the schema.

> Yes, it helps a lot.  One question remains, how can I construct a recursive schema programmatically?
> I have a couple options for the pig Tuple avro schema -- write it in JSON and put that in the source code or programmatically construct it.
> I'm currently programmatically constructing a schema specific to the Pig schema that is serialized, which is straightforward until I hit the map type and recursion.

If you're not using a universal Pig schema then the above strategy may 
or may not work.  It might still work if the specific schema is always a 
subset of the universal Pig schema, which I suspect it is.

To construct a recursive schema programmatically you need to do what the 
schema parser does: create a record schema with Schema.createRecord(), create 
its fields, including one or more that reference the record schema, 
then call setFields() with the fields.
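
The create-then-set-fields order can be illustrated with a small stand-in model. This is a toy sketch and not Avro's API: RecordSchema, set_fields and to_json are hypothetical names that only mimic the idea that a record can exist before its fields do, which is what makes a self-reference possible.

```python
class RecordSchema:
    """Toy stand-in for a record schema; NOT Avro's real API."""

    def __init__(self, name):
        self.name = name
        self.fields = None  # filled in later, once the record exists

    def set_fields(self, fields):
        if self.fields is not None:
            raise ValueError("fields already set")
        self.fields = list(fields)


def to_json(schema, seen=None):
    """Render a schema as Avro-style JSON, using the name on re-entry."""
    seen = set() if seen is None else seen
    if isinstance(schema, RecordSchema):
        if schema.name in seen:
            return schema.name  # back-reference by name ends the recursion
        seen.add(schema.name)
        return {
            "type": "record",
            "name": schema.name,
            "fields": [{"name": n, "type": to_json(t, seen)}
                       for n, t in schema.fields],
        }
    if isinstance(schema, dict):  # map or array wrapper
        return {k: to_json(v, seen) for k, v in schema.items()}
    if isinstance(schema, list):  # union
        return [to_json(s, seen) for s in schema]
    return schema  # primitive type name


# Step 1: create the record with no fields yet.
element = RecordSchema("Element")
# Step 2: build a field type that refers back to the record itself.
value_type = ["null", "int", "string", {"type": "map", "values": element}]
# Step 3: attach the fields, closing the loop.
element.set_fields([("value", value_type)])

print(to_json(element))
```

The map inside the union never needs to exist before the record does; it only needs a reference to the already-created record object.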

Doug



Re: AvroStorage pig adapter

Posted by Scott Carey <sc...@richrelevance.com>.
On May 14, 2010, at 12:17 AM, Thiruvalluvan M. G. wrote:

> 
> What you are looking for is the inner record called Element within the
> AvroTuple. I named the outer record AvroTuple because I wrote IDLs for
> Protocol Buffers and Thrift and wanted the class names to be unambiguous.
> 
> The tuple should actually be an array rather than a record. But since arrays
> cannot be named in Avro, I wrapped the array with a record. Please note
> wrapping objects by records in Avro does not cost anything in the binary
> format.

It doesn't cost on the serialization size but currently it costs a lot on the performance side.

> I use the same technique to represent more than one type by a single
> Avro type. For instance Pig's string and Pig's BigCharArray are both
> represented by Avro string. I use a record to distinguish between them.
> 
> Does it solve your problem?

Yes, it helps a lot.  One question remains, how can I construct a recursive schema programmatically?
I have a couple options for the pig Tuple avro schema -- write it in JSON and put that in the source code or programmatically construct it.
I'm currently programmatically constructing a schema specific to the Pig schema that is serialized, which is straightforward until I hit the map type and recursion.

Thanks,

-Scott



RE: AvroStorage pig adapter

Posted by "Thiruvalluvan M. G." <th...@yahoo.com>.
Hi Scott,

As part of a benchmarking program that compares the serialization and
de-serialization performance of Avro against Protocol Buffers and Thrift, I
used the following schema to represent Pig tuples. (I should publish the
code and results, but I haven't found time to clean them up yet.)

{
    "type" : "record",
    "name" : "AvroTuple",
    "namespace" : "org.apache.avro.bench.pig.tuple.avro",
    "fields" :
    [
        {
            "name" : "elements",
            "type" : {
                "type" : "array",
                "items" :
                {
                    "type" : "record",
                    "name" : "Element",
                    "fields" :
                    [
                        {
                            "name" : "value",
                            "type" :
                            [
                                { "type" : "array", "items" : "AvroTuple" },
                                { "type" : "record", "name": "AvroBigCharArray",
                                    "fields" : [ { "name": "data",
                                        "type": "string" } ] },
                                "boolean",
                                { "type" : "record", "name": "AvroByte",
                                    "fields" : [ { "name": "data",
                                        "type": "int" } ] },
                                "bytes",
                                "string",
                                "double",
                                { "type" : "record", "name": "AvroError",
                                    "fields" : [ { "name": "data",
                                        "type": "null" } ] },
                                "float",
                                "int",
                                { "type" : "record", "name": "AvroInternalMap",
                                    "fields" : [ { "name": "data",
                                        "type": "null" } ] },
                                "long",
                                { "type" : "map", "values" : "Element" },
                                "null",
                                "AvroTuple"
                            ]
                        }
                    ]
                }
            }
        }
    ]
}

What you are looking for is the inner record called Element within the
AvroTuple. I named the outer record AvroTuple because I wrote IDLs for
Protocol Buffers and Thrift and wanted the class names to be unambiguous.

The tuple should actually be an array rather than a record. But since arrays
cannot be named in Avro, I wrapped the array with a record. Please note
wrapping objects by records in Avro does not cost anything in the binary
format. I use the same technique to represent more than one type by a single
Avro type. For instance Pig's string and Pig's BigCharArray are both
represented by Avro string. I use a record to distinguish between them.
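
To make the shape of the data concrete, here is a minimal sketch of how nested values might be wrapped to match the AvroTuple / Element layout. It rests on several assumptions: Python tuples, lists and dicts stand in for Pig tuples, bags and maps; reading the array-of-AvroTuple branch as a bag is a guess; the wrapper records (AvroBigCharArray, AvroByte and so on) are omitted; and the function names are hypothetical.

```python
def to_avro_tuple(pig_tuple):
    """Wrap a Pig-style tuple as an AvroTuple record: {"elements": [...]}."""
    return {"elements": [to_element(v) for v in pig_tuple]}


def to_element(value):
    """Wrap one value as an Element record: {"value": ...}."""
    if isinstance(value, tuple):    # nested tuple -> AvroTuple branch
        wrapped = to_avro_tuple(value)
    elif isinstance(value, list):   # bag (assumed) -> array of AvroTuple
        wrapped = [to_avro_tuple(t) for t in value]
    elif isinstance(value, dict):   # map -> map whose values are Elements
        wrapped = {k: to_element(v) for k, v in value.items()}
    else:                           # primitive or None passes through
        wrapped = value
    return {"value": wrapped}


# A tuple holding a string and a map with mixed value types.
row = ("San Francisco", {"city": "San Francisco", "elevation": 25})
print(to_avro_tuple(row))
```

Because every map value is an Element, a single map can hold both a string and an int, which is the mixed-type case a plain Avro map cannot express.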

Does it solve your problem?

Thanks

Thiru


-----Original Message-----
From: Scott Carey [mailto:scott@richrelevance.com] 
Sent: Friday, May 14, 2010 11:06 AM
To: avro-dev@hadoop.apache.org
Subject: AvroStorage pig adapter
