Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2009/02/07 05:23:04 UTC

[Pig Wiki] Update of "PigMultiQueryPerformanceSpecification" by GuntherHagleitner

The following page has been changed by GuntherHagleitner:
http://wiki.apache.org/pig/PigMultiQueryPerformanceSpecification

New page:
[[Anchor(Multi-query_Performance)]]
= Multi-query Performance =

Currently, scripts with multiple store commands can result in a lot of duplicated work. The idea for avoiding this duplication is described in PIG-627: https://issues.apache.org/jira/browse/PIG-627

[[Anchor(External)]]
== External ==

[[Anchor(Use_cases:)]]
=== Use cases: ===

[[Anchor(Explicit/implicit_split:)]]
==== Explicit/implicit split: ====

There might be cases in which you want to do different processing on separate parts of the same data stream, like so:

{{{
A = load ...
...
split A' into B if ..., C if ...
...
store B' ...
store C' ...
}}}

or

{{{
A = load ...
...
B = filter A' ...
C = filter A' ...
...
store B' ...
store C' ...
}}}

In the current system, the first example dumps A' to disk and then starts jobs for B' and C'. In the second example, Pig executes all the dependencies of B' and stores it, and then executes all the dependencies of C' and stores it, so the work leading up to A' is done twice.

Both scripts produce the same results, but their performance differs.

Here's what we plan to do to increase the performance:

   * In the second case we will add an implicit split to transform the query to case number one. That will eliminate the processing of A' multiple times.
   * Make the split non-blocking and allow processing to continue. This will help reduce the amount of data that has to be stored right at the split.
   * Allow multiple outputs from a job. This way we can store some results as a side-effect. This is also necessary to make the previous item work.
   * Allow multiple split branches to be carried on to the combiner/reducer. This will reduce the amount of IO again in the case where multiple branches in the split can benefit from a combiner run.

[[Anchor(Storing_intermediate_results)]]
==== Storing intermediate results ====

Sometimes people will store intermediate results.

{{{
A = load ...
...
store A'
...
store A''
}}}

If the script doesn't re-load A' for the processing of A'', the steps above A' will be duplicated. This is basically a special case of the second example above, so the same steps are recommended. With the proposed changes the script will process A'' and dump A' as a side-effect, which is probably what the user wanted to begin with.

[[Anchor(Why?)]]
=== Why? ===

Pig's philosophy is: Optimize it yourself, why don't you.

However:

   * Implicit splits: It's probably what you expect when you use the same handle in different stores.
   * Store/Load vs Split: When optimizing, it's a reasonable assumption that splits are faster than load/store combinations.
   * Side-effects: There is no way right now to store a result as a side-effect of another job.

[[Anchor(Changes)]]
=== Changes ===

[[Anchor(Execution_in_batch_mode)]]
==== Execution in batch mode ====

Batch mode is entered when Pig is given a script to execute. Interactive mode is what you get on the grunt shell ("grunt>"). Right now there isn't much difference between them. In order to optimize the multi-query case, we'll need to distinguish the two more clearly.

Right now, whenever the parser sees a store (or dump, explain, illustrate or describe) it will kick off the execution of that part of the script. Part of this proposal is that in batch mode we parse the entire script first and see if we can combine things to reduce the overall amount of work that needs to be done. Only after that will the execution start.
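To make the difference concrete, here is a minimal sketch that counts operator evaluations under the two strategies. It is not Pig code: the `parents` map, the alias names (with `A1` standing in for A') and the idea of counting one unit of work per operator are all assumptions made for illustration.

```python
def ancestors(alias, parents):
    """Return the alias plus all of its ancestors, following parent links."""
    chain = []
    while alias is not None:
        chain.append(alias)
        alias = parents[alias]
    return chain


def work_per_store(stores, parents):
    """Store-by-store execution: every store re-runs its whole dependency chain."""
    return sum(len(ancestors(s, parents)) for s in stores)


def work_batched(stores, parents):
    """Batch execution: the whole script is seen first, so each operator runs once."""
    needed = set()
    for s in stores:
        needed.update(ancestors(s, parents))
    return len(needed)


# Hypothetical script: A = load; A1 derived from A; B and C both derived from A1.
parents = {"A": None, "A1": "A", "B": "A1", "C": "A1"}
stores = ["B", "C"]

print(work_per_store(stores, parents))  # 6: A and A1 are evaluated twice
print(work_batched(stores, parents))    # 4: A and A1 are shared
```

Operator counts are only a crude proxy for real cost, but they show where the duplicated work comes from.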

The following changes are proposed (in batch):

   * Store will not trigger an immediate execution. The entire script is considered before the execution starts.
   * Splits will be inserted implicitly wherever a handle has multiple children. If the user wants to explicitly force re-computation of common ancestors she has to provide multiple scripts.
   * Multiple split branches/stores in the script will be combined into the same job, if possible. Again, using multiple scripts is the way to go to avoid this (if that is desired).

For diagnostic operators there are some problems with this:

   * They work on handles, which only gives you a slice of the entire script execution at a time. What's more, at the point where they occur in a script they might not give you an accurate picture of the situation, since the execution plans might change once the entire script is handled.
   * They change the logical tree. This means that we need to clone the tree before we run them - something that we want to avoid in batch execution.

The proposal therefore is:

   * Have Pig in batch mode ignore explain, dump, illustrate and describe.
   * Add a load command to the shell to execute a script in interactive mode.
   * Add scripts as a target (in addition to handles) to some diagnostic operators.
   * Add dot as an output type to explain (a graphical explanation of the graph will make multi-query explains more understandable.)

That means that while someone is developing a Pig script they can put any diagnostic operator into the script and then go to the grunt shell and load the script. The diagnostic statements will be executed and give information about that part of the script. When a script is loaded, the user will also be able to refer to any handles defined in the script on the shell.

Finally, when the script is ready the user can run the same script in batch and all the diagnostic operators are ignored.

[[Anchor(Load)]]
==== Load ====

(See https://issues.apache.org/jira/browse/PIG-574 - this is basically the same as requested there)

The new command has the format:

{{{
load <script name>
}}}

This will run the script in interactive mode.

[[Anchor(Explain)]]
==== Explain ====

Changes to the command:

{{{
explain <script>||<handle> [using text||dot] [into <path>]
}}}

Behavior:

   * Explain is not executed in batch mode.
   * If explain is given a script, it will output the entire execution graph (logical, physical, MR + moving result files)

Text/Dot:

   * Text will give what we have today, dot will output a format that can be passed to dot for graphical display.
   * In Text mode, multiple outputs (split) will be broken out in sections.
   * Default (no using clause): Text

Path:

   * Will generate logical.[txt||dot], physical.[txt||dot], mapred.[txt||dot] in the specified directory.
   * Default (no path given): Stdout
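As a rough illustration of what the dot output type could look like, the sketch below turns a list of plan edges into Graphviz DOT text. The function name `plan_to_dot`, the edge-list representation and the operator labels are assumptions for this example; the actual layout of Pig's explain output is not specified here.

```python
def plan_to_dot(edges, name="plan"):
    """Render (source, destination) operator pairs as a Graphviz digraph."""
    lines = [f"digraph {name} {{"]
    for src, dst in edges:
        # Quote the labels so that spaces in operator names are accepted by dot.
        lines.append(f'  "{src}" -> "{dst}";')
    lines.append("}")
    return "\n".join(lines)


# Hypothetical plan for a script with one split and two stores.
edges = [
    ("load A", "split"),
    ("split", "store B"),
    ("split", "store C"),
]
dot_text = plan_to_dot(edges)
print(dot_text)
```

The resulting text can be saved to a file and passed to the dot tool for rendering.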

[[Anchor(Illustrate)]]
==== Illustrate ====

Changes to the command:

{{{
illustrate <script>||<handle> [into <file>]
}}}

Behavior:

   * Illustrate is not executed in batch mode.
   * If illustrate is given a script, it will output the entire execution graph (logical, physical, MR + moving result files)

File:

   * Will write the illustrate output into the specified file.
   * Default: Stdout

[[Anchor(Phases)]]
== Phases ==

These are the identified steps to get the proposal implemented.

[[Anchor(Phase_1)]]
=== Phase 1 ===
Phase one is about getting some infrastructural things in place. 

   * Batch execution (instead of single store execution)
   * Merge logical plans into a single plan
   * Updated explain/illustrate
   * Change local and hadoop engine to execute multiple store plans (with store-load per split).

At the end of phase one we'll have implicit splits, but the MR plan will be the same as if the user had made the splits explicit.

[[Anchor(Phase_2)]]
=== Phase 2 ===
Phase two is about getting the MR Compiler to agree to this.

   * Allow multiple stores in single job
   * Merge multiple plans into the split operator
   * Terminate all but one with stores

[[Anchor(Phase_3)]]
=== Phase 3 ===
Phase three is about using the combiner/reducer on multiple split branches in the same job

   * Merge Combiner/Reducer plans
   * Put in logic to decide when to multiplex pipelines

[[Anchor(Internal)]]
== Internal Changes ==

[[Anchor(Grunt_parser_(Phase_1))]]
==== Grunt parser (Phase 1) ====
The parser currently uses a bottom-up approach. When it sees a store (dump, explain), it goes bottom up and generates the plan that needs to happen for this particular store. In order to optimize the multi-query case, however, we need a view of the entire graph for a script (interactive mode can be handled differently).

In order to do this we will change the batch mode of the parser to:

   * Not execute the plan when we see a store (or dump, illustrate, describe, explain - which will be ignored)
   * Alter the already existing merge functionality to allow intersecting graphs to be joined into a single logical plan.
   * Wait until the entire script is parsed and merged before sending the plan on to do validation, optimization, etc.

The new "load" command will simply feed all the lines of a script through the interactive mode.

[[Anchor(Explain,_Dump,_and_Illustrate_(Phase_1))]]
==== Explain, Dump, Describe and Illustrate (Phase 1) ====

As described above the changes are:

   * Ignore these operations in batch mode
   * Add options to explain and illustrate to work on a script file as well as a handle.
   * Add the ability to print plans as dot files and to write explain and illustrate output to files.

There will be some work to nicely represent the graphs resulting from explain in text form. Right now operators with multiple outputs will result in the ancestor tree being duplicated for each output. It might be nicer to show the ancestors once and mark the other places as copies of that one.
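One possible way to show shared ancestors only once is sketched below. The `render` helper, the `parents` map and the "(see above)" marker are hypothetical and only illustrate the idea; they are not the existing explain printer.

```python
def render(node, parents, seen, depth=0, lines=None):
    """Print a node and its ancestors, expanding each shared ancestor only once."""
    if lines is None:
        lines = []
    indent = "  " * depth
    if node in seen:
        # Already expanded under an earlier output: just point back to it.
        lines.append(f"{indent}{node} (see above)")
        return lines
    seen.add(node)
    lines.append(f"{indent}{node}")
    for parent in parents.get(node, []):
        render(parent, parents, seen, depth + 1, lines)
    return lines


# Hypothetical plan: two stores whose branches share the ancestor A1.
parents = {
    "store B": ["B"],
    "store C": ["C"],
    "B": ["A1"],
    "C": ["A1"],
    "A1": ["A"],
}
seen = set()
text = []
for output in ["store B", "store C"]:
    render(output, parents, seen, 0, text)
print("\n".join(text))
```

Here the chain A1, A is printed in full under the first store and only referenced under the second.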

[[Anchor(Local_Execution_engine_(Phase_1))]]
==== Local Execution engine (Phase 1) ====
We need to make the local execution engine understand multiple store plans too. This might come for free or at least cheaply. The current local engine uses a call to store() on each physical store node to trigger the execution of the pipeline and write out the result. Split is realized as a blocking operator that will process the entire input and hand out an iterator to the tuples.

This will give the right result, but re-processes all the dependencies once per store. In a later phase we might want to align this better with the hadoop engine and allow a non-blocking split, as well as separating the storing of records from pulling them through the pipeline.

[[Anchor(Implicit_split_insertion_(Phase_1))]]
==== Implicit split insertion (Phase 1) ====
Implicit split insertion already exists as part of the optimizer. It translates any non-split logical node with multiple outputs into a split - split output combination. This is what we need to put the splits for the multi-query optimization in place. Right now, however, the parser is set up in a way that multiple stores will never end up in the same plan and thus the insertion doesn't happen for this case. 

In short: Once we change the parser to look at the entire graph for a script instead of working on a store-by-store basis, we will get this for free. We might actually have to add logic to suppress this behavior in cases where the split will be slower than the dual processing.
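The rewrite performed by implicit split insertion can be sketched as follows. This is a simplified illustration, not the optimizer's code: the plan is assumed to be a plain dictionary from node name to children, and the split / split output pair is collapsed into a single hypothetical `split(...)` node.

```python
def insert_implicit_splits(successors):
    """Give every non-split node with several children a single split child."""
    result = {}
    for node, kids in successors.items():
        if len(kids) > 1 and not node.startswith("split"):
            split = f"split({node})"
            result[node] = [split]      # the node now feeds only the split
            result[split] = list(kids)  # the split fans out to the old children
        else:
            result[node] = list(kids)
    return result


# Hypothetical merged plan in which A1 is used by both B and C.
plan = {
    "A1": ["B", "C"],
    "B": ["store B"],
    "C": ["store C"],
    "store B": [],
    "store C": [],
}
rewritten = insert_implicit_splits(plan)
print(rewritten["A1"])         # ['split(A1)']
print(rewritten["split(A1)"])  # ['B', 'C']
```

The rewrite only triggers when both stores are part of the same plan, which is why the parser change is the prerequisite.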

[[Anchor(Store/Multiple_output_(Phase_2))]]
==== Store/Multiple output (Phase 2) ====
If we put implicit splits in place and enhance splits to contain additional operators or even multiplex the split output within the same map reduce job, at some point either the map or the reduce phase of a job needs to be able to produce multiple outputs. Currently there is a single output collector that will store the results in part-* files.

Here are some options:
[[Anchor(hadoop_0.19_supports_MultipleOutput)]]
===== hadoop 0.19 supports MultipleOutput =====
Link: http://hadoop.apache.org/core/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html#addNamedOutput(org.apache.hadoop.mapred.JobConf,%20java.lang.String,%20java.lang.Class,%20java.lang.Class,%20java.lang.Class)

All the output will still be in the same directory, but the developer can give names to different sets of output data. So, in our case we might name the outputs "split1" and "split2" and the output would come out as:

{{{
/outdir/split1-0000
/outdir/split1-0001
/outdir/split1-0002
/outdir/split2-0000
}}}

[[Anchor(Side-Effect_files)]]
===== Side-Effect files =====
Link: http://hadoop.apache.org/core/docs/current/mapred_tutorial.html#Task+Side-Effect+Files

[[Anchor(Store_operator_(Phase_2))]]
===== Store operator (Phase 2) =====
In the current system the store op is removed by JobControlCompiler and used to set up the output directory, etc. It holds information about how and where to store the files, but doesn't actually do anything in the pipeline. The logic for storing records in the map-reduce case is handled by the map-only and map-reduce classes. The logic is simple: Whatever comes out of the pipeline will be transformed into a key/value pair and collected using the output collector.

Using multiple output it would make sense to let the store operator do the actual collection. The job compiler would have the duty to configure multiple output streams and then assign the right collectors to the store operator. The actual mapper/reducers will still have the responsibility to run the pipeline and check for errors but the storing is handled by the store operator. Anything that is not stored and trickles out at the bottom of the pipeline goes into the standard collector. After the job is run we will have to move the files to the right directories.

It seems to make more sense to use hadoop's multiple output functionality. Trying to build the same functionality with side-effect files would duplicate efforts made in hadoop 0.19. The drawback is that we might have to provide modes to run the queries differently depending on the version of hadoop.

[[Anchor(Split_operator_(Phase_2))]]
==== Split operator (Phase 2) ====

The goal is to make the split operator non-blocking. Instead of dumping a split immediately to disk we'll try to keep on going as long as possible. So the split operator would return the same record as many times as there are children. This leaves you with multiple branches of the operator tree in the same map or reduce stage. These are going to be realized as nested plans inside split.
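A minimal sketch of such a non-blocking split is shown below, under the assumption that each nested plan can be modelled as a function from a record to an output record (or None if the record is filtered out). The function and branch names are hypothetical.

```python
def non_blocking_split(records, nested_plans):
    """Hand each incoming record to every nested plan as soon as it arrives.

    Nothing is buffered or written to disk: the input is consumed once and
    each branch sees every record.
    """
    for record in records:
        for name, plan in nested_plans:
            out = plan(record)
            if out is not None:
                yield name, out


# Two hypothetical branches hanging off the same split.
plans = [
    ("B", lambda r: r if r % 2 == 0 else None),  # keep even values
    ("C", lambda r: r * 10 if r > 2 else None),  # keep values > 2, then scale
]
outputs = list(non_blocking_split(iter([1, 2, 3, 4]), plans))
print(outputs)  # [('B', 2), ('C', 30), ('B', 4), ('C', 40)]
```

Because the input is an iterator that is consumed once, the sketch also shows that the common upstream work is not repeated per branch.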

[[Anchor(Multiplex/Demultiplex_(Phase_3))]]
===== Multiplex/Demultiplex (Phase 3) =====

The multiplex operation will serialize different nested plans of the split into the same output stream. If there are multiple distinct output streams it will add a key to distinguish between those. If all but one of the branches are terminated by a store, it will simply stream the single remaining one to the output.

Demuxing is done by the split operator followed by a special filter. The filter will only accept tuples for the particular pipeline.
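The tagging and filtering described above could be sketched like this. The representation of the stream as (key, tuple) pairs and the integer branch keys are assumptions for illustration only.

```python
def multiplex(branch_outputs):
    """Serialize several branches into one stream, tagging tuples with a branch key."""
    stream = []
    for key, tuples in branch_outputs.items():
        for t in tuples:
            stream.append((key, t))
    return stream


def demultiplex(stream, key):
    """Special filter: accept only the tuples that belong to one pipeline."""
    return [t for k, t in stream if k == key]


# Two hypothetical split branches with different tuple shapes.
branch_outputs = {
    0: [("alice", 3), ("bob", 5)],
    1: [("alice",), ("carol",)],
}
stream = multiplex(branch_outputs)
print(demultiplex(stream, 1))  # [('alice',), ('carol',)]
```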

In phase 2, we'll simply terminate all but one split branch and store it into a tmp dir.

[[Anchor(Combiner/Reducer_(Phase_3))]]
===== Combiner/Reducer (Phase 3) =====

Multiplexing different branches into one stream allows us to run a combiner on the result, reducing the amount of data that will be written. Otherwise we would have to dump everything in a map-only job and then start one or more MR jobs to pick up the pieces.

The current plan is to split the byte used for the join key between splits and joins. That leaves a nibble for each and limits the number of joins and the number of splits to 16 each.
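The arithmetic behind the limit of 16 can be sketched as below. Which nibble holds which index is not specified in the plan; putting the split index in the high nibble is an assumption of this example, as are the function names.

```python
def pack_index(join_index, split_index):
    """Pack two 4-bit indexes into a single byte value (0..255)."""
    if not (0 <= join_index < 16 and 0 <= split_index < 16):
        raise ValueError("each index must fit in 4 bits (0..15)")
    # Assumption: split index in the high nibble, join index in the low nibble.
    return (split_index << 4) | join_index


def unpack_index(byte):
    """Recover (join_index, split_index) from a packed byte."""
    return byte & 0x0F, byte >> 4


packed = pack_index(join_index=3, split_index=5)
print(packed)                # 83, i.e. 0x53
print(unpack_index(packed))  # (3, 5)
```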

[[Anchor(MRCompiler_(Phase_2_and_3))]]
===== MRCompiler (Phase 2 and 3) =====
The MR Compiler right now looks for splits, terminates the MR job at that point and connects the remaining operators via load and store.

We'll add a new optimizer pass to look for these split scenarios. This gives us the ability to use the combiner plan information to decide whether to multiplex or not (Phase 3), and also lets us switch back more easily to the old style of handling if multiple outputs are not available.

[[Anchor(Parallelism_(Phase_3))]]
===== Parallelism (Phase 3) =====

If we multiplex outputs from different split branches we have to decide what to do with the requested parallelism: Max, sum or average?
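The three candidate policies are easy to compare side by side. The sketch below is only meant to frame the question: the function name is hypothetical, and rounding the average up to a whole number of reducers is an assumption, not a decision made in this proposal.

```python
import math


def combined_parallelism(requests, policy):
    """Combine the parallelism requested by each multiplexed branch."""
    if not requests:
        raise ValueError("at least one branch is required")
    if policy == "max":
        return max(requests)
    if policy == "sum":
        return sum(requests)
    if policy == "average":
        # Assumption: round up so the result is a whole number of reducers.
        return math.ceil(sum(requests) / len(requests))
    raise ValueError(f"unknown policy: {policy}")


# Three hypothetical branches requesting 10, 4 and 6 reducers.
requests = [10, 4, 6]
for policy in ("max", "sum", "average"):
    print(policy, combined_parallelism(requests, policy))
```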

[[Anchor(Diamond_problem_(Phase_3))]]
==== Diamond problem (Phase 3) ====
What happens when different split plans come back together?

This should come for free, but we need to make sure unions can handle multiple split branches.