Posted to common-dev@hadoop.apache.org by "Chris Douglas (JIRA)" <ji...@apache.org> on 2007/10/22 05:59:50 UTC

[jira] Created: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Map-side joins on sorted, equally-partitioned datasets
------------------------------------------------------

                 Key: HADOOP-2085
                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
             Project: Hadoop
          Issue Type: New Feature
          Components: mapred
            Reporter: Chris Douglas
            Assignee: Chris Douglas
             Fix For: 0.16.0


h3. Motivation

Given a set of sorted datasets keyed with the same class and yielding equal
partitions, it is possible to effect a join of those datasets prior to the
map. This could save costs in re-partitioning, sorting, shuffling, and
writing out data required in the general case.

h3. Interface

The attached code offers the following interface to users of these classes.

|| property || required || value ||
| mapred.join.expr | yes | Join expression to effect over input data |
| mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
| mapred.join.define.<ident> | no | Class mapped to identifier in join expression |

The join expression understands the following grammar:
{noformat}
func ::= <ident>([<func>,]*<func>)
func ::= tbl(<class>,"<path>");
{noformat}

Operations included in this patch fall into one of two types:
join operations emitting tuples and "multi-filter" operations emitting a
single value from (but not necessarily included in) a set of input values.
For a given key, each operation will consider the cross product of all
values for all sources at that node.

Identifiers supported by default:

|| identifier || type || description ||
| inner | Join | Full inner join |
| outer | Join | Full outer join |
| override | MultiFilter | For a given key, prefer values from the rightmost source |
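
The semantics of the three identifiers for a single key can be modelled in a few lines. The following is a simplified sketch, not the code in the patch: the class {{JoinSemantics}} and its methods are hypothetical, values are plain strings, and a {{null}} slot stands in for a source that has no value for the key. Reading {{override}} as "the values of the rightmost source that contains the key" is an assumption drawn from the table above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified model of the default operations for ONE key; not the patch's code. */
final class JoinSemantics {

    /** Cross product of the per-source value lists; null marks a source without a value. */
    static List<List<String>> cross(List<List<String>> sources) {
        List<List<String>> out = new ArrayList<>();
        out.add(new ArrayList<String>());
        for (List<String> src : sources) {
            // An empty source contributes one null slot so every tuple keeps its capacity.
            List<String> vals = src.isEmpty() ? Collections.<String>singletonList(null) : src;
            List<List<String>> next = new ArrayList<>();
            for (List<String> prefix : out) {
                for (String v : vals) {
                    List<String> tuple = new ArrayList<>(prefix);
                    tuple.add(v);
                    next.add(tuple);
                }
            }
            out = next;
        }
        return out;
    }

    /** inner: emit tuples only if every source has a value for the key. */
    static List<List<String>> inner(List<List<String>> sources) {
        for (List<String> src : sources) {
            if (src.isEmpty()) return new ArrayList<>();
        }
        return cross(sources);
    }

    /** outer: emit tuples if any source has a value for the key. */
    static List<List<String>> outer(List<List<String>> sources) {
        for (List<String> src : sources) {
            if (!src.isEmpty()) return cross(sources);
        }
        return new ArrayList<>();
    }

    /** override: emit the values of the rightmost source that has the key. */
    static List<String> override(List<List<String>> sources) {
        for (int i = sources.size() - 1; i >= 0; i--) {
            if (!sources.get(i).isEmpty()) return sources.get(i);
        }
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        List<List<String>> s = Arrays.asList(
            Arrays.asList("a1", "a2"), new ArrayList<String>(), Arrays.asList("c1"));
        System.out.println(inner(s));    // []
        System.out.println(outer(s));    // [[a1, null, c1], [a2, null, c1]]
        System.out.println(override(s)); // [c1]
    }
}
```

With one empty source, {{inner}} emits nothing, {{outer}} still emits one tuple per combination of the remaining values, and {{override}} passes through the values of the rightmost non-empty source.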

A user of this class must set the {{InputFormat}} for the job to
{{CompositeInputFormat}} and define a join expression accepted by the preceding
grammar. For example, both of the following are acceptable:

{noformat}
inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/bar"),
      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/baz"))

outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                   "hdfs://host:8020/foo/bar"),
               tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                   "hdfs://host:8020/foo/baz")),
      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/rab"))
{noformat}

{{CompositeInputFormat}} includes a handful of convenience methods to aid
construction of these verbose statements.
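
Since this description does not list the signatures of those convenience methods, the sketch below uses a hypothetical helper, {{JoinExpr}}, to show how such an expression might be assembled as a plain string. The method names {{tbl}} and {{op}} are illustrative only; the output format simply follows the examples above.

```java
/** Hypothetical helper that assembles join expressions as plain strings. */
final class JoinExpr {

    /** Leaf: tbl(<class>,"<path>"), following the form used in the examples. */
    static String tbl(String inputFormatClass, String path) {
        return "tbl(" + inputFormatClass + ".class,\"" + path + "\")";
    }

    /** Composite: <ident>(<func>,...,<func>); at least one child is required. */
    static String op(String ident, String... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("a join needs at least one source");
        }
        return ident + "(" + String.join(",", children) + ")";
    }

    public static void main(String[] args) {
        String seq = "org.apache.hadoop.mapred.SequenceFileInputFormat";
        String expr = op("outer",
            op("override",
                tbl(seq, "hdfs://host:8020/foo/bar"),
                tbl(seq, "hdfs://host:8020/foo/baz")),
            tbl(seq, "hdfs://host:8020/foo/rab"));
        // The resulting string is what would be stored under mapred.join.expr.
        System.out.println(expr);
    }
}
```

Building the string from nested calls mirrors the nesting of the join itself, which makes it harder to misplace a parenthesis than when writing the expression by hand.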

As in the second example, joins may be nested. Users may provide a
comparator class in the {{mapred.join.keycomparator}} property to
specify the ordering of their keys, or accept the default comparator as
returned by {{WritableComparator.get(keyclass)}}.

Users can specify their own join operations, typically by overriding
{{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
to an identifier in the join expression using the
{{mapred.join.define._ident_}} property, where _ident_ is the identifier
appearing in the join expression. Users may elect to emit- or modify- values
passing through their join operation. Consulting the existing operations for
guidance is recommended. Adding arguments is considerably more complex (and
only partially supported), as one must also add a {{Node}} type to the parse
tree. One is probably better off extending {{RecordReader}} in most cases.
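
To make the property names concrete, the following sketch collects them in a {{java.util.Properties}} object, which stands in for the job configuration here. The identifier {{myjoin}} and the classes {{com.example.MyJoinRecordReader}} and {{com.example.MyKeyComparator}} are hypothetical; in a real job the same keys and values would presumably be set on the job's configuration instead.

```java
import java.util.Properties;

/** Sketch of the join properties; Properties stands in for the job configuration. */
final class JoinConfigSketch {

    static Properties configure(String expr, String comparatorClass,
                                String ident, String readerClass) {
        Properties conf = new Properties();
        conf.setProperty("mapred.join.expr", expr);                     // required
        if (comparatorClass != null) {
            conf.setProperty("mapred.join.keycomparator", comparatorClass); // optional
        }
        // Maps the identifier used in the expression to the class implementing it.
        conf.setProperty("mapred.join.define." + ident, readerClass);   // optional
        return conf;
    }

    public static void main(String[] args) {
        String seq = "org.apache.hadoop.mapred.SequenceFileInputFormat.class";
        Properties conf = configure(
            "myjoin(tbl(" + seq + ",\"hdfs://host:8020/foo/bar\"),"
                + "tbl(" + seq + ",\"hdfs://host:8020/foo/baz\"))",
            "com.example.MyKeyComparator",
            "myjoin",
            "com.example.MyJoinRecordReader");
        conf.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```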

h3. Design

As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
types for the join tree. Delegation satisfies most requirements of the
{{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
Most of the work in this patch concerns {{getRecordReader}}. The
{{CompositeInputFormat}} itself delegates to the parse tree generated by
{{Parser}}.
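
As an illustration of the composite/leaf split, here is a toy recursive-descent parser for the grammar given earlier. It is a sketch under stated assumptions, not the {{Parser}} in the patch: the {{JoinParser}} and {{Node}} names are hypothetical, class names and paths are kept as raw strings, and the trailing semicolon in the second production is treated as not being part of the expression, since the examples omit it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy recursive-descent parser for the join grammar; not the patch's Parser. */
final class JoinParser {

    /** Leaf nodes (tbl) carry a class and path; composite nodes carry children. */
    static final class Node {
        final String ident, cls, path;
        final List<Node> children = new ArrayList<>();

        Node(String ident, String cls, String path) {
            this.ident = ident;
            this.cls = cls;
            this.path = path;
        }

        boolean isLeaf() { return cls != null; }
    }

    private final String s;
    private int pos = 0;

    private JoinParser(String s) { this.s = s; }

    static Node parse(String expr) {
        JoinParser p = new JoinParser(expr);
        Node root = p.func();
        p.skipWs();
        if (p.pos != p.s.length()) throw p.error("trailing input");
        return root;
    }

    /** func ::= tbl(<class>,"<path>") | <ident>(<func>[,<func>]*) */
    private Node func() {
        String ident = until("(");
        if (ident.isEmpty()) throw error("missing identifier");
        expect('(');
        if (ident.equals("tbl")) {
            String cls = until(",");
            expect(',');
            expect('"');
            String path = until("\"");
            expect('"');
            expect(')');
            return new Node(ident, cls, path);
        }
        Node node = new Node(ident, null, null);
        do {
            node.children.add(func());
        } while (accept(','));
        expect(')');
        return node;
    }

    /** Reads up to, but not including, the first character found in stops. */
    private String until(String stops) {
        skipWs();
        int start = pos;
        while (pos < s.length() && stops.indexOf(s.charAt(pos)) < 0) pos++;
        return s.substring(start, pos).trim();
    }

    private void skipWs() {
        while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) pos++;
    }

    private boolean accept(char c) {
        skipWs();
        if (pos < s.length() && s.charAt(pos) == c) {
            pos++;
            return true;
        }
        return false;
    }

    private void expect(char c) {
        if (!accept(c)) throw error("expected '" + c + "'");
    }

    private IllegalArgumentException error(String msg) {
        return new IllegalArgumentException(msg + " at position " + pos);
    }

    /** Counts the leaf sources beneath a node. */
    static int leaves(Node n) {
        if (n.isLeaf()) return 1;
        int total = 0;
        for (Node c : n.children) total += leaves(c);
        return total;
    }

    public static void main(String[] args) {
        Node root = parse("outer(override(tbl(a.In.class,\"/foo/bar\"),"
            + "tbl(a.In.class,\"/foo/baz\")),tbl(a.In.class,\"/foo/rab\"))");
        // prints "outer has 2 children and 3 leaves"
        System.out.println(root.ident + " has " + root.children.size()
            + " children and " + leaves(root) + " leaves");
    }
}
```

Once the tree exists, a call such as {{getSplits}} can be answered by recursing over it: composites ask their children, and leaves ask the wrapped {{InputFormat}}.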

h4. Hierarchical Joins

Each {{RecordReader}} from the user must be "wrapped", since effecting a
join requires the framework to track the head value from each source. Since
the cross product of all values for each composite level of the join is
emitted to its parent, all sources^1^ must be capable of repeating the
values for the current key. To avoid keeping an excessive number of copies
(one per source per level), each composite requests its children to populate
a {{JoinCollector}} with an iterator over its values. This way, there is
only one copy of the current key for each composite node, the head key-value
pair for each leaf, and storage at each leaf for all the values matching the
current key at the parent collector (if it is currently participating in a
join at the root). Strategies have been employed to avoid excessive copying
when filling a user-provided {{Writable}}, but they have been conservative
(e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
the user modifies the value returned, possibly changing the state of a
{{JoinCollector}} in the tree). For example, if the following sources
contain these key streams:

{noformat}
A: 0  0   1    1     2        ...
B: 1  1   1    1     2        ...
C: 1  6   21   107   ...
D: 6  28  496  8128  33550336 ...
{noformat}

Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
the root is 1 the tree may look like this:

{noformat}

            x (1, [ I(A), [ I(y) ] ] )
          /   \
         W     y (1, [ I(B), I(C), EMPTY ])
         |   / | \
         |  W  W  W
         |  |  |  D (6, V~6~) => EMPTY
         |  |  C (6, V~6~)    => V~1,1~ @1,1
         |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
         A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
{noformat}

A {{JoinCollector}} from _x_ will have been created by requesting an
iterator from _A_ and another from _y_. The iterator at _y_ is built by
requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
key 1, it returns an empty iterator. Since the value to return for a given
join is a {{Writable}} provided by the user, the iterators returned are also
responsible for writing the next value in that stream. For multilevel joins
passing through a subclass of {{JoinRecordReader}}, the value produced will
contain tuples within tuples; iterators for composites delegate to
sub-iterators responsible for filling the value in the tuple at the position
matching their position in the composite. In a sense, the only iterators
that write to a tuple are the {{RecordReader}}s at the leaves. Note that
this also implies that emitted tuples may not contain values from each
source, but they will always have the same capacity.
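
The head tracking described above can be sketched with the example key streams. This is a minimal model, not the patch's implementation: the {{KeyMerge}} and {{Source}} names are hypothetical, values are omitted so only keys are tracked, the streams are cut off where the example shows an ellipsis, and the tree is flattened so that all four sources are merged at one level.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of the head-tracking merge over sorted key streams; values are omitted. */
final class KeyMerge {

    /** One sorted source with a cursor on its head key. */
    static final class Source {
        final int[] keys;
        int head = 0;

        Source(int... keys) { this.keys = keys; }

        boolean hasNext() { return head < keys.length; }

        int peek() { return keys[head]; }

        /** Consumes every record whose key equals k and returns how many there were. */
        int drain(int k) {
            int n = 0;
            while (hasNext() && peek() == k) {
                head++;
                n++;
            }
            return n;
        }
    }

    /** Smallest head key across all sources, or null when every source is exhausted. */
    static Integer nextKey(List<Source> sources) {
        Integer min = null;
        for (Source s : sources) {
            if (s.hasNext() && (min == null || s.peek() < min)) min = s.peek();
        }
        return min;
    }

    /** For the given key, reports how many values each source holds (0 = empty iterator). */
    static int[] step(List<Source> sources, int key) {
        int[] counts = new int[sources.size()];
        for (int i = 0; i < counts.length; i++) counts[i] = sources.get(i).drain(key);
        return counts;
    }

    public static void main(String[] args) {
        List<Source> srcs = new ArrayList<>();
        srcs.add(new Source(0, 0, 1, 1, 2));              // A
        srcs.add(new Source(1, 1, 1, 1, 2));              // B
        srcs.add(new Source(1, 6, 21, 107));              // C
        srcs.add(new Source(6, 28, 496, 8128, 33550336)); // D
        Integer key;
        while ((key = nextKey(srcs)) != null) {
            // The first two lines printed are "0 -> [2, 0, 0, 0]" and "1 -> [2, 4, 1, 0]".
            System.out.println(key + " -> " + Arrays.toString(step(srcs, key)));
        }
    }
}
```

After key 1 has been drained, the heads sit at 2, 2, 6 and 6, which matches the head pairs shown beside _A_ through _D_ in the diagram.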

h4. Writables

{{Writable}} objects- including {{InputSplit}}s and {{TupleWritable}}s-
encode themselves in the following format:

{noformat}
<count><class1><class2>...<classn><obj1><obj2>...<objn>
{noformat}

The inefficiency is regrettable- particularly since this overhead is
incurred for every instance and most often the tuples emitted will be
processed only within the map- but the encoding satisfies the {{Writable}}
contract well enough to be emitted to the reducer, written to disk, etc. It
is hoped that general compression will trim the most egregious waste. It
should be noted that the framework does not actually write out a tuple (i.e.
does not suffer for this deficiency) unless emitting one from
{{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
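
The layout can be sketched with the standard {{java.io}} data streams. The field widths are assumptions, since the format above does not state them: the count is written as a 4-byte int and each class name as a UTF string. {{Integer}} and {{String}} stand in for real {{Writable}} types, and the {{TupleEncoding}} class is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Sketch of the <count><class...><obj...> layout; field widths are assumptions. */
final class TupleEncoding {

    /** Writes the count, then every class name, then every value. */
    static byte[] encode(Object[] values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(values.length);                                  // <count>
            for (Object v : values) out.writeUTF(v.getClass().getName()); // <class1>...<classn>
            for (Object v : values) {                                     // <obj1>...<objn>
                if (v instanceof Integer) out.writeInt((Integer) v);
                else if (v instanceof String) out.writeUTF((String) v);
                else throw new IllegalArgumentException("unsupported type: " + v.getClass());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the layout back, using the class names to decide how to read each value. */
    static Object[] decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int count = in.readInt();
            String[] classes = new String[count];
            for (int i = 0; i < count; i++) classes[i] = in.readUTF();
            Object[] values = new Object[count];
            for (int i = 0; i < count; i++) {
                if (classes[i].equals(Integer.class.getName())) values[i] = in.readInt();
                else if (classes[i].equals(String.class.getName())) values[i] = in.readUTF();
                else throw new IllegalArgumentException("unsupported type: " + classes[i]);
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = encode(new Object[] { 42, "hello" });
        Object[] back = decode(data);
        System.out.println(back[0] + " " + back[1] + " in " + data.length + " bytes");
    }
}
```

Even in this toy version the class names take up more bytes than the values themselves, which is the overhead regretted above.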

h4. Extensibility

The join framework is modestly extensible. Practically, users seeking to add
their own identifiers to join expressions are limited to extending
{{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
latitude within these constraints, as illustrated in
{{OverrideRecordReader}}, where values in child {{RecordReader}}s are
skipped instead of incurring the overhead of building the iterator (that
will inevitably be discarded).^2^ For most cases, the user need only
implement the combine and/or emit methods in their subclass. It is expected
that most will find that the three default operations will suffice.

Adding arguments to expressions is more difficult. One would need to include
a {{Node}} type for the parser, which requires some knowledge of its inner
workings. The model in this area is crude and requires refinement before it
can be "extensible" by a reasonable definition.

h3. Performance

I have no numbers.

h3. Notes

1. This isn't strictly true. The "leftmost" source will never need to repeat
itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
a welcome addition.

2. Note that- even if reset- the override will only loop through the values
for the current key in the rightmost source, instead of repeating that series
a number of times equal to the cardinality of the cross product of the
discarded streams (regrettably, looking at the code of
{{OverrideRecordReader}} is more illustrative than this explanation).


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Tsz Wo (Nicholas), SZE (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12537188 ] 

Tsz Wo (Nicholas), SZE commented on HADOOP-2085:
------------------------------------------------

By assuming certain properties of the input datasets, the join operation might be performed more efficiently.  I think this is an interesting observation.

I tried to read your patch but still cannot fully understand it.  It would be great if you could give an example (like WordCount) to show how to use the new code.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>         Attachments: 2085.patch


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Patch Available  (was: Open)

Trying Hudson again.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Open  (was: Patch Available)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>         Attachments: 2085-2.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
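
For illustration, the grammar above can be assembled mechanically. The following is a minimal sketch, not the convenience methods shipped in {{CompositeInputFormat}}: the class name JoinExprSketch and the helpers tbl and op are hypothetical, and the ".class" suffix simply mirrors the examples quoted above. The resulting string is what one would presumably assign to the mapred.join.expr property.

```java
import java.util.StringJoiner;

public class JoinExprSketch {
    // Leaf node: tbl(<class>,"<path>"), formatted like the examples in the description.
    public static String tbl(String inputFormatClass, String path) {
        return "tbl(" + inputFormatClass + ".class,\"" + path + "\")";
    }

    // Composite node: <ident>(<func>,...,<func>); the grammar requires at least one child.
    public static String op(String ident, String... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("a join needs at least one source");
        }
        StringJoiner joined = new StringJoiner(",", ident + "(", ")");
        for (String child : children) {
            joined.add(child);
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        String seq = "org.apache.hadoop.mapred.SequenceFileInputFormat";
        // Rebuilds the second (nested) example from the description.
        String expr = op("outer",
            op("override",
                tbl(seq, "hdfs://host:8020/foo/bar"),
                tbl(seq, "hdfs://host:8020/foo/baz")),
            tbl(seq, "hdfs://host:8020/foo/rab"));
        System.out.println(expr);
    }
}
```

Because op accepts the output of other op calls, nesting falls out of ordinary string composition.
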
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
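
The per-key cross product described above can be sketched on plain arrays. This is only an illustration under simplifying assumptions: two flat sources rather than a tree, integer keys, string values, and a hypothetical class name MergeJoinSketch. It is not the patch's {{JoinCollector}} logic. It does show why every source except the leftmost must be able to replay its values for the current key.

```java
import java.util.ArrayList;
import java.util.List;

public class MergeJoinSketch {
    // Inner join of two key-sorted streams. For every key present in both,
    // emit the cross product of the values on each side.
    public static List<String> innerJoin(int[] aKeys, String[] aVals,
                                         int[] bKeys, String[] bVals) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < aKeys.length && j < bKeys.length) {
            if (aKeys[i] < bKeys[j]) {
                i++;                        // key only in A: skip
            } else if (aKeys[i] > bKeys[j]) {
                j++;                        // key only in B: skip
            } else {
                int key = aKeys[i];
                int iEnd = i, jEnd = j;
                while (iEnd < aKeys.length && aKeys[iEnd] == key) iEnd++;
                while (jEnd < bKeys.length && bKeys[jEnd] == key) jEnd++;
                for (int p = i; p < iEnd; p++) {
                    // The right-hand run is replayed once per left-hand value;
                    // the left-hand run is walked exactly once.
                    for (int q = j; q < jEnd; q++) {
                        out.add(key + ":[" + aVals[p] + "," + bVals[q] + "]");
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key streams A and B from the example above.
        int[] a = {0, 0, 1, 1, 2};
        int[] b = {1, 1, 1, 1, 2};
        String[] av = {"a0", "a1", "a2", "a3", "a4"};
        String[] bv = {"b0", "b1", "b2", "b3", "b4"};
        for (String tuple : innerJoin(a, av, b, bv)) {
            System.out.println(tuple);
        }
    }
}
```

With the A and B streams from the example, key 1 contributes 2 x 4 tuples and key 2 contributes one.
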
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
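
To make the overhead concrete, here is a rough sketch of the count / class names / objects layout using only java.io. The class TupleEncodingSketch is hypothetical, and Integer and String stand in for real {{Writable}} types; the actual field encodings used by the patch may well differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class TupleEncodingSketch {
    // Layout: <count><class1>...<classn><obj1>...<objn>
    public static byte[] encode(Object[] values) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(values.length);                 // <count>
            for (Object v : values) {
                out.writeUTF(v.getClass().getName());    // <class1>...<classn>
            }
            for (Object v : values) {                    // <obj1>...<objn>
                if (v instanceof Integer) {
                    out.writeInt((Integer) v);
                } else if (v instanceof String) {
                    out.writeUTF((String) v);
                } else {
                    throw new IllegalArgumentException("unsupported: " + v.getClass());
                }
            }
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Object[] decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int count = in.readInt();
            String[] classes = new String[count];
            for (int i = 0; i < count; i++) {
                classes[i] = in.readUTF();
            }
            Object[] values = new Object[count];
            for (int i = 0; i < count; i++) {
                if (classes[i].equals("java.lang.Integer")) {
                    values[i] = in.readInt();
                } else if (classes[i].equals("java.lang.String")) {
                    values[i] = in.readUTF();
                } else {
                    throw new IllegalArgumentException("unsupported: " + classes[i]);
                }
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] encoded = encode(new Object[] {42, "value"});
        // The class names are repeated in every instance, which is the
        // per-tuple overhead the description regrets.
        System.out.println(encoded.length + " bytes for a two-element tuple");
    }
}
```
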
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Patch Available  (was: Open)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Attachment: 2085-3.patch

More findbugs, added an example

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Patch Available  (was: Open)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
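The fixed-capacity property can be seen in a small sketch of the cross product for a single key. It assumes an outer-style operation in which a source with no values for the key leaves a null in its slot; the class and method names are hypothetical, values are plain objects rather than {{Writable}}s, and the iterator and collector machinery described above is replaced by in-memory lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the per-key cross product; not the JoinCollector in the patch. */
final class CrossProductSketch {
  /**
   * Cross product of the values each source holds for the current key.
   * A source with no values contributes a single null, so every tuple
   * has exactly one position per source.
   */
  static List<List<Object>> cross(List<List<Object>> perSource) {
    List<List<Object>> out = new ArrayList<>();
    out.add(new ArrayList<>());
    for (List<Object> values : perSource) {
      List<Object> slot = values.isEmpty()
          ? Collections.<Object>singletonList(null) : values;
      List<List<Object>> next = new ArrayList<>();
      for (List<Object> prefix : out) {
        for (Object v : slot) {
          List<Object> tuple = new ArrayList<>(prefix);
          tuple.add(v);
          next.add(tuple);
        }
      }
      out = next;
    }
    return out;
  }

  public static void main(String[] args) {
    // Values for key 1 in the example above: A has 2, B has 4, C has 1, D has none.
    List<Object> a = Arrays.asList("a1", "a2");
    List<Object> b = Arrays.asList("b1", "b2", "b3", "b4");
    List<Object> c = Arrays.asList("c1");
    List<Object> d = Collections.emptyList();
    List<List<Object>> y = cross(Arrays.asList(b, c, d));
    // Each tuple produced by y becomes one value in the second position of x.
    List<List<Object>> x = cross(Arrays.asList(a, new ArrayList<Object>(y)));
    System.out.println(x.size() + " tuples, first: " + x.get(0));
  }
}
```

The nested call mirrors {{x(A, y(B,C,D))}}: the tuples of _y_ appear as tuples within the tuples of _x_, and the slot for _D_ is present but empty.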
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
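A round trip through the layout above can be sketched with the standard {{java.io}} data streams. This is an illustration of the layout only: the class name is hypothetical, the count is assumed to be a 4-byte int, class names are written with {{writeUTF}}, and only {{Integer}} and {{String}} values are supported in place of arbitrary {{Writable}}s.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical sketch of <count><class1>...<classn><obj1>...<objn>. */
final class TupleEncodingSketch {
  /** Writes the count, then every class name, then every value. */
  static byte[] write(Object[] values) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(values.length);
    for (Object v : values) {
      out.writeUTF(v.getClass().getName());
    }
    for (Object v : values) {
      if (v instanceof Integer) out.writeInt((Integer) v);
      else if (v instanceof String) out.writeUTF((String) v);
      else throw new IOException("unsupported type " + v.getClass());
    }
    out.flush();
    return bytes.toByteArray();
  }

  /** Reads the count and class names first, then uses them to decode the values. */
  static Object[] read(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int n = in.readInt();
    String[] classes = new String[n];
    for (int i = 0; i < n; i++) {
      classes[i] = in.readUTF();
    }
    Object[] values = new Object[n];
    for (int i = 0; i < n; i++) {
      if (classes[i].equals(Integer.class.getName())) values[i] = in.readInt();
      else if (classes[i].equals(String.class.getName())) values[i] = in.readUTF();
      else throw new IOException("unsupported type " + classes[i]);
    }
    return values;
  }
}
```

Because the class names are written with every instance, a tuple holding one small integer spends far more bytes on the name than on the value, which is the overhead the paragraph above regrets.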
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. Most users
> should find that the three default operations suffice.
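The selection rule behind {{override}} can be sketched without any of the framework types. The class below is hypothetical and works on in-memory lists; it only shows the semantics stated in the identifier table (prefer values from the rightmost source), not how {{OverrideRecordReader}} skips values in its children.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the override rule for a single key. */
final class OverrideSketch {
  /** Returns the values of the rightmost source that has any for the key. */
  static <T> List<T> override(List<List<T>> perSource) {
    for (int i = perSource.size() - 1; i >= 0; i--) {
      if (!perSource.get(i).isEmpty()) {
        return perSource.get(i);
      }
    }
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    List<List<String>> sources = Arrays.asList(
        Arrays.asList("a1"),
        Arrays.asList("b1", "b2"),
        Collections.<String>emptyList()); // rightmost source lacks the key
    System.out.println(override(sources));
  }
}
```

Only the chosen source's values are returned, once each; the values of the other sources never need to be collected, which is why skipping them is cheaper than building their iterators.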
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> for the current key in the rightmost source, instead of repeating that
> series a number of times equal to the cardinality of the cross product of
> the discarded streams (regrettably, looking at the code of
> {{OverrideRecordReader}} is more illustrative than this explanation).

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Attachment: 2085-5.patch

Addressed NPE and included interface info from this JIRA in o/a/h/mapred/join/package.html

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Patch Available  (was: Open)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085.patch
>
>



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12543318 ] 

Hadoop QA commented on HADOOP-2085:
-----------------------------------

+1 overall.  Here are the results of testing the latest attachment 
http://issues.apache.org/jira/secure/attachment/12369694/2085-5.patch
against trunk revision r595563.

    @author +1.  The patch does not contain any @author tags.

    javadoc +1.  The javadoc tool did not generate any warning messages.

    javac +1.  The applied patch does not generate any new compiler warnings.

    findbugs +1.  The patch does not introduce any new Findbugs warnings.

    core tests +1.  The patch passed core unit tests.

    contrib tests +1.  The patch passed contrib unit tests.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1116/testReport/
Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1116/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1116/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1116/console

This message is automatically generated.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred/SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
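
The head-tracking step described above can be sketched in plain Java. This is only an illustration under stated assumptions: `KeyStream` and `MergeStep` are hypothetical names rather than classes from the patch, keys are plain ints compared in natural order instead of through a `WritableComparator`, and the tree `x(A, y(B,C,D))` is flattened to a single level. Each source keeps its head record, the smallest head key becomes the current key, and each source hands over the values it holds for that key (possibly none).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a wrapped source; it tracks the head of one sorted key stream. */
final class KeyStream {
  private final String name;
  private final int[] keys;
  private int head = 0; // index of the head record

  KeyStream(String name, int... keys) {
    this.name = name;
    this.keys = keys;
  }

  boolean exhausted() {
    return head >= keys.length;
  }

  int headKey() {
    return keys[head];
  }

  /** Removes and returns every value for the given key; empty if the head key differs. */
  List<String> drain(int key) {
    List<String> values = new ArrayList<>();
    int n = 0;
    while (!exhausted() && keys[head] == key) {
      values.add(name + ":V(" + key + "," + (++n) + ")");
      head++;
    }
    return values;
  }
}

final class MergeStep {
  private MergeStep() {}

  /** Smallest head key among the sources that still have records. */
  static int minHeadKey(List<KeyStream> sources) {
    boolean found = false;
    int min = 0;
    for (KeyStream s : sources) {
      if (!s.exhausted() && (!found || s.headKey() < min)) {
        min = s.headKey();
        found = true;
      }
    }
    if (!found) {
      throw new IllegalStateException("all sources exhausted");
    }
    return min;
  }

  /** One value list per source, in source order; a source without the key yields an empty list. */
  static List<List<String>> collect(List<KeyStream> sources, int key) {
    List<List<String>> perSource = new ArrayList<>();
    for (KeyStream s : sources) {
      perSource.add(s.drain(key));
    }
    return perSource;
  }
}
```

Run against the key streams A-D from the example, the second call to `collect` (current key 1) should leave the heads at 2, 2, 6 and 6, with D contributing an empty list, which matches the state drawn in the tree.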
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
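
To make the overhead of this layout concrete, here is a minimal sketch of the `<count><class...><obj...>` encoding using only `java.io`. It is not the patch's `TupleWritable`: `TupleCodec` is a hypothetical name, only `Integer` and `String` members are supported, and class names are written with `writeUTF`, which is an assumption about the wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical codec illustrating the layout: count, then class names, then objects. */
final class TupleCodec {
  private TupleCodec() {}

  static byte[] encode(List<Object> values) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.size());                 // <count>
      for (Object v : values) {
        out.writeUTF(v.getClass().getName());      // <class1>...<classn>
      }
      for (Object v : values) {                    // <obj1>...<objn>
        if (v instanceof Integer) {
          out.writeInt((Integer) v);
        } else if (v instanceof String) {
          out.writeUTF((String) v);
        } else {
          throw new IllegalArgumentException("Unsupported type: " + v.getClass());
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<Object> decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int count = in.readInt();
      String[] classes = new String[count];
      for (int i = 0; i < count; i++) {
        classes[i] = in.readUTF();
      }
      List<Object> values = new ArrayList<>(count);
      for (String cls : classes) {
        if (cls.equals(Integer.class.getName())) {
          values.add(in.readInt());
        } else if (cls.equals(String.class.getName())) {
          values.add(in.readUTF());
        } else {
          throw new IllegalArgumentException("Unsupported type: " + cls);
        }
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this sketch, the class names dominate the encoded size for small members, since every instance repeats them. That is the per-instance cost the paragraph above regrets.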
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
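
As a rough illustration of the latitude mentioned for {{OverrideRecordReader}}, the selection rule "prefer values from the rightmost source" can be written without building any cross product. The class and method names below are hypothetical and the values are plain strings; the real reader works on `RecordReader` children and `Writable` values.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the override rule for a single key. */
final class OverrideSketch {
  private OverrideSketch() {}

  /**
   * Returns the values of the rightmost source that has any value for the
   * current key. Sources to its left are ignored outright, so no iterator
   * over their cross product is ever needed.
   */
  static List<String> override(List<List<String>> perSource) {
    for (int i = perSource.size() - 1; i >= 0; i--) {
      if (!perSource.get(i).isEmpty()) {
        return perSource.get(i);
      }
    }
    return Collections.emptyList();
  }
}
```

Because the result is just the chosen source's own value list, the values are visited once rather than once per combination of the discarded sources.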
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).
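
For readers unfamiliar with the idea behind note 1, a resettable iterator in its simplest form is a buffer with a cursor that can be rewound. The sketch below is an assumption about the general shape only; `ReplayBuffer` is a hypothetical name and this is not the patch's {{ResettableIterator}} interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical buffer that can replay the values stored for the current key. */
final class ReplayBuffer<T> {
  private final List<T> values = new ArrayList<>();
  private int cursor = 0;

  /** Stores a value for the current key. */
  void add(T value) {
    values.add(value);
  }

  boolean hasNext() {
    return cursor < values.size();
  }

  T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return values.get(cursor++);
  }

  /** Rewinds to the first stored value so the series can be repeated. */
  void reset() {
    cursor = 0;
  }

  /** Discards the stored values before moving to the next key. */
  void clear() {
    values.clear();
    cursor = 0;
  }
}
```

A source that is never asked to repeat itself, such as the leftmost one, would not need the stored copy at all, which is the saving note 1 points at.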

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12541217 ] 

Hadoop QA commented on HADOOP-2085:
-----------------------------------

+1 overall.  Here are the results of testing the latest attachment 
http://issues.apache.org/jira/secure/attachment/12369206/2085-4.patch
against trunk revision r592860.

    @author +1.  The patch does not contain any @author tags.

    javadoc +1.  The javadoc tool did not generate any warning messages.

    javac +1.  The applied patch does not generate any new compiler warnings.

    findbugs +1.  The patch does not introduce any new Findbugs warnings.

    core tests +1.  The patch passed core unit tests.

    contrib tests +1.  The patch passed contrib unit tests.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1077/testReport/
Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1077/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1077/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1077/console

This message is automatically generated.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Attachment: 2085.patch

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Open  (was: Patch Available)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
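
One way to read this grammar is as a small recursive-descent parser. The sketch below is not the patch's {{Parser}}; `JoinExprNode` and `JoinExprParser` are hypothetical names, and it assumes the trailing `;` in the second production is not part of the expression text (the examples in this description do not include it).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parse-tree node for illustration; not the patch's Node type. */
final class JoinExprNode {
  final String ident;             // operation name, or "tbl" for a leaf
  final String inputFormatClass;  // leaf only, otherwise null
  final String path;              // leaf only, otherwise null
  final List<JoinExprNode> children = new ArrayList<>();

  JoinExprNode(String ident, String inputFormatClass, String path) {
    this.ident = ident;
    this.inputFormatClass = inputFormatClass;
    this.path = path;
  }
}

/** Reads: func ::= ident(func, ..., func) | tbl(class,"path") */
final class JoinExprParser {
  private final String src;
  private int pos = 0;

  private JoinExprParser(String src) {
    this.src = src;
  }

  static JoinExprNode parse(String expr) {
    JoinExprParser p = new JoinExprParser(expr);
    JoinExprNode root = p.func();
    p.skipWhitespace();
    if (p.pos != p.src.length()) {
      throw new IllegalArgumentException("Unexpected input at offset " + p.pos);
    }
    return root;
  }

  private JoinExprNode func() {
    String ident = ident();
    expect('(');
    JoinExprNode node;
    if (ident.equals("tbl")) {
      // Leaf: class name up to the first comma, then a quoted path.
      String cls = upTo(',').trim();
      expect(',');
      expect('"');
      String path = upTo('"');
      expect('"');
      node = new JoinExprNode(ident, cls, path);
    } else {
      // Composite: one or more comma-separated sub-expressions.
      node = new JoinExprNode(ident, null, null);
      do {
        node.children.add(func());
      } while (accept(','));
    }
    expect(')');
    return node;
  }

  private String ident() {
    skipWhitespace();
    int start = pos;
    while (pos < src.length() && Character.isJavaIdentifierPart(src.charAt(pos))) {
      pos++;
    }
    if (start == pos) {
      throw new IllegalArgumentException("Identifier expected at offset " + pos);
    }
    return src.substring(start, pos);
  }

  private String upTo(char stop) {
    int end = src.indexOf(stop, pos);
    if (end < 0) {
      throw new IllegalArgumentException("Expected '" + stop + "' after offset " + pos);
    }
    String text = src.substring(pos, end);
    pos = end;
    return text;
  }

  private boolean accept(char c) {
    skipWhitespace();
    if (pos < src.length() && src.charAt(pos) == c) {
      pos++;
      return true;
    }
    return false;
  }

  private void expect(char c) {
    if (!accept(c)) {
      throw new IllegalArgumentException("Expected '" + c + "' at offset " + pos);
    }
  }

  private void skipWhitespace() {
    while (pos < src.length() && Character.isWhitespace(src.charAt(pos))) {
      pos++;
    }
  }
}
```

Composite nodes carry their sub-expressions in `children`, while `tbl` leaves carry the class and path, mirroring the inner and leaf node types the design refers to.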
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
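
The difference between the two join identifiers can be sketched for a single key as follows. This assumes the usual relational meaning of inner and outer joins, uses plain strings for values, and `JoinSemantics` is a hypothetical name. Tuples always have one slot per source, and a source with no value for the key leaves its slot null.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of inner vs. outer emission for one key. */
final class JoinSemantics {
  private JoinSemantics() {}

  /** Cross product of the per-source values; an empty source leaves its slot null. */
  static List<String[]> cross(List<List<String>> perSource) {
    List<String[]> tuples = new ArrayList<>();
    tuples.add(new String[perSource.size()]);
    for (int i = 0; i < perSource.size(); i++) {
      List<String> values = perSource.get(i);
      if (values.isEmpty()) {
        continue; // slot i stays null in every tuple
      }
      List<String[]> next = new ArrayList<>();
      for (String[] partial : tuples) {
        for (String v : values) {
          String[] t = partial.clone();
          t[i] = v;
          next.add(t);
        }
      }
      tuples = next;
    }
    return tuples;
  }

  /** Inner join: emit only when every source has a value for the key. */
  static List<String[]> inner(List<List<String>> perSource) {
    for (List<String> values : perSource) {
      if (values.isEmpty()) {
        return new ArrayList<>();
      }
    }
    return cross(perSource);
  }

  /** Outer join: emit when at least one source has a value for the key. */
  static List<String[]> outer(List<List<String>> perSource) {
    for (List<String> values : perSource) {
      if (!values.isEmpty()) {
        return cross(perSource);
      }
    }
    return new ArrayList<>();
  }
}
```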
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
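
A helper of this kind might look like the sketch below. The names `JoinExprBuilder`, `tbl`, `op` and `joinConf` are hypothetical and are not the signatures of the convenience methods on {{CompositeInputFormat}}; the `.class` suffix simply follows the examples above, and `java.util.Properties` stands in for the job configuration.

```java
import java.util.Properties;

/** Hypothetical string builder for join expressions in the form shown above. */
final class JoinExprBuilder {
  private JoinExprBuilder() {}

  /** Leaf: tbl(<class>,"<path>") */
  static String tbl(String inputFormatClass, String path) {
    return "tbl(" + inputFormatClass + ".class,\"" + path + "\")";
  }

  /** Composite: <ident>(<func>,...,<func>) with at least one argument. */
  static String op(String ident, String... args) {
    if (args.length == 0) {
      throw new IllegalArgumentException("a join operation needs at least one source");
    }
    return ident + "(" + String.join(",", args) + ")";
  }

  /** Stores the expression under the required property from the table above. */
  static Properties joinConf(String expr) {
    Properties conf = new Properties();
    conf.setProperty("mapred.join.expr", expr);
    return conf;
  }
}
```

For instance, `op("outer", op("override", tbl(seq, bar), tbl(seq, baz)), tbl(seq, rab))` would reproduce the shape of the second example, where `seq`, `bar`, `baz` and `rab` are the class name and paths as strings.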
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by extending
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
> h4. Writables
> {{Writable}} objects, including {{InputSplit}} s and {{TupleWritable}} s,
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable, particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map, but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that, even if reset, the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12538406 ] 

Hadoop QA commented on HADOOP-2085:
-----------------------------------

-1 overall.  Here are the results of testing the latest attachment 
http://issues.apache.org/jira/secure/attachment/12368550/2085-3.patch
against trunk revision r588778.

    @author +1.  The patch does not contain any @author tags.

    javadoc +1.  The javadoc tool did not generate any warning messages.

    javac +1.  The applied patch does not generate any new compiler warnings.

    findbugs +1.  The patch does not introduce any new Findbugs warnings.

    core tests -1.  The patch failed core unit tests.

    contrib tests -1.  The patch failed contrib unit tests.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1019/testReport/
Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1019/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1019/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1019/console

This message is automatically generated.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
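The per-key behaviour of the three default identifiers can be sketched in plain Java. This is only an illustration under stated assumptions: the class and method names are hypothetical, values are modelled as strings, and a missing value in an outer-join tuple is shown as null. None of it is taken from the patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: each inner list holds one source's values for the current key.
class JoinSemanticsSketch {

  /** Outer join: cross product of all sources; an empty source fills its slot with null. */
  static List<List<String>> outer(List<List<String>> sources) {
    List<List<String>> out = new ArrayList<>();
    boolean any = false;
    for (List<String> s : sources) {
      any |= !s.isEmpty();
    }
    if (!any) {
      return out; // no source holds this key, so nothing is emitted
    }
    out.add(new ArrayList<String>());
    for (List<String> src : sources) {
      List<String> vals = src.isEmpty() ? Collections.singletonList((String) null) : src;
      List<List<String>> next = new ArrayList<>();
      for (List<String> partial : out) {
        for (String v : vals) {
          List<String> tuple = new ArrayList<>(partial);
          tuple.add(v);
          next.add(tuple);
        }
      }
      out = next;
    }
    return out;
  }

  /** Inner join: emits tuples only when every source holds the key. */
  static List<List<String>> inner(List<List<String>> sources) {
    for (List<String> s : sources) {
      if (s.isEmpty()) {
        return new ArrayList<>();
      }
    }
    return outer(sources);
  }

  /** Override: single values taken from the rightmost source holding the key. */
  static List<String> override(List<List<String>> sources) {
    for (int i = sources.size() - 1; i >= 0; i--) {
      if (!sources.get(i).isEmpty()) {
        return sources.get(i);
      }
    }
    return Collections.emptyList();
  }
}
```

With sources [a1, a2], [b1], and an empty third source, this sketch's inner join emits nothing, its outer join emits (a1, b1, null) and (a2, b1, null), and its override emits b1.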
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
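As a rough idea of what such helpers might do, the following standalone sketch builds expression strings that follow the grammar above. The class and method names are hypothetical and are not the convenience methods in {{CompositeInputFormat}}; the sketch only assembles strings.

```java
// Hypothetical helper: assembles join expressions of the form
// <ident>(<func>,...,<func>) with tbl(<class>,"<path>") leaves.
class JoinExprSketch {

  /** Leaf node: tbl(<class>,"<path>"). */
  static String tbl(String inputFormatClass, String path) {
    return "tbl(" + inputFormatClass + ",\"" + path + "\")";
  }

  /** Composite node: <ident>(<child>,<child>,...). */
  static String op(String ident, String... children) {
    return ident + "(" + String.join(",", children) + ")";
  }

  public static void main(String[] args) {
    String sf = "org.apache.hadoop.mapred.SequenceFileInputFormat.class";
    // Rebuilds the nested example: an override of two sources, outer-joined with a third.
    String expr = op("outer",
        op("override",
            tbl(sf, "hdfs://host:8020/foo/bar"),
            tbl(sf, "hdfs://host:8020/foo/baz")),
        tbl(sf, "hdfs://host:8020/foo/rab"));
    System.out.println(expr);
  }
}
```

The resulting string would then be stored under the {{mapred.join.expr}} property of the job configuration.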
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit, or modify, values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
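The key-level bookkeeping in the example can be reproduced with a small standalone sketch. It is deliberately flat: it ignores the composite hierarchy and the {{JoinCollector}}, and only shows how many values each sorted source contributes per key as the heads advance. The class name and output format are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: merge sorted key streams and, for each key in ascending
// order, count the records each source holds for that key.
class KeyStreamMergeSketch {

  static List<String> merge(int[][] streams) {
    int[] pos = new int[streams.length]; // index of the head record per source
    List<String> out = new ArrayList<>();
    while (true) {
      // Find the smallest head key among sources that are not exhausted.
      boolean found = false;
      int key = 0;
      for (int i = 0; i < streams.length; i++) {
        if (pos[i] < streams[i].length && (!found || streams[i][pos[i]] < key)) {
          key = streams[i][pos[i]];
          found = true;
        }
      }
      if (!found) {
        return out; // every source is exhausted
      }
      // Consume every record matching the key; a source without it contributes 0.
      int[] counts = new int[streams.length];
      for (int i = 0; i < streams.length; i++) {
        while (pos[i] < streams[i].length && streams[i][pos[i]] == key) {
          pos[i]++;
          counts[i]++;
        }
      }
      out.add(key + " -> " + Arrays.toString(counts));
    }
  }

  public static void main(String[] args) {
    int[][] streams = {
        {0, 0, 1, 1, 2},                // A
        {1, 1, 1, 1, 2},                // B
        {1, 6, 21, 107},                // C
        {6, 28, 496, 8128, 33550336}    // D
    };
    for (String line : merge(streams)) {
      System.out.println(line);
    }
  }
}
```

For key 1 the counts are [2, 4, 1, 0], which matches the diagram: two values buffered at A, four at B, one at C, and an empty iterator at D.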
> h4. Writables
> {{Writable}} objects, including {{InputSplit}} s and {{TupleWritable}} s,
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable, particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map, but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
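To make the cost of the layout above concrete, here is a minimal round-trip sketch of a count, then class names, then objects. It is not the patch's serialization code: the class name is hypothetical, the count is written as a plain int, and each object is stood in for by a UTF string rather than a real {{Writable}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of <count><class1>...<classn><obj1>...<objn>.
class TupleEncodingSketch {

  static byte[] write(String[] classNames, String[] values) {
    if (classNames.length != values.length) {
      throw new IllegalArgumentException("one class name is required per value");
    }
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeInt(values.length);                 // <count>
      for (String c : classNames) {
        out.writeUTF(c);                           // <class1>...<classn>
      }
      for (String v : values) {
        out.writeUTF(v);                           // <obj1>...<objn>
      }
      out.flush();
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns { classNames, values } read back in the same order. */
  static String[][] read(byte[] bytes) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
      int n = in.readInt();
      String[] classes = new String[n];
      String[] values = new String[n];
      for (int i = 0; i < n; i++) {
        classes[i] = in.readUTF();
      }
      for (int i = 0; i < n; i++) {
        values[i] = in.readUTF();
      }
      return new String[][] {classes, values};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Even in this toy form the class names are repeated for every instance and can easily outweigh the values themselves, which is the overhead the paragraph above regrets.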
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that, even if reset, the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Open  (was: Patch Available)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085.patch
>
>



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12543264 ] 

Hadoop QA commented on HADOOP-2085:
-----------------------------------

-1 overall.  Here are the results of testing the latest attachment 
http://issues.apache.org/jira/secure/attachment/12369694/2085-5.patch
against trunk revision r595563.

    @author +1.  The patch does not contain any @author tags.

    javadoc +1.  The javadoc tool did not generate any warning messages.

    javac +1.  The applied patch does not generate any new compiler warnings.

    findbugs +1.  The patch does not introduce any new Findbugs warnings.

    core tests +1.  The patch passed core unit tests.

    contrib tests -1.  The patch failed contrib unit tests.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1110/testReport/
Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1110/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1110/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1110/console

This message is automatically generated.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit, or modify, values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12540302 ] 

Chris Douglas commented on HADOOP-2085:
---------------------------------------

Joydeep-

The assumption it makes is precisely as you describe it: the ith split from each source must contain the same keys. It performs only rudimentary verification of this; IIRC, it checks that it received an equal number of splits from each source. Getting splits should generally be cheap, so it doesn't verify key ranges for any of the splits (and probably ought not to).

I've asked around, and "the way" out of this onerous constraint involves using MapFiles. At a high level, you need an index for your input data so your splits can be informed. I'm not familiar with the details here, I'm afraid.

CompositeInputSplit::getLocations() returns an unweighted union of hosts from its child splits. It would be preferable to give more weight to a host that holds several of the child splits of a given composite split, but for now it provides a flat list.
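
To make the difference concrete, here is a rough sketch of the two behaviours. It is not the code in the patch: {{SplitLocations}}, {{union}}, and {{weighted}} are hypothetical names, and host lists are plain strings rather than anything read from an {{InputSplit}}.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in; not a class from the patch.
final class SplitLocations {
  /** Flat, unweighted union: each host appears once, in first-seen order. */
  static List<String> union(List<List<String>> childHosts) {
    Set<String> hosts = new LinkedHashSet<>();
    for (List<String> child : childHosts) {
      hosts.addAll(child);
    }
    return new ArrayList<>(hosts);
  }

  /** Assumed weighted variant: hosts holding more child splits come first. */
  static List<String> weighted(List<List<String>> childHosts) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (List<String> child : childHosts) {
      // Count each host at most once per child split.
      for (String host : new LinkedHashSet<>(child)) {
        counts.merge(host, 1, Integer::sum);
      }
    }
    List<String> hosts = new ArrayList<>(counts.keySet());
    // List.sort is stable, so hosts with equal counts keep first-seen order.
    hosts.sort((a, b) -> counts.get(b) - counts.get(a));
    return hosts;
  }
}
```

With child splits on (h1,h2), (h2,h3), and (h2,h1), the flat union lists h1, h2, h3, while the weighted ordering would put h2 first because it holds all three child splits.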

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Description: 
h3. Motivation

Given a set of sorted datasets keyed with the same class and yielding equal
partitions, it is possible to effect a join of those datasets prior to the
map. This could save costs in re-partitioning, sorting, shuffling, and
writing out data required in the general case.

h3. Interface

The attached code offers the following interface to users of these classes.

|| property || required || value ||
| mapred.join.expr | yes | Join expression to effect over input data |
| mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
| mapred.join.define.<ident> | no | Class mapped to identifier in join expression |

The join expression understands the following grammar:
{noformat}
func ::= <ident>([<func>,]*<func>)
func ::= tbl(<class>,"<path>");
{noformat}

Operations included in this patch are partitioned into one of two types:
join operations emitting tuples and "multi-filter" operations emitting a
single value from (but not necessarily included in) a set of input values.
For a given key, each operation will consider the cross product of all
values for all sources at that node.

Identifiers supported by default:

|| identifier || type || description ||
| inner | Join | Full inner join |
| outer | Join | Full outer join |
| override | MultiFilter | For a given key, prefer values from the rightmost source |
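
The per-key behaviour of the three default identifiers can be sketched as follows. This is an illustration under assumptions, not the patch's implementation: {{JoinSemantics}} is a hypothetical class, values are plain strings, a source lacking the key is modelled as an empty list, and an absent slot in an outer-join tuple is modelled as {{null}}. Reading "rightmost source" as "rightmost source that has the key" is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; not a class from the patch.
final class JoinSemantics {
  /** Cross product of per-source values; an empty source fills its slot with null. */
  static List<List<String>> cross(List<List<String>> sources) {
    List<List<String>> tuples = new ArrayList<>();
    tuples.add(new ArrayList<String>());
    for (List<String> source : sources) {
      List<String> values = source.isEmpty()
          ? Collections.<String>singletonList(null) : source;
      List<List<String>> next = new ArrayList<>();
      for (List<String> prefix : tuples) {
        for (String v : values) {
          List<String> tuple = new ArrayList<>(prefix);
          tuple.add(v);
          next.add(tuple);
        }
      }
      tuples = next;
    }
    return tuples;
  }

  /** inner: emit tuples only when every source has the key. */
  static List<List<String>> inner(List<List<String>> sources) {
    for (List<String> source : sources) {
      if (source.isEmpty()) {
        return new ArrayList<>();
      }
    }
    return cross(sources);
  }

  /** outer: emit tuples when at least one source has the key. */
  static List<List<String>> outer(List<List<String>> sources) {
    for (List<String> source : sources) {
      if (!source.isEmpty()) {
        return cross(sources);
      }
    }
    return new ArrayList<>();
  }

  /** override: emit the values of the rightmost source that has the key. */
  static List<String> override(List<List<String>> sources) {
    for (int i = sources.size() - 1; i >= 0; i--) {
      if (!sources.get(i).isEmpty()) {
        return sources.get(i);
      }
    }
    return new ArrayList<>();
  }
}
```

Note that the two join operations return tuples with one slot per source, while the multi-filter returns single values, matching the two types described above.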

A user of this class must set the {{InputFormat}} for the job to
{{CompositeInputFormat}} and define a join expression accepted by the preceding
grammar. For example, both of the following are acceptable:

{noformat}
inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/bar"),
      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/baz"))

outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                   "hdfs://host:8020/foo/bar"),
               tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                   "hdfs://host:8020/foo/baz")),
      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/rab"))
{noformat}

{{CompositeInputFormat}} includes a handful of convenience methods to aid
construction of these verbose statements.
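
As a sketch of what such a helper might look like (this is not the convenience API in the patch; {{JoinExpr}}, {{tbl}}, and {{op}} are hypothetical names), the expressions above can be assembled from two small string functions that follow the grammar:

```java
// Hypothetical helper; not the convenience methods shipped in the patch.
final class JoinExpr {
  /** Leaf node: tbl(<class>,"<path>"). The class is passed as text, as in the examples. */
  static String tbl(String inputFormatClass, String path) {
    return "tbl(" + inputFormatClass + ",\"" + path + "\")";
  }

  /** Composite node: <ident>(<func>,...,<func>) with at least one child. */
  static String op(String ident, String... children) {
    if (children.length == 0) {
      throw new IllegalArgumentException("a join needs at least one child");
    }
    return ident + "(" + String.join(",", children) + ")";
  }
}
```

Nesting falls out of composition: {{op("outer", op("override", tbl(...), tbl(...)), tbl(...))}} yields a string with the shape of the second example. The result would then be stored under {{mapred.join.expr}}.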

As in the second example, joins may be nested. Users may provide a
comparator class in the {{mapred.join.keycomparator}} property to
specify the ordering of their keys, or accept the default comparator as
returned by {{WritableComparator.get(keyclass)}}.

Users can specify their own join operations, typically by overriding
{{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
to an identifier in the join expression using the
{{mapred.join.define._ident_}} property, where _ident_ is the identifier
appearing in the join expression. Users may elect to emit- or modify- values
passing through their join operation. Consulting the existing operations for
guidance is recommended. Adding arguments is considerably more complex (and
only partially supported), as one must also add a {{Node}} type to the parse
tree. One is probably better off extending {{RecordReader}} in most cases.

h3. Design

As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
types for the join tree. Delegation satisfies most requirements of the
{{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
Most of the work in this patch concerns {{getRecordReader}}. The
{{CompositeInputFormat}} itself delegates to the parse tree generated by
{{Parser}}.

h4. Hierarchical Joins

Each {{RecordReader}} from the user must be "wrapped", since effecting a
join requires the framework to track the head value from each source. Since
the cross product of all values for each composite level of the join is
emitted to its parent, all sources ^1^ must be capable of repeating the
values for the current key. To avoid keeping an excessive number of copies
(one per source per level), each composite requests its children to populate
a {{JoinCollector}} with an iterator over its values. This way, there is
only one copy of the current key for each composite node, the head key-value
pair for each leaf, and storage at each leaf for all the values matching the
current key at the parent collector (if it is currently participating in a
join at the root). Strategies have been employed to avoid excessive copying
when filling a user-provided {{Writable}}, but they have been conservative
(e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
the user modifies the value returned, possibly changing the state of a
{{JoinCollector}} in the tree). For example, if the following sources
contain these key streams:

{noformat}
A: 0  0   1    1     2        ...
B: 1  1   1    1     2        ...
C: 1  6   21   107   ...
D: 6  28  496  8128  33550336 ...
{noformat}

Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
the root is 1 the tree may look like this:

{noformat}

            x (1, [ I(A), [ I(y) ] ] )
          /   \
         W     y (1, [ I(B), I(C), EMPTY ])
         |   / | \
         |  W  W  W
         |  |  |  D (6, V~6~) => EMPTY
         |  |  C (6, V~6~)    => V~1,1~ @1,1
         |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
         A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
{noformat}

A {{JoinCollector}} from _x_ will have been created by requesting an
iterator from _A_ and another from _y_. The iterator at _y_ is built by
requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
key 1, it returns an empty iterator. Since the value to return for a given
join is a {{Writable}} provided by the user, the iterators returned are also
responsible for writing the next value in that stream. For multilevel joins
passing through a subclass of {{JoinRecordReader}}, the value produced will
contain tuples within tuples; iterators for composites delegate to
sub-iterators responsible for filling the value in the tuple at the position
matching their position in the composite. In a sense, the only iterators
that write to a tuple are the {{RecordReader}} s at the leaves. Note that
this also implies that emitted tuples may not contain values from each
source, but they will always have the same capacity.
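
The bookkeeping in the diagram can be reproduced with a small simulation over the four key streams. This is only a sketch of the idea, assuming integer keys and ignoring values; {{KeyStreams}}, {{nextKey}}, and {{take}} are hypothetical names, not classes or methods from the patch.

```java
import java.util.NoSuchElementException;

// Hypothetical simulation of the key streams above; not code from the patch.
final class KeyStreams {
  /** Smallest head key across all streams that still have input. */
  static int nextKey(int[][] streams, int[] pos) {
    boolean found = false;
    int min = 0;
    for (int i = 0; i < streams.length; i++) {
      if (pos[i] < streams[i].length && (!found || streams[i][pos[i]] < min)) {
        min = streams[i][pos[i]];
        found = true;
      }
    }
    if (!found) {
      throw new NoSuchElementException("all streams exhausted");
    }
    return min;
  }

  /** Consume every entry equal to key; returns how many each stream contributed. */
  static int[] take(int[][] streams, int[] pos, int key) {
    int[] counts = new int[streams.length];
    for (int i = 0; i < streams.length; i++) {
      while (pos[i] < streams[i].length && streams[i][pos[i]] == key) {
        pos[i]++;
        counts[i]++;
      }
    }
    return counts;
  }
}
```

Run over the streams _A-D_ (truncated where the original shows an ellipsis), the second key taken is 1, with 2, 4, 1, and 0 values collected from _A_, _B_, _C_, and _D_ respectively. Afterwards the head keys are 2, 2, 6, and 6, which is the state drawn in the tree.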

h4. Writables

{{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
encode themselves in the following format:

{noformat}
<count><class1><class2>...<classn><obj1><obj2>...<objn>
{noformat}

The inefficiency is regrettable- particularly since this overhead is
incurred for every instance and most often the tuples emitted will be
processed only within the map- but the encoding satisfies the {{Writable}}
contract well enough to be emitted to the reducer, written to disk, etc. It
is hoped that general compression will trim the most egregious waste. It
should be noted that the framework does not actually write out a tuple (i.e.
does not suffer for this deficiency) unless emitting one from
{{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
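
To show where the per-instance overhead comes from, here is a sketch of the layout using {{java.io}} streams. It is not the serialization code in the patch: {{TupleCodec}} is a hypothetical class, the count is written as a fixed 4-byte int and class names as UTF strings (both assumptions), and only {{Integer}} and {{String}} values are supported in place of real {{Writable}} types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical codec illustrating <count><class1>...<classn><obj1>...<objn>.
final class TupleCodec {
  static byte[] encode(List<Object> values) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeInt(values.size());                 // <count>
      for (Object v : values) {
        out.writeUTF(v.getClass().getName());      // <class1>...<classn>
      }
      for (Object v : values) {                    // <obj1>...<objn>
        if (v instanceof Integer) {
          out.writeInt((Integer) v);
        } else if (v instanceof String) {
          out.writeUTF((String) v);
        } else {
          throw new IOException("unsupported type: " + v.getClass());
        }
      }
      out.flush();
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<Object> decode(byte[] bytes) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
      int count = in.readInt();
      String[] classes = new String[count];
      for (int i = 0; i < count; i++) {
        classes[i] = in.readUTF();
      }
      List<Object> values = new ArrayList<>(count);
      for (String c : classes) {
        if (c.equals(Integer.class.getName())) {
          values.add(in.readInt());
        } else if (c.equals(String.class.getName())) {
          values.add(in.readUTF());
        } else {
          throw new IOException("unsupported type: " + c);
        }
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Even in this toy version, a two-element tuple spends most of its bytes on class names, which is the waste the paragraph above hopes compression will trim.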

h4. Extensibility

The join framework is modestly extensible. Practically, users seeking to add
their own identifiers to join expressions are limited to extending
{{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
latitude within these constraints, as illustrated in
{{OverrideRecordReader}}, where values in child {{RecordReader}} s are
skipped instead of incurring the overhead of building the iterator (that
will inevitably be discarded).^2^ For most cases, the user need only
implement the combine and/or emit methods in their subclass. It is expected
that most will find that the three default operations will suffice.

Adding arguments to expressions is more difficult. One would need to include
a {{Node}} type for the parser, which requires some knowledge of its inner
workings. The model in this area is crude and requires refinement before it
can be "extensible" by a reasonable definition.

h3. Performance

I have no numbers.

Notes

1. This isn't strictly true. The "leftmost" source will never need to repeat
itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
a welcome addition.

2. Note that- even if reset- the override will only loop through the values
in the rightmost key, instead of repeating that series a number of times
equal to the cardinality of the cross product of the discarded streams
(regrettably, looking at the code of {{OverrideRecordReader}} is more
illustrative than this explanation).


  was:
h3. Motivation

Given a set of sorted datasets keyed with the same class and yielding equal
partitions, it is possible to effect a join of those datasets prior to the
map. This could save costs in re-partitioning, sorting, shuffling, and
writing out data required in the general case.

h3. Interface

The attached code offers the following interface to users of these classes.

|| property || required || value ||
| mapred.join.expr | yes | Join expression to effect over input data |
| mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
| mapred.join.define.<ident> | no | Class mapped to identifier in join expression |

The join expression understands the following grammar:
{noformat}
func ::= <ident>([<func>,]*<func>)
func ::= tbl(<class>,"<path>");
{noformat}

Operations included in this patch are partitioned into one of two types:
join operations emitting tuples and "multi-filter" operations emitting a
single value from (but not necessarily included in) a set of input values.
For a given key, each operation will consider the cross product of all
values for all sources at that node.

Identifiers supported by default:

|| identifier || type || description ||
| inner | Join | Full inner join |
| outer | Join | Full outer join |
| override | MultiFilter | For a given key, prefer values from the rightmost source |

A user of this class must set the {{InputFormat}} for the job to
{{CompositeInputFormat}} and define a join expression accepted by the preceding
grammar. For example, both of the following are acceptable:

{noformat}
inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/bar"),
      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/baz"))

outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                   "hdfs://host:8020/foo/bar"),
               tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
                   "hdfs://host:8020/foo/baz")),
      tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
          "hdfs://host:8020/foo/rab"))
{noformat}

{{CompositeInputFormat}} includes a handful of convenience methods to aid
construction of these verbose statements.

As in the second example, joins may be nested. Users may provide a
comparator class in the {{mapred.join.keycomparator}} property to
specify the ordering of their keys, or accept the default comparator as
returned by {{WritableComparator.get(keyclass)}}.

Users can specify their own join operations, typically by overriding
{{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
to an identifier in the join expression using the
{{mapred.join.define._ident_}} property, where _ident_ is the identifier
appearing in the join expression. Users may elect to emit- or modify- values
passing through their join operation. Consulting the existing operations for
guidance is recommended. Adding arguments is considerably more complex (and
only partially supported), as one must also add a {{Node}} type to the parse
tree. One is probably better off extending {{RecordReader}} in most cases.

h3. Design

As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
types for the join tree. Delegation satisfies most requirements of the
{{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
Most of the work in this patch concerns {{getRecordReader}}. The
{{CompositeInputFormat}} itself delegates to the parse tree generated by
{{Parser}}.

h4. Hierarchical Joins

Each {{RecordReader}} from the user must be "wrapped", since effecting a
join requires the framework to track the head value from each source. Since
the cross product of all values for each composite level of the join is
emitted to its parent, all sources^1^ must be capable of repeating the
values for the current key. To avoid keeping an excessive number of copies
(one per source per level), each composite requests its children to populate
a {{JoinCollector}} with an iterator over its values. This way, there is
only one copy of the current key for each composite node, the head key-value
pair for each leaf, and storage at each leaf for all the values matching the
current key at the parent collector (if it is currently participating in a
join at the root). Strategies have been employed to avoid excessive copying
when filling a user-provided {{Writable}}, but they have been conservative
(e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
the user modifies the value returned, possibly changing the state of a
{{JoinCollector}} in the tree). For example, if the following sources
contain these key streams:

{noformat}
A: 0  0   1    1     2        ...
B: 1  1   1    1     2        ...
C: 1  6   21   107   ...
D: 6  28  496  8128  33550336 ...
{noformat}

Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
the root is 1 the tree may look like this:

{noformat}

            x (1, [ I(A), [ I(y) ] ] )
          /   \
         W     y (1, [ I(B), I(C), EMPTY ])
         |   / | \
         |  W  W  W
         |  |  |  D (6, V~6~) => EMPTY
         |  |  C => (6, V~6~) => V~1.1~ @1.1
         |  B => (2, V~2~)    => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
         A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
{noformat}

A {{JoinCollector}} from _x_ will have been created by requesting an
iterator from _A_ and another from _y_. The iterator at _y_ is built by
requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
key 1, it returns an empty iterator. Since the value to return for a given
join is a {{Writable}} provided by the user, the iterators returned are also
responsible for writing the next value in that stream. For multilevel joins
passing through a subclass of {{JoinRecordReader}}, the value produced will
contain tuples within tuples; iterators for composites delegate to
sub-iterators responsible for filling the value in the tuple at the position
matching their position in the composite. In a sense, the only iterators
that write to a tuple are the {{RecordReader}}s at the leaves. Note that
this also implies that emitted tuples may not contain values from each
source, but they will always have the same capacity.

h4. Writables

{{Writable}} objects- including {{InputSplit}}s and {{TupleWritable}}s-
encode themselves in the following format:

{noformat}
<count><class1><class2>...<classn><obj1><obj2>...<objn>
{noformat}

The inefficiency is regrettable- particularly since this overhead is
incurred for every instance and most often the tuples emitted will be
processed only within the map- but the encoding satisfies the {{Writable}}
contract well enough to be emitted to the reducer, written to disk, etc. It
is hoped that general compression will trim the most egregious waste. It
should be noted that the framework does not actually write out a tuple (i.e.
does not suffer for this deficiency) unless emitting one from
{{MultiFilterRecordReader}} (a rare case in practice, it is hoped).

h4. Extensibility

The join framework is modestly extensible. Practically, users seeking to add
their own identifiers to join expressions are limited to extending
{{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
latitude within these constraints, as illustrated in
{{OverrideRecordReader}}, where values in child {{RecordReader}}s are
skipped instead of incurring the overhead of building the iterator (that
will inevitably be discarded).^2^ For most cases, the user need only
implement the combine and/or emit methods in their subclass. It is expected
that most will find that the three default operations will suffice.

Adding arguments to expressions is more difficult. One would need to include
a {{Node}} type for the parser, which requires some knowledge of its inner
workings. The model in this area is crude and requires refinement before it
can be "extensible" by a reasonable definition.

h3. Performance

I have no numbers.

Notes

1. This isn't strictly true. The "leftmost" source will never need to repeat
itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
a welcome addition.

2. Note that- even if reset- the override will only loop through the values
in the rightmost key, instead of repeating that series a number of times
equal to the cardinality of the cross product of the discarded streams
(regrettably, looking at the code of {{OverrideRecordReader}} is more
illustrative than this explanation).



> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085.patch
>

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12538122 ] 

Hadoop QA commented on HADOOP-2085:
-----------------------------------

-1 overall.  Here are the results of testing the latest attachment 
http://issues.apache.org/jira/secure/attachment/12368513/2085-2.patch
against trunk revision r588778.

    @author +1.  The patch does not contain any @author tags.

    javadoc +1.  The javadoc tool did not generate any warning messages.

    javac +1.  The applied patch does not generate any new compiler warnings.

    findbugs -1.  The patch appears to introduce 3 new Findbugs warnings.

    core tests -1.  The patch failed core unit tests.

    contrib tests +1.  The patch passed contrib unit tests.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1012/testReport/
Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1012/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1012/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1012/console

This message is automatically generated.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085.patch
>



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12549697 ] 

Hudson commented on HADOOP-2085:
--------------------------------

Integrated in Hadoop-Nightly #326 (See [http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Nightly/326/])

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12537868 ] 

Hadoop QA commented on HADOOP-2085:
-----------------------------------

-1 overall.  Here are the results of testing the latest attachment 
http://issues.apache.org/jira/secure/attachment/12368110/2085.patch
against trunk revision r588341.

    @author +1.  The patch does not contain any @author tags.

    javadoc -1.  The javadoc tool appears to have generated  messages.

    javac +1.  The applied patch does not generate any new compiler warnings.

    findbugs -1.  The patch appears to introduce 4 new Findbugs warnings.

    core tests +1.  The patch passed core unit tests.

    contrib tests -1.  The patch failed contrib unit tests.

Test results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1003/testReport/
Findbugs warnings: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1003/artifact/trunk/build/test/findbugs/newPatchFindbugsWarnings.html
Checkstyle results: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1003/artifact/trunk/build/test/checkstyle-errors.html
Console output: http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch/1003/console

This message is automatically generated.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
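
Expressions of the form quoted above can be assembled from small string helpers. The following is a minimal sketch using only the JDK; `JoinExpr`, `tbl`, and `op` are hypothetical names chosen for illustration and are not the convenience methods of {{CompositeInputFormat}}. It follows the `<class>.class` spelling used in the examples above, which is taken from this description rather than verified against the patch.

```java
import java.util.StringJoiner;

final class JoinExpr {
    // Hypothetical helper: renders a leaf as tbl(<class>,"<path>").
    static String tbl(String inputFormatClass, String path) {
        return "tbl(" + inputFormatClass + ".class,\"" + path + "\")";
    }

    // Hypothetical helper: renders <ident>(<func>,...,<func>); the grammar
    // requires at least one argument.
    static String op(String ident, String... funcs) {
        if (funcs.length == 0) {
            throw new IllegalArgumentException(ident + " needs at least one argument");
        }
        StringJoiner joined = new StringJoiner(",", ident + "(", ")");
        for (String f : funcs) {
            joined.add(f);
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        String sf = "org.apache.hadoop.mapred.SequenceFileInputFormat";
        // Nested join, mirroring the second example in the description.
        String expr = op("outer",
            op("override",
                tbl(sf, "hdfs://host:8020/foo/bar"),
                tbl(sf, "hdfs://host:8020/foo/baz")),
            tbl(sf, "hdfs://host:8020/foo/rab"));
        System.out.println(expr);
    }
}
```

The resulting string would then be stored under {{mapred.join.expr}} in the job configuration.
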
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
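
The per-key behaviour described above (align the head keys, buffer the run of values for the current key at each leaf, then emit the cross product) can be sketched with plain arrays. This is an illustrative, single-level inner join written against the JDK only; `MergeJoinSketch` and its methods are hypothetical and do not reproduce {{JoinCollector}} or the iterator delegation in the patch.

```java
import java.util.ArrayList;
import java.util.List;

final class MergeJoinSketch {
    // Inner join over N sources. keys[i] is sorted ascending and
    // vals[i][j] is the value paired with keys[i][j].
    static List<String> innerJoin(int[][] keys, String[][] vals) {
        int n = keys.length;
        List<String> out = new ArrayList<>();
        if (n == 0) {
            return out;
        }
        int[] pos = new int[n];
        while (true) {
            // Largest head key; an inner join ends once any source is exhausted.
            int max = Integer.MIN_VALUE;
            for (int i = 0; i < n; i++) {
                if (pos[i] >= keys[i].length) return out;
                max = Math.max(max, keys[i][pos[i]]);
            }
            // Skip keys that cannot match, then check whether all heads agree.
            boolean aligned = true;
            for (int i = 0; i < n; i++) {
                while (pos[i] < keys[i].length && keys[i][pos[i]] < max) pos[i]++;
                if (pos[i] >= keys[i].length) return out;
                if (keys[i][pos[i]] != max) aligned = false;
            }
            if (!aligned) continue;
            // Buffer the run of values for this key at each leaf.
            List<List<String>> runs = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                List<String> run = new ArrayList<>();
                while (pos[i] < keys[i].length && keys[i][pos[i]] == max) {
                    run.add(vals[i][pos[i]++]);
                }
                runs.add(run);
            }
            cross(max, runs, 0, new String[n], out);
        }
    }

    // Emits one tuple of fixed capacity per element of the cross product.
    private static void cross(int key, List<List<String>> runs, int depth,
                              String[] tuple, List<String> out) {
        if (depth == runs.size()) {
            out.add(key + ":" + String.join(",", tuple));
            return;
        }
        for (String v : runs.get(depth)) {
            tuple[depth] = v;
            cross(key, runs, depth + 1, tuple, out);
        }
    }
}
```

Only the leaves buffer values here; an outer join would differ by allowing an empty run at a leaf instead of skipping the key.
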
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
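
To make the cost of the `<count><class1>...<classn><obj1>...<objn>` layout concrete, here is a small round-trip sketch using `java.io.DataOutputStream`. It is an assumption-laden stand-in: `TupleCodecSketch` is a hypothetical name, only non-null `Integer` and `String` values are supported in place of real {{Writable}} objects, and the exact byte layout of {{TupleWritable}} may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class TupleCodecSketch {
    // Writes <count>, then every class name, then every value.
    static byte[] encode(List<Object> values) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(values.size());
            for (Object v : values) {
                out.writeUTF(v.getClass().getName());
            }
            for (Object v : values) {
                if (v instanceof Integer) out.writeInt((Integer) v);
                else if (v instanceof String) out.writeUTF((String) v);
                else throw new IllegalArgumentException(
                    "Unsupported type: " + v.getClass().getName());
            }
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to encode tuple", e);
        }
    }

    // Reads the header first so each value can be decoded by its class name.
    static List<Object> decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int count = in.readInt();
            String[] classes = new String[count];
            for (int i = 0; i < count; i++) {
                classes[i] = in.readUTF();
            }
            List<Object> values = new ArrayList<>(count);
            for (String c : classes) {
                if (c.equals(Integer.class.getName())) values.add(in.readInt());
                else if (c.equals(String.class.getName())) values.add(in.readUTF());
                else throw new IllegalArgumentException("Unsupported type: " + c);
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to decode tuple", e);
        }
    }
}
```

For small values the class names dominate the encoded size, which is the per-instance overhead the paragraph above regrets.
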
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Open  (was: Patch Available)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Patch Available  (was: Open)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085.patch
>
>


[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-2085:
----------------------------------

    Status: Open  (was: Patch Available)

The JavaDoc for TupleWritable's class description isn't right. (It wasn't updated.)

The IOExceptions that are wrapping other errors should have descriptive string messages in them.
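
As an illustration of the point about descriptive messages, a wrapped exception might carry both the context and the original cause. This is a sketch only; `WrapSketch` and `loadJoinClass` are hypothetical names and are not taken from the patch.

```java
import java.io.IOException;

final class WrapSketch {
    // Resolves the class mapped to a join identifier, reporting which
    // identifier and class name failed rather than rethrowing a bare cause.
    static Class<?> loadJoinClass(String ident, String className) throws IOException {
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IOException(
                "Cannot load class " + className + " for join identifier '" + ident + "'", e);
        }
    }
}
```
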

You don't need to define a new ReflectionUtils.newInstance without the config, because if you pass in null, it won't use it.

All of the instances that use cls.newInstance should be using ReflectionUtils.newInstance, since it caches constructors and handles the non-public class/constructor problems.
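
The two properties mentioned here, a constructor cache and support for non-public classes or constructors, can be sketched with the JDK alone. This is not the code of ReflectionUtils; `NewInstanceSketch` and `Hidden` are hypothetical names used to show the idea.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class NewInstanceSketch {
    // One no-argument constructor per class, looked up once and reused.
    private static final Map<Class<?>, Constructor<?>> CACHE = new ConcurrentHashMap<>();

    static <T> T newInstance(Class<T> cls) {
        try {
            Constructor<?> ctor = CACHE.get(cls);
            if (ctor == null) {
                ctor = cls.getDeclaredConstructor();
                // Permits private constructors and non-public classes.
                ctor.setAccessible(true);
                CACHE.put(cls, ctor);
            }
            return cls.cast(ctor.newInstance());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot instantiate " + cls.getName(), e);
        }
    }

    static int cachedConstructors() {
        return CACHE.size();
    }

    // Example target whose constructor is private.
    static final class Hidden {
        private Hidden() {}
    }
}
```

A plain `cls.newInstance()` performs the lookup on every call and is subject to the usual access checks on the class and its constructor, which is the problem the comment refers to.
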

The fields m and n need descriptive names.

I really don't like protected fields, especially when they are set/used multiple levels below where they are defined.


> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).
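
The join expressions in the quoted description are verbose to write by hand. As a minimal sketch of how such a string could be assembled to match the quoted grammar, the following plain-Java builder may help. The names JoinExprBuilder, tbl, and op are hypothetical; this is not the convenience API that CompositeInputFormat provides, only an illustration of the grammar.

```java
// Hypothetical string builder for join expressions; not the patch's API.
class JoinExprBuilder {
  // Leaf node of the grammar: tbl(<class>,"<path>")
  static String tbl(String inputFormatClass, String path) {
    return "tbl(" + inputFormatClass + ",\"" + path + "\")";
  }

  // Composite node: <ident>(<func>,...,<func>); needs at least one child.
  static String op(String ident, String... children) {
    if (children.length == 0) {
      throw new IllegalArgumentException(ident + " needs at least one source");
    }
    return ident + "(" + String.join(",", children) + ")";
  }

  public static void main(String[] args) {
    String sf = "org.apache.hadoop.mapred.SequenceFileInputFormat.class";
    // Nested join, mirroring the second example in the description.
    String expr = op("outer",
        op("override",
            tbl(sf, "hdfs://host:8020/foo/bar"),
            tbl(sf, "hdfs://host:8020/foo/baz")),
        tbl(sf, "hdfs://host:8020/foo/rab"));
    System.out.println(expr);
  }
}
```

The resulting string would be the value assigned to mapred.join.expr.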

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Milind Bhandarkar (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12542872 ] 

Milind Bhandarkar commented on HADOOP-2085:
-------------------------------------------

A few comments on the patch:

If mapred.join.expr is not specified, CompositeInputFormat should throw a more descriptive exception rather than an NPE.
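
One way to do this is to fail fast with a message naming the missing property. A minimal sketch, assuming a plain Map stands in for the job configuration; the class and method names (JoinExprCheck, requireJoinExpr) are hypothetical and not part of the patch:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the patch; a Map stands in for the job conf.
class JoinExprCheck {
  static final String JOIN_EXPR = "mapred.join.expr";

  // Returns the join expression, or throws a descriptive exception
  // instead of letting a null reach the parser.
  static String requireJoinExpr(Map<String, String> conf) {
    String expr = conf.get(JOIN_EXPR);
    if (expr == null || expr.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "Required property " + JOIN_EXPR + " is not set");
    }
    return expr;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    try {
      requireJoinExpr(conf);
    } catch (IllegalArgumentException e) {
      // The message names the missing property.
      System.out.println(e.getMessage());
    }
  }
}
```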

A simple benchmark that we can use to compare performance with reducer-side joins is desirable, but it can be a separate JIRA.

The motivation and design that is in this jira should be in package.html for o.a.h.mapred.join.


> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Status: Patch Available  (was: Open)

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085.patch
>
>



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12540319 ] 

Chris Douglas commented on HADOOP-2085:
---------------------------------------

I don't know if this is helpful, but: as it exists now, the framework is incapable of finer granularity than an InputFormat, but neither will it object to whatever you can fit into that framework.

What you describe- directories as pseudo-tables with files as partitions- sounds like exactly what this is geared toward.

As an example of a workaround/partial fit, consider your 16/32 way case. Whether it would be worthwhile/possible to express in the existing code will depend on a few factors: if the two files you're joining in the 32-way set are pairwise disjoint, then you can simply use an OverrideRecordReader with two custom InputFormats (each taking one "half" of the pair) to "join" them. However, if they're not disjoint, then you'll lose values. ^1^ Feeding the output of that into a join with your 16 way dataset might work, but it's a bit of a hack. You'd need to be certain of the partitions of both datasets to be confident in your results.



Notes
1. Really, you're looking for a different implementation of CompositeRecordReader::JoinCollector that emits values from each source in turn, rather than emitting the cross-product; this is being considered, but may not happen in the immediate future. It's of limited use with the requirement that each source be sorted and partitioned in the same way, unfortunately. Most users simply want to merge two sorted datasets without worrying about how they're partitioned (HADOOP-2120).
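
To illustrate the distinction drawn in note 1, here is a minimal sketch contrasting the two emission strategies for a single key. Plain lists stand in for the values each source holds for that key; the class and method names (MergeInTurn, crossProduct, inTurn) are hypothetical and this is not the patch's JoinCollector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; lists stand in for per-source values of one key.
class MergeInTurn {
  // Cross product: one tuple per combination of values, one value per source.
  // If any source has no values for the key, the result is empty.
  static List<List<String>> crossProduct(List<List<String>> sources) {
    List<List<String>> out = new ArrayList<>();
    out.add(new ArrayList<>());
    for (List<String> src : sources) {
      List<List<String>> next = new ArrayList<>();
      for (List<String> prefix : out) {
        for (String v : src) {
          List<String> tuple = new ArrayList<>(prefix);
          tuple.add(v);
          next.add(tuple);
        }
      }
      out = next;
    }
    return out;
  }

  // In turn: every value is emitted exactly once, source by source.
  static List<String> inTurn(List<List<String>> sources) {
    List<String> out = new ArrayList<>();
    for (List<String> src : sources) {
      out.addAll(src);
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> sources = Arrays.asList(
        Arrays.asList("a1", "a2"),
        Arrays.asList("b1", "b2", "b3"));
    System.out.println(crossProduct(sources).size()); // 2 * 3 = 6 tuples
    System.out.println(inTurn(sources).size());       // 2 + 3 = 5 values
  }
}
```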

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
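The head-tracking and cross-product behaviour described above can be sketched for a single, non-nested join level. This is a simplified model under stated assumptions: sources are in-memory arrays sorted by an int key, values are strings, and all names (SortedJoinSketch, Source, take, join) are hypothetical. It does not reproduce the patch's JoinCollector or its iterator delegation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical one-level model of a join over sorted sources; not the patch's code. */
final class SortedJoinSketch {
    /** A sorted stream of (key, value) pairs with a cursor at its head. */
    static final class Source {
        final int[] keys;
        final String[] vals;
        int head = 0;

        Source(int[] keys, String[] vals) { this.keys = keys; this.vals = vals; }

        boolean hasNext() { return head < keys.length; }

        /** Collect every value whose key equals k, advancing the head past them. */
        List<String> take(int k) {
            List<String> out = new ArrayList<>();
            while (hasNext() && keys[head] == k) out.add(vals[head++]);
            return out;
        }
    }

    /** inner == true drops keys missing from any source; otherwise missing slots stay null. */
    static List<String> join(boolean inner, Source... srcs) {
        List<String> out = new ArrayList<>();
        while (true) {
            Integer min = null;  // smallest key among the current heads
            for (Source s : srcs) {
                if (s.hasNext() && (min == null || s.keys[s.head] < min)) min = s.keys[s.head];
            }
            if (min == null) return out;  // every source is exhausted
            List<List<String>> perSource = new ArrayList<>();
            boolean allPresent = true;
            for (Source s : srcs) {
                List<String> v = s.take(min);
                if (v.isEmpty()) { allPresent = false; v = Collections.singletonList(null); }
                perSource.add(v);
            }
            if (inner && !allPresent) continue;
            cross(min, perSource, 0, new String[srcs.length], out);
        }
    }

    /** Emit the cross product of the per-source value lists as fixed-capacity tuples. */
    static void cross(int key, List<List<String>> vals, int i, String[] tuple, List<String> out) {
        if (i == vals.size()) { out.add(key + "=" + Arrays.toString(tuple)); return; }
        for (String v : vals.get(i)) { tuple[i] = v; cross(key, vals, i + 1, tuple, out); }
    }
}
```

With the key streams of A and B from the example, an inner join yields 2 x 4 tuples for key 1 and one for key 2, while the outer variant also emits A's two values for key 0 with an empty slot for B. Every tuple has one slot per source, matching the remark that capacity is constant even when a source has no value.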
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
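To make the layout above concrete, here is a small sketch that writes a count, then one class name per element, then the elements themselves, using java.io data streams. It assumes only Integer and String elements and uses hypothetical names (TupleEncodingSketch, encode, decode); the patch's actual serialization goes through Hadoop's Writable interface and may differ in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical model of <count><class1>...<classn><obj1>...<objn>; not the patch's code. */
final class TupleEncodingSketch {
    private TupleEncodingSketch() {}

    static byte[] encode(Object... values) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeInt(values.length);                                   // <count>
            for (Object v : values) out.writeUTF(v.getClass().getName());  // <class1>..<classn>
            for (Object v : values) {                                      // <obj1>..<objn>
                if (v instanceof Integer) out.writeInt((Integer) v);
                else if (v instanceof String) out.writeUTF((String) v);
                else throw new IllegalArgumentException("unsupported type: " + v.getClass());
            }
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object[] decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            int n = in.readInt();
            String[] classes = new String[n];
            for (int i = 0; i < n; i++) classes[i] = in.readUTF();
            Object[] values = new Object[n];
            for (int i = 0; i < n; i++) {
                if (classes[i].equals(Integer.class.getName())) values[i] = in.readInt();
                else if (classes[i].equals(String.class.getName())) values[i] = in.readUTF();
                else throw new IllegalArgumentException("unsupported type: " + classes[i]);
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For small elements the class names account for most of the encoded bytes, which is the per-instance overhead the paragraph above regrets.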
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. Most users are
> expected to find the three default operations sufficient.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Owen O'Malley (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Owen O'Malley updated HADOOP-2085:
----------------------------------

    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

I just committed this. Thanks, Chris.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085-5.patch, 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Attachment: 2085-2.patch

Fixed findbugs warnings, addressed javadoc, changed Token type to accommodate Nicholas's feedback.

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085.patch
>
>



[jira] Updated: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Chris Douglas (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Douglas updated HADOOP-2085:
----------------------------------

    Attachment: 2085-4.patch

Updated javadocs, made better use of ReflectionUtils, improved some variable names, made protected fields private or final (excluding the parser, which is temporary).

> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085-4.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).
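
The per-key cross product described under "Hierarchical Joins" above can be sketched for the two-source case. This is a minimal illustration in plain Java, not the patch's {{JoinRecordReader}}: the class {{SortedInnerJoinSketch}}, its {{Rec}} type, and the string output format are all hypothetical, and keys are assumed to be ints already sorted in each source. It reuses the A and B key streams from the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these names come from the patch. */
final class SortedInnerJoinSketch {

  /** One key-value record from a key-sorted source. */
  static final class Rec {
    final int key;
    final String value;

    Rec(int key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Builds a source from sorted keys; the n-th record gets the value name + n. */
  static List<Rec> source(String name, int... keys) {
    List<Rec> recs = new ArrayList<>();
    for (int i = 0; i < keys.length; i++) {
      recs.add(new Rec(keys[i], name + i));
    }
    return recs;
  }

  /** Index one past the run of records sharing the key found at position from. */
  static int endOfRun(List<Rec> src, int from) {
    int i = from;
    while (i < src.size() && src.get(i).key == src.get(from).key) {
      i++;
    }
    return i;
  }

  /** Inner join of two key-sorted sources, emitting "key:[left,right]" per pair. */
  static List<String> innerJoin(List<Rec> left, List<Rec> right) {
    List<String> out = new ArrayList<>();
    int l = 0;
    int r = 0;
    while (l < left.size() && r < right.size()) {
      int lk = left.get(l).key;
      int rk = right.get(r).key;
      if (lk < rk) {
        l = endOfRun(left, l);   // key missing on the right: skip the left run
      } else if (lk > rk) {
        r = endOfRun(right, r);  // key missing on the left: skip the right run
      } else {
        int lEnd = endOfRun(left, l);
        int rEnd = endOfRun(right, r);
        for (int i = l; i < lEnd; i++) {
          for (int j = r; j < rEnd; j++) {  // the right run is replayed per left value
            out.add(lk + ":[" + left.get(i).value + "," + right.get(j).value + "]");
          }
        }
        l = lEnd;
        r = rEnd;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Rec> a = source("a", 0, 0, 1, 1, 2);
    List<Rec> b = source("b", 1, 1, 1, 1, 2);
    innerJoin(a, b).forEach(System.out::println);
  }
}
```

In this sketch only the right-hand run is replayed, which matches the observation in note 1 that the leftmost source never needs to repeat itself.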

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Joydeep Sen Sarma (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12540317 ] 

Joydeep Sen Sarma commented on HADOOP-2085:
-------------------------------------------

Understood. I was thinking this might be using MapFiles or some kind of binary search to line up splits.

Dumb question: our data is laid out as files (representing partitions) within a single directory, with each directory representing a pseudo-table. Is this compatible with where you are going? That is, can I join one such directory against another with (say) an InputFormat that emits each file as a split (making sure the order is the same)?

The other case is that sometimes one dataset is partitioned (say) 16 ways, but another is partitioned 32 ways. This can happen when datasets are of unequal size (otherwise we end up creating too many files). In that case, 2 files from the latter dataset have to be joined against each file from the former (assuming simple modulo arithmetic partitioning). Would this be possible?
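
To make the 16-way versus 32-way case concrete, here is a small sketch of the index arithmetic only. It assumes simple modulo partitioning on an integer key hash and that the finer partition count is a multiple of the coarser one; the class and method names are hypothetical, and it says nothing about whether the patch can consume such a layout.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of modulo partition alignment; not part of the patch. */
final class PartitionAlignmentSketch {

  /** Partition index under modulo partitioning; floorMod keeps negative hashes in range. */
  static int partition(int keyHash, int numPartitions) {
    return Math.floorMod(keyHash, numPartitions);
  }

  /**
   * Fine partitions whose keys all land in the given coarse partition.
   * Requires that fine is a multiple of coarse.
   */
  static List<Integer> finePartitionsFor(int coarseIndex, int coarse, int fine) {
    if (fine % coarse != 0) {
      throw new IllegalArgumentException("fine must be a multiple of coarse");
    }
    List<Integer> out = new ArrayList<>();
    for (int p = coarseIndex; p < fine; p += coarse) {
      out.add(p);
    }
    return out;
  }

  public static void main(String[] args) {
    // Files 3 and 19 of the 32-way dataset line up with file 3 of the 16-way dataset.
    System.out.println(finePartitionsFor(3, 16, 32));
  }
}
```

Each of the two finer files is sorted on its own, so a reader joining them against the coarser file would still have to merge them into a single sorted stream first.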



> Map-side joins on sorted, equally-partitioned datasets
> ------------------------------------------------------
>
>                 Key: HADOOP-2085
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2085
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Chris Douglas
>            Assignee: Chris Douglas
>             Fix For: 0.16.0
>
>         Attachments: 2085-2.patch, 2085-3.patch, 2085.patch
>
>
> h3. Motivation
> Given a set of sorted datasets keyed with the same class and yielding equal
> partitions, it is possible to effect a join of those datasets prior to the
> map. This could save costs in re-partitioning, sorting, shuffling, and
> writing out data required in the general case.
> h3. Interface
> The attached code offers the following interface to users of these classes.
> || property || required || value ||
> | mapred.join.expr | yes | Join expression to effect over input data |
> | mapred.join.keycomparator | no | {{WritableComparator}} class to use for comparing keys |
> | mapred.join.define.<ident> | no | Class mapped to identifier in join expression |
> The join expression understands the following grammar:
> {noformat}
> func ::= <ident>([<func>,]*<func>)
> func ::= tbl(<class>,"<path>");
> {noformat}
> Operations included in this patch are partitioned into one of two types:
> join operations emitting tuples and "multi-filter" operations emitting a
> single value from (but not necessarily included in) a set of input values.
> For a given key, each operation will consider the cross product of all
> values for all sources at that node.
> Identifiers supported by default:
> || identifier || type || description ||
> | inner | Join | Full inner join |
> | outer | Join | Full outer join |
> | override | MultiFilter | For a given key, prefer values from the rightmost source |
> A user of this class must set the {{InputFormat}} for the job to
> {{CompositeInputFormat}} and define a join expression accepted by the preceding
> grammar. For example, both of the following are acceptable:
> {noformat}
> inner(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/bar"),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/baz"))
> outer(override(tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/bar"),
>                tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>                    "hdfs://host:8020/foo/baz")),
>       tbl(org.apache.hadoop.mapred.SequenceFileInputFormat.class,
>           "hdfs://host:8020/foo/rab"))
> {noformat}
> {{CompositeInputFormat}} includes a handful of convenience methods to aid
> construction of these verbose statements.
> As in the second example, joins may be nested. Users may provide a
> comparator class in the {{mapred.join.keycomparator}} property to
> specify the ordering of their keys, or accept the default comparator as
> returned by {{WritableComparator.get(keyclass)}}.
> Users can specify their own join operations, typically by overriding
> {{JoinRecordReader}} or {{MultiFilterRecordReader}} and mapping that class
> to an identifier in the join expression using the
> {{mapred.join.define._ident_}} property, where _ident_ is the identifier
> appearing in the join expression. Users may elect to emit- or modify- values
> passing through their join operation. Consulting the existing operations for
> guidance is recommended. Adding arguments is considerably more complex (and
> only partially supported), as one must also add a {{Node}} type to the parse
> tree. One is probably better off extending {{RecordReader}} in most cases.
> h3. Design
> As alluded to above, the design defines inner (Composite) and leaf (Wrapped)
> types for the join tree. Delegation satisfies most requirements of the
> {{InputFormat}} contract, particularly {{validateInput}} and {{getSplits}}.
> Most of the work in this patch concerns {{getRecordReader}}. The
> {{CompositeInputFormat}} itself delegates to the parse tree generated by
> {{Parser}}.
> h4. Hierarchical Joins
> Each {{RecordReader}} from the user must be "wrapped", since effecting a
> join requires the framework to track the head value from each source. Since
> the cross product of all values for each composite level of the join is
> emitted to its parent, all sources ^1^ must be capable of repeating the
> values for the current key. To avoid keeping an excessive number of copies
> (one per source per level), each composite requests its children to populate
> a {{JoinCollector}} with an iterator over its values. This way, there is
> only one copy of the current key for each composite node, the head key-value
> pair for each leaf, and storage at each leaf for all the values matching the
> current key at the parent collector (if it is currently participating in a
> join at the root). Strategies have been employed to avoid excessive copying
> when filling a user-provided {{Writable}}, but they have been conservative
> (e.g. in {{MultiFilterRecordReader}}, the value emitted is cloned in case
> the user modifies the value returned, possibly changing the state of a
> {{JoinCollector}} in the tree). For example, if the following sources
> contain these key streams:
> {noformat}
> A: 0  0   1    1     2        ...
> B: 1  1   1    1     2        ...
> C: 1  6   21   107   ...
> D: 6  28  496  8128  33550336 ...
> {noformat}
> Let _A-D_ be wrapped sources and _x,y_ be composite operations. If the
> expression is of the form {{x(A, y(B,C,D))}}, then when the current key at
> the root is 1 the tree may look like this:
> {noformat}
>             x (1, [ I(A), [ I(y) ] ] )
>           /   \
>          W     y (1, [ I(B), I(C), EMPTY ])
>          |   / | \
>          |  W  W  W
>          |  |  |  D (6, V~6~) => EMPTY
>          |  |  C (6, V~6~)    => V~1,1~ @1,1
>          |  B (2, V~2~)       => V~1,1~ V~1,2~ V~1,3~ V~1,4~ @1,3
>          A (2, V~2~)          => V~1,1~ V~1,2~ @1,2
> {noformat}
> A {{JoinCollector}} from _x_ will have been created by requesting an
> iterator from _A_ and another from _y_. The iterator at _y_ is built by
> requesting iterators from _B_, _C_, and _D_. Since _D_ doesn't contain the
> key 1, it returns an empty iterator. Since the value to return for a given
> join is a {{Writable}} provided by the user, the iterators returned are also
> responsible for writing the next value in that stream. For multilevel joins
> passing through a subclass of {{JoinRecordReader}}, the value produced will
> contain tuples within tuples; iterators for composites delegate to
> sub-iterators responsible for filling the value in the tuple at the position
> matching their position in the composite. In a sense, the only iterators
> that write to a tuple are the {{RecordReader}} s at the leaves. Note that
> this also implies that emitted tuples may not contain values from each
> source, but they will always have the same capacity.
> h4. Writables
> {{Writable}} objects- including {{InputSplit}} s and {{TupleWritable}} s-
> encode themselves in the following format:
> {noformat}
> <count><class1><class2>...<classn><obj1><obj2>...<objn>
> {noformat}
> The inefficiency is regrettable- particularly since this overhead is
> incurred for every instance and most often the tuples emitted will be
> processed only within the map- but the encoding satisfies the {{Writable}}
> contract well enough to be emitted to the reducer, written to disk, etc. It
> is hoped that general compression will trim the most egregious waste. It
> should be noted that the framework does not actually write out a tuple (i.e.
> does not suffer for this deficiency) unless emitting one from
> {{MultiFilterRecordReader}} (a rare case in practice, it is hoped).
> h4. Extensibility
> The join framework is modestly extensible. Practically, users seeking to add
> their own identifiers to join expressions are limited to extending
> {{JoinRecordReader}} and {{MultiFilterRecordReader}}. There is considerable
> latitude within these constraints, as illustrated in
> {{OverrideRecordReader}}, where values in child {{RecordReader}} s are
> skipped instead of incurring the overhead of building the iterator (that
> will inevitably be discarded).^2^ For most cases, the user need only
> implement the combine and/or emit methods in their subclass. It is expected
> that most will find that the three default operations will suffice.
> Adding arguments to expressions is more difficult. One would need to include
> a {{Node}} type for the parser, which requires some knowledge of its inner
> workings. The model in this area is crude and requires refinement before it
> can be "extensible" by a reasonable definition.
> h3. Performance
> I have no numbers.
> Notes
> 1. This isn't strictly true. The "leftmost" source will never need to repeat
> itself. Adding a pseudo-{{ResettableIterator}} to handle this case would be
> a welcome addition.
> 2. Note that- even if reset- the override will only loop through the values
> in the rightmost key, instead of repeating that series a number of times
> equal to the cardinality of the cross product of the discarded streams
> (regrettably, looking at the code of {{OverrideRecordReader}} is more
> illustrative than this explanation).
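
The {{<count><class1>...<classn><obj1>...<objn>}} layout from the "Writables" section above can be illustrated with plain {{java.io}} streams. This is a sketch under stated assumptions, not the patch's {{TupleWritable}}: the 4-byte count, the use of {{writeUTF}} for class names, and the restriction to {{Integer}} and {{String}} payloads are all choices made here for brevity, and {{TupleEncodingSketch}} is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of a count/classes/objects layout; not TupleWritable. */
final class TupleEncodingSketch {

  /** Writes the count, then every class name, then every value (Integer or String only). */
  static byte[] encode(Object... values) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.length);
      for (Object v : values) {
        out.writeUTF(v.getClass().getName());
      }
      for (Object v : values) {
        if (v instanceof Integer) {
          out.writeInt((Integer) v);
        } else if (v instanceof String) {
          out.writeUTF((String) v);
        } else {
          throw new IllegalArgumentException("unsupported type: " + v.getClass());
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the layout back, using the class names to decide how to read each value. */
  static Object[] decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int count = in.readInt();
      String[] classes = new String[count];
      for (int i = 0; i < count; i++) {
        classes[i] = in.readUTF();
      }
      Object[] values = new Object[count];
      for (int i = 0; i < count; i++) {
        if (classes[i].equals(Integer.class.getName())) {
          values[i] = in.readInt();
        } else if (classes[i].equals(String.class.getName())) {
          values[i] = in.readUTF();
        } else {
          throw new IllegalArgumentException("unsupported type: " + classes[i]);
        }
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encode(7, "v");
    // The class names dominate the encoded size, which is the overhead the text regrets.
    System.out.println(encoded.length + " bytes for a two-field tuple");
  }
}
```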



[jira] Commented: (HADOOP-2085) Map-side joins on sorted, equally-partitioned datasets

Posted by "Joydeep Sen Sarma (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12540295 ] 

Joydeep Sen Sarma commented on HADOOP-2085:
-------------------------------------------

Chris - can you help me understand how the splits work? This might be useful for some of our apps, and I am trying to understand what assumptions are being made.

We have sorted data files containing the same sets of keys, but corresponding HDFS chunks of each file may not have the same set of keys. It wasn't clear to me from going through the patch how the merge-join is being parallelized. From Node.getSplits, it seemed as if the ith split of the join record reader is composed of the ith split of each of the component files. But in this case the join keys wouldn't line up.
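
The reading of the split logic described above (the ith composite split is built from the ith split of every source) can be sketched as follows. This is only an illustration of that reading, not the patch's code: splits are stood in for by path strings, and {{CompositeSplitSketch}} and {{compose}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of composing the i-th split of every source. */
final class CompositeSplitSketch {

  /** Zips per-source split lists; fails if the sources disagree on the split count. */
  static List<List<String>> compose(List<List<String>> splitsPerSource) {
    List<List<String>> composite = new ArrayList<>();
    if (splitsPerSource.isEmpty()) {
      return composite;
    }
    int n = splitsPerSource.get(0).size();
    for (List<String> splits : splitsPerSource) {
      if (splits.size() != n) {
        throw new IllegalStateException("sources yield different numbers of splits");
      }
    }
    for (int i = 0; i < n; i++) {
      List<String> parts = new ArrayList<>();
      for (List<String> splits : splitsPerSource) {
        parts.add(splits.get(i));  // i-th split of each source joins the i-th composite
      }
      composite.add(parts);
    }
    return composite;
  }

  public static void main(String[] args) {
    List<List<String>> composite = compose(Arrays.asList(
        Arrays.asList("/foo/bar/part-0", "/foo/bar/part-1"),
        Arrays.asList("/foo/baz/part-0", "/foo/baz/part-1")));
    System.out.println(composite);
  }
}
```

Under this reading, keys line up only if the ith split of every source covers the same key range, which is why splits that follow HDFS chunk boundaries rather than partition boundaries would not be safe to zip this way.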

Also, given that the map task works on multiple HDFS files, where does it get scheduled?

