Posted to dev@avro.apache.org by "Scott Carey (JIRA)" <ji...@apache.org> on 2011/07/14 18:43:00 UTC

[jira] [Created] (AVRO-859) Java: Data Flow Overhaul -- Composition and Symmetry

Java: Data Flow Overhaul -- Composition and Symmetry
----------------------------------------------------

                 Key: AVRO-859
                 URL: https://issues.apache.org/jira/browse/AVRO-859
             Project: Avro
          Issue Type: New Feature
          Components: java
            Reporter: Scott Carey
            Assignee: Scott Carey


Data flow in Avro is currently broken into two parts:  Read and Write.  These share many common patterns but almost no common code.  
Additionally, the APIs for this are DatumReader and DatumWriter, which require that implementations know how to traverse Schemas and use the Resolver.

This is a proposal to overhaul the inner workings of Avro Java between the Decoder/Encoder APIs and DatumReader/DatumWriter such that there is significantly more code re-use and much greater opportunity for new features that can all share in general optimizations and dynamic code generation.

The two primary concepts involved are:
* _*Functional Composition*_
* _*Symmetry*_

h4. Functional Composition
All read and write operations can be broken into functional bits and composed rather than writing monolithic classes.  This allows a "DatumWriter2" to be a graph of functions that pre-compute all state required from a schema rather than traverse a schema for each write.

h4. Symmetry
Avro's data flow can be made symmetric.  Rather than thinking in terms of Read and Write, think in terms of:
* _*Source*_: Where data that is represented by an Avro schema comes from -- this may be a Decoder, or an Object graph.
* _*Target*_: Where data that represents an Avro schema is sent -- this may be an Encoder or an Object graph.

(More detail in the comments)

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (AVRO-859) Java: Data Flow Overhaul -- Composition and Symmetry

Posted by "Douglas Creager (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AVRO-859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13069091#comment-13069091 ] 

Douglas Creager commented on AVRO-859:
--------------------------------------

Awesome stuff.  Whenever we decide to implement the Haskell Avro library, this will be a good definition of the inevitable monad that we'll have to write.  :-)

I've also been working on something similar in the C library.  Hopefully we can have some cross-pollination of ideas here.

It started off with the “consumer” interface that I introduced in AVRO-762.  I think this corresponds to the Target in your description above.  In addition to the generic consumer interface, I wrote an implementation of that consumer interface that would perform schema resolution.  And then a generic function that would consume binary Avro data, and pass the results into a consumer.

The natural next step would've been to add a “producer” interface, which would've corresponded to the Source in your model.  However, the one main issue I had with this approach is that you'd have two competing models: one where you push data through a chain of consumers, and one where you pull data through a chain of producers.  It didn't seem like either pushing or pulling could be used as the “one true way”.

To get around this, I decided to go with a new “value” interface (AVRO-837), rather than separate consumer and producer interfaces.  In this model, an {{avro_value_t}} is anything that can mimic an Avro value.  It's basically a big collection of getter and setter methods for the content of an Avro value of a particular schema.  Binary decoding doesn't have its own value implementation, but it can use the setter methods to fill in any value implementation — including one that just immediately serializes the contents into a JSON encoding, for instance.

Schema resolution can then be implemented as two separate value implementations.  (I have this one coded up, but I don't have an issue open for it yet.  I should get on that.)  The schema resolution classes provide a “view” into an existing Avro value, allowing you to treat it as if it were an instance of a different schema.  You need two classes because the wrapped value might be on either the “writer schema” or “reader schema” end of the resolution process.


[jira] [Commented] (AVRO-859) Java: Data Flow Overhaul -- Composition and Symmetry

Posted by "Scott Carey (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AVRO-859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13065402#comment-13065402 ] 

Scott Carey commented on AVRO-859:
----------------------------------

h4. Functional Composition
All read and write operations can be broken into functional bits and composed rather than writing monolithic classes.  This allows a "DatumWriter2" to be a graph of functions that pre-compute all state required from a schema rather than traverse a schema for each write.  Additionally, if the functions are all of a common set of types, it becomes easy to use code generation:  either directly or by parsing the resulting function graph and converting to code that the JVM can better optimize.
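
A minimal sketch of the "pre-compute a graph of functions" idea, in plain Java.  Everything here is a stand-in and not part of Avro: {{Node}} is a hypothetical two-case schema (int leaf or record), a datum is either an {{Integer}} or a {{List}}, and the "encoder" is just a {{List<Integer>}}.  The point is only that the schema is walked once in {{compile}}, and the returned function never looks at it again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch: none of these types are Avro classes.
final class PrecomputedWriterSketch {
  // Stand-in schema node: an int leaf (fields == null) or a record of child nodes.
  static final class Node {
    final List<Node> fields;
    private Node(List<Node> fields) { this.fields = fields; }
    static Node intLeaf() { return new Node(null); }
    static Node record(Node... fields) { return new Node(Arrays.asList(fields)); }
  }

  // Walk the schema once and return a writer that carries all the state it needs.
  static BiConsumer<Object, List<Integer>> compile(Node node) {
    if (node.fields == null) {
      // Leaf: append the int to the stand-in output.
      return (datum, out) -> out.add((Integer) datum);
    }
    // Record: compile each field up front, then apply the writers positionally.
    List<BiConsumer<Object, List<Integer>>> writers = new ArrayList<>();
    for (Node field : node.fields) {
      writers.add(compile(field));
    }
    return (datum, out) -> {
      List<?> values = (List<?>) datum;
      for (int i = 0; i < writers.size(); i++) {
        writers.get(i).accept(values.get(i), out);
      }
    };
  }
}
```

Calling {{compile}} once per schema and reusing the result for every datum is what moves the schema traversal out of the per-record path.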

h4. Symmetry
Avro's data flow can be made symmetric.  Rather than thinking in terms of Read and Write, think in terms of:
* _*Source*_: Where data that is represented by an Avro schema comes from -- this may be a Decoder, or an Object graph.
* _*Target*_: Where data that represents an Avro schema is sent -- this may be an Encoder or an Object graph.

Combine the two ideas and you can create _*Flows*_ -- the combination of a Source and a Target for a specific Schema (or resolvable Schema pair).
The machinery for traversing and resolving schemas can be written once, and a "DatumReader" written once, with different sources and targets combined to make different tools:
* A Decoder source + GenericData target = GenericDatumReader
* A SpecificData source + Encoder target = SpecificDatumWriter
* BinaryDecoder source + JsonEncoder target = transform from binary to json without any intermediate objects!
* SpecificData source + GenericData target = transform one object type to another

Add in new sources and targets (Pig, ProtoBuf, Thrift objects; Pig binary, Protobuf binary, Thrift binary) and you can mix/match more transformation tasks.

Additionally, one can write a generic Equals/Compare implementation that takes two Sources, and compares them or checks for equality.  Then, you can compare binary with an object, or two objects.
Data flow could also tee:  one source with many targets.
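
As a rough illustration of the generic equality idea, the check can be written once against two "pull an int" functions and reused for any pair of sources.  This is a sketch under assumptions: the {{ByteBuffer}} source reads a fixed-width int and is *not* Avro's binary encoding, and the {{int[]}} source stands in for an object graph.  All names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical sketch: the two sources below are stand-ins, not Avro types.
final class SourceEqualsSketch {
  // Stand-in "binary" source: a fixed-width int at offset 0 (not Avro's encoding).
  static final Function<ByteBuffer, Integer> FROM_BYTES = buf -> buf.getInt(0);
  // Stand-in "object" source: the first slot of an int array.
  static final Function<int[], Integer> FROM_ARRAY = arr -> arr[0];

  // Equality is written once; it only sees the common type (Integer).
  static <A, B> boolean intsEqual(Function<A, Integer> left, A a,
                                  Function<B, Integer> right, B b) {
    return left.apply(a).equals(right.apply(b));
  }
}
```

The same method compares bytes with an object, two objects, or two byte buffers, depending only on which functions are passed in.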



h4. Functional units
After much prototyping and design, I have identified that all Avro data flow can be done by the composition of two functors:
The Unary Functor, which I have named *Access*: 
{code}
interface Access<A, B> {
  B access(A a);
}
{code}
And a Binary Functor with two types named *Flow*:
{code}
interface Flow<A, B> {
  B flow(A a, B b);
}
{code}
In most cases, you can replace "A" with "FROM" and "B" with "TO" in relation to the Source and Target concepts.  These functions can naturally compose in all the ways required for data to flow from a source to a target.

h4. Making Symmetry
Consider this simple example, a Flow over the schema (the field name "value" is arbitrary):
{code}
{"type": "record", "name": "Foo", "fields":
  [{"name": "value", "type": "int"}]
}
{code}

In the current implementation, a GenericDatumReader has the following API:
{code}
D read(D reuse, Decoder in);
{code}
which internally parses a Schema step by step, recursively calling methods with a similar signature.
When we get to the leaf field, we return an integer, and on return insert that into a GenericData.Record as the first field.
A very similar process occurs with GenericDatumWriter:
{code}
void write(D datum, Encoder out);
{code}
Which traverses a schema, recursively calling methods with a similar signature.
On the way down the schema graph, we access objects and pass portions of the data through, and when we hit the leaf field, we write it to the encoder and return.

Consider the innermost operation for both of the above:
Fetch an integer, then put it somewhere:
|| step || Source || Target || Source op || Target op || flow signature ||
| read an integer | IndexedRecord | Encoder | IndexedRecord.get() | (null) | int access(IndexedRecord) |
| read an integer | Decoder | IndexedRecord | Decoder.readInt() | (null) | int access(Decoder) |
| send integer to output | IndexedRecord | Encoder | (null) | Encoder.writeInt() | Encoder flow(int, Encoder) |
| send integer to output | Decoder | IndexedRecord | (null) | IndexedRecord.put() | IndexedRecord flow(int, IndexedRecord) |

The access and flow signatures compose as follows:
{code}
int access(A);
 FollowedBy
B flow(int, B);
Equals:

B flow(A, B);
{code}

So the above two examples compose to:
|| step || Source || Target || Source op || Target op || flow signature ||
| int flow | IndexedRecord | Encoder | IndexedRecord.get() | Encoder.writeInt() | Encoder flow(IndexedRecord, Encoder) |
| int flow | Decoder | IndexedRecord | Decoder.readInt() | IndexedRecord.put() | IndexedRecord flow(Decoder, IndexedRecord) |

As can be seen, one can compose two functions for an integer field, one provided by the Source and one provided by the Target, to produce a Flow of data between them.
The source and target each have their own contexts -- the object types that an integer field represents -- but do not have to know anything about the other side.  The flow composition also does not need any information about the source or target -- they meet only at "int".
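
The composition rule above can be sketched in a few lines of Java.  This is illustrative only and makes several assumptions: a {{Deque<Integer>}} plays the part of the Decoder, a {{List<Integer>}} plays both the record and the Encoder's output, and {{compose}} is a hypothetical helper name, not something from the proposal.

```java
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: stand-in sources and targets, not Avro's Decoder/Encoder/IndexedRecord.
final class ComposeSketch {
  interface Access<A, B> { B access(A a); }
  interface Flow<A, B> { B flow(A a, B b); }

  // "T access(A)" followed by "B flow(T, B)" equals "B flow(A, B)".
  static <A, T, B> Flow<A, B> compose(Access<A, T> source, Flow<T, B> target) {
    return (a, b) -> target.flow(source.access(a), b);
  }

  // Source ops: pull an int from the stand-in decoder, or from field 0 of a stand-in record.
  static final Access<Deque<Integer>, Integer> READ_INT = d -> d.removeFirst();
  static final Access<List<Integer>, Integer> GET_FIELD_0 = r -> r.get(0);

  // Target op: push an int into a list (stand-in for both put() and writeInt()).
  static final Flow<Integer, List<Integer>> APPEND_INT = (i, out) -> {
    out.add(i);
    return out;
  };
}
```

{{compose(READ_INT, APPEND_INT)}} corresponds to the "Decoder to record" row and {{compose(GET_FIELD_0, APPEND_INT)}} to the "record to Encoder" row; neither source op mentions a target type, and the target op never mentions a source type.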


[jira] [Commented] (AVRO-859) Java: Data Flow Overhaul -- Composition and Symmetry

Posted by "Scott Carey (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AVRO-859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13065521#comment-13065521 ] 

Scott Carey commented on AVRO-859:
----------------------------------

Another feature that this symmetry brings:

Schema resolution will apply to any operation, not just 'read'.

A resolution applies from a schema 'source' to a schema 'target'.  The source could be an object graph with the target another object graph, or the source could be bytes and the target bytes.  One could have an object graph representing one schema and write it to bytes in a subset schema (projection on write).
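
Projection on write might look roughly like the following.  It is a sketch with stand-in types: a record is a {{Map<String, Object>}}, a "schema" is just a list of field names, and the output is a list of values instead of encoded bytes.  Default values and type promotion, which real schema resolution also covers, are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: schemas are reduced to field-name lists; nothing here is Avro API.
final class ProjectionSketch {
  // Resolve once: check that every field of the narrower target schema exists in the source.
  static List<String> resolve(List<String> sourceFields, List<String> targetFields) {
    List<String> kept = new ArrayList<>();
    for (String name : targetFields) {
      if (!sourceFields.contains(name)) {
        throw new IllegalArgumentException("no source field named " + name);
      }
      kept.add(name);
    }
    return kept;
  }

  // Write only the resolved fields, in target order; the rest are never touched.
  static List<Object> write(Map<String, Object> record, List<String> kept) {
    List<Object> out = new ArrayList<>();
    for (String name : kept) {
      out.add(record.get(name));
    }
    return out;
  }
}
```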


[jira] [Commented] (AVRO-859) Java: Data Flow Overhaul -- Composition and Symmetry

Posted by "Douglas Creager (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AVRO-859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13069101#comment-13069101 ] 

Douglas Creager commented on AVRO-859:
--------------------------------------

One possible issue with the “BinaryDecoder source + JsonEncoder target” example that you give.  I think that this will only work easily when you're not doing any schema resolution.  This gets back to the push-vs-pull thing I mention in my previous comment.  In this example, you can either have the BinaryDecoder control things, and send data into the JsonEncoder as it's decoded.  Or you can have the JsonEncoder control things, and pull data from the BinaryDecoder as it's needed.  If there's no schema resolution, this works great.  Either (a) the BinaryDecoder reads a value, and because there's no resolution, that's exactly the value that the JsonEncoder will need next; or (b) the JsonEncoder asks for a value, and because there's no schema resolution, that's exactly the value that the BinaryDecoder will expect to read next from the stream.

If you're doing schema resolution, though, the decoder and encoder will be working with different schemas.  And the fields of a record type might be in a different order.  If the decoder is pushing data into the encoder, the encoder will have to buffer things if it receives a field that isn't the next one that it needs to serialize.  And vice versa — if the encoder is pulling data, the decoder might have to deserialize and buffer a bunch of intermediary fields until it gets to the one that was requested by the encoder.

None of this is a deal-breaker, but it highlights that you really want to support both pushing and pulling; ideally in this situation, you'd have the decoder push the data into an in-memory representation (doing the schema resolution there to be able to skip over any fields that will be dropped).  That in-memory representation would be the buffering that you use to get around field reordering.  And then as a separate process, you have the encoder pull data from the in-memory object.  That way each operation gets to be written either as push or pull, whichever is most natural, and without any extra complication.
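
The push-then-pull arrangement described above could be sketched like this.  The types are stand-ins chosen for brevity (assumptions, not any Avro API): the decoded wire data is a list of ints in writer order, the schemas are lists of field names, and a {{HashMap}} is the in-memory representation that absorbs the reordering.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: lists and a map stand in for decoder, encoder, and in-memory value.
final class PushThenPullSketch {
  // Push phase: the "decoder" drives, delivering fields in writer order.
  // Fields the reader schema does not contain are skipped instead of stored.
  static Map<String, Integer> pushIntoMemory(List<String> writerOrder, List<Integer> wire,
                                             List<String> readerFields) {
    Map<String, Integer> buffer = new HashMap<>();
    for (int i = 0; i < writerOrder.size(); i++) {
      String name = writerOrder.get(i);
      if (readerFields.contains(name)) {
        buffer.put(name, wire.get(i));
      }
    }
    return buffer;
  }

  // Pull phase: the "encoder" drives, asking for fields in reader order.
  static List<Integer> pullInReaderOrder(Map<String, Integer> buffer, List<String> readerFields) {
    List<Integer> out = new ArrayList<>();
    for (String name : readerFields) {
      out.add(buffer.get(name));
    }
    return out;
  }
}
```

Each phase is written in the style that is natural for it, and neither has to know the other's field order.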


[jira] [Commented] (AVRO-859) Java: Data Flow Overhaul -- Composition and Symmetry

Posted by "Scott Carey (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AVRO-859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13069114#comment-13069114 ] 

Scott Carey commented on AVRO-859:
----------------------------------

I really need to look more deeply at the changes you have done in the C implementation.  I started my work thinking in terms of "push" and "pull" and was doing something that sounds similar to your description of the "consumer" from AVRO-762.  The result was an implementation of writing that was much faster than the current implementation and based on functional composition -- it composed functions with signatures like:
{code}
void send(FROM f, TO t);
{code}
Implementing readers turned out to be more difficult and there was much code duplication and no symmetry.  Requests for features on the user mailing list included things like converting Specific objects to Generic ones, and that got me thinking about splitting read/write up into symmetric components.  This isn't very easy, especially for Maps and Arrays.  Records and Unions aren't so tough, they turn into 'composite' and 'branch' flows fairly easily.

For "push" versus "pull" I have come to the realization that you can mix the two if you define the boundary very carefully and use the "flow" function, which is a mix of both.
"Push" in general is easier, but at the lowest level you must pull and then invert that into a push.  The Access functor has a method, "thenFlow", that changes a pull into a push.
{code}
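abstract class Access<FROM, T> {
  // Pull a value of type T out of the source.
  abstract T access(FROM f);
  // Chain a target-side flow after this access, turning the pull into a push.
  abstract <NEXT> Flow<FROM, NEXT> thenFlow(Flow<T, NEXT> then);
}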
{code}

And this is where "source" and "target" meet in most cases -- the FlowFactory takes the Source Access functor, and creates a composite flow from the target flow functor -- the two match because the common type is T, which is determined by the schema node.
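
One way {{thenFlow}} could be implemented is shown below.  This is a guess at the shape, not the code from the pending patch: {{thenFlow}} is given a concrete body here, and the {{FIRST}} access over an {{int[]}} is a made-up source used only to exercise it.

```java
// Hypothetical sketch of the pull-to-push inversion; not the actual patch.
final class ThenFlowSketch {
  interface Flow<A, B> { B flow(A a, B b); }

  abstract static class Access<FROM, T> {
    // Pull a value of type T out of the source.
    abstract T access(FROM from);

    // Pull from the source, then push the result into the flow the target supplied.
    // Source and target meet only at the common type T.
    <NEXT> Flow<FROM, NEXT> thenFlow(Flow<T, NEXT> then) {
      return (from, next) -> then.flow(access(from), next);
    }
  }

  // Made-up source for illustration: reads slot 0 of an int array.
  static final Access<int[], Integer> FIRST = new Access<int[], Integer>() {
    @Override
    Integer access(int[] from) {
      return from[0];
    }
  };
}
```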

For everything but Arrays/Maps I have a working implementation that I'm hoping to submit here soon, but Arrays/Maps (especially maps) have turned out trickier.  I will make them work in a slightly less elegant way (Source implementations will have to loop over their type and trigger callbacks in a special target callback, rather than composing functors).

I wish I could use Scala here... about every 6 lines of Java would reduce to 1 in Scala for the function type definitions.

For schema resolution, everything but record re-ordering will be easy -- requiring simply one more functor or replacing a functor to transform a type, with the addition of "skip" functors for the source.
For record re-ordering I will need a tag type that specifies whether a source or target requires field order or not.  If either side is 'unordered' then it is simple. If both sides require order, a buffer will be required.  This buffer can be generic so that no source or target implementations have to worry about it other than declaring whether they require order, but it is non-trivial. 
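
The ordering tag could be as small as an enum plus one decision made when the flow is built.  The names below are hypothetical, and the extra check that the two field orders actually differ is an assumption added for the sketch.

```java
import java.util.List;

// Hypothetical sketch of the "requires field order" tag; names are not from the patch.
final class FieldOrderSketch {
  enum Order { ORDERED, UNORDERED }

  // A reorder buffer is only needed when both ends insist on field order
  // and (assumption) the two orders are not already the same.
  static boolean needsBuffer(Order source, List<String> sourceFields,
                             Order target, List<String> targetFields) {
    return source == Order.ORDERED
        && target == Order.ORDERED
        && !sourceFields.equals(targetFields);
  }
}
```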

Likewise, default values will need some work. In the Java implementation they are handled by storing a Jackson JSON node with the default.  This is not ideal.  It would be beneficial to convert default values to the most efficient representation that a Target would need to insert it when the source does not have the value.
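
A small sketch of converting a default once, up front, so that the per-record path only hands over a ready value.  The default is represented as a plain string here to keep the example free of dependencies; that is an assumption, as is the {{intDefault}} name.

```java
import java.util.function.Supplier;

// Hypothetical sketch: a String stands in for the stored JSON default.
final class DefaultValueSketch {
  // Parse the textual default when the flow is built...
  static Supplier<Integer> intDefault(String jsonText) {
    final Integer parsed = Integer.valueOf(jsonText.trim());
    // ...so every record that lacks the field reuses the already-parsed value.
    return () -> parsed;
  }
}
```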
