[[Anchor(Pig_Streaming_Functional_Specification)]]
= Pig Streaming Functional Specification =

Streaming can have three separate meanings in the context of the Pig project:

 1. A specific way of submitting jobs to Hadoop: Hadoop Streaming
 2. A form of processing in which the entire portion of the dataset that corresponds to a task is sent to the task and the output streams out. There is no temporal or causal correspondence between an input record and specific output records.
 3. The use of non-Java functions with Hadoop.

The goal of Pig with respect to streaming is to support #2 for (a) Java UDFs, (b) non-Java UDFs, and (c) user-specified binaries/scripts. We will start with (c) since it would be most beneficial to users. It is not our goal to be feature-by-feature compatible with Hadoop streaming, as it is too open-ended and might force us to implement features that we don't necessarily want in Pig.

This document proposes the functional specification to fulfill 2(c).

[[Anchor(Feature_Specification)]]
== Feature Specification ==

[[Anchor(Computation_Specification)]]
=== 1 Computation Specification ===

==== 1.1 Ability to specify streaming via binary/script. ====
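A minimal sketch of this usage (the same form is detailed in section 1.3 below); the user pipes a relation through an executable:

{{{
A = load 'data';
B = stream A through `stream.pl`;
}}}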

==== 1.2 Ability to specify parameters to streaming ====
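Parameters are simply included in the backticked command. A hedged sketch, where the `-n 5` arguments to `stream.pl` are hypothetical:

{{{
A = load 'data';
B = stream A through `stream.pl -n 5`;
}}}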

==== 1.3 Separation of streaming specification from stream operator. ====

For simple specifications, it is convenient for the user to be able to specify the computation directly within the stream operator:

{{{
A = load 'data';
B = stream A through `stream.pl`;
}}}

However, for more complex specifications involving a serializer/deserializer or input/output via files, the user will be required to use the `define` command:

{{{
<define command> ::= define <alias> <computation spec>
<alias> ::= pig identifier
<computation spec> ::= <UDF spec> | <command spec>
<UDF spec> ::= pig standard function spec
<command spec> ::= `<command>` [<input spec>] [<output spec>] [<ship spec>] [<cache spec>]
<command> ::= standard Unix command, including its arguments
<input spec> ::= input (<input stream spec> [using <serializer>] {, <input stream spec> [using <serializer>]})
<output spec> ::= output (<output stream spec> [using <deserializer>] {, <output stream spec> [using <deserializer>]})
<ship spec> ::= ship(<file spec> {, <file spec>})
<cache spec> ::= cache(<file spec> {, <file spec>})
<input stream spec> ::= stdin | <file spec>
<output stream spec> ::= stdout | <file spec>
<file spec> ::= unix file path
<serializer> ::= <UDF spec>
<deserializer> ::= <UDF spec>
}}}
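For illustration, here is a hedged sketch of a define that combines several of these clauses; `dict.gz` and the `-d` flag are hypothetical, and the default (de)serializers are described in section 3.1:

{{{
define X `stream.pl -d dict.gz`
    input(stdin using DefaultSerializer('^A'))
    output(stdout using DefaultDeserializer('^A'))
    ship('stream.pl') cache('dict.gz');
}}}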

Note that we use backticks to enclose the streaming command. This is because we want to reserve quotes for use within the command without the need to escape them. Backticks should be reasonably intuitive for users since, in Unix, they are associated with the notion of "execute this".

Given the definition above, the stream command will look as follows:

{{{
<stream command> ::= <alias> = stream <alias> {, <alias>} through <streaming spec> [<schema spec>]
<streaming spec> ::= <alias> | <UDF spec> | `<command>`
<schema spec> ::= standard pig schema spec, e.g. as (x, y, z)
}}}

Within the stream operator, if the streaming specification is enclosed in backticks, it is assumed to be a streaming command specified inline. Otherwise, the string is looked up in the alias table and, if found, is assumed to be an alias; if not found, an error is reported.

{{{
A = load 'data';
define cmd `stream_cmd -input file.dat`;
S1 = stream A through cmd;
}}}

More examples appear in the later sections.

==== 1.4 Data Guarantees ====

The guarantees made to the user will be determined by the position of the stream operator in the pig script. The following types will be supported:

   * '''unordered''' - no guarantees on the order in which the data is delivered to the streaming application
   * '''grouped''' - the data for the same key is guaranteed to be processed contiguously on a single node
   * '''grouped and ordered''' - the data is grouped and sorted within each group on a user-specified key

In addition to position, data grouping and ordering can be determined by the data itself. For now, users would need to know the properties of the data to be able to take advantage of its structure; eventually, this should be part of the metadata.

'''Example 1''': The data given to streaming is _unordered_

{{{
A = load 'data';
B = stream A through `stream.pl`;
}}}

'''Example 2''': The data is _grouped_

{{{
A = load 'data';
B = group A by $1;
C = foreach B generate flatten(A);
D = stream C through `stream.pl`;
}}}

'''Example 3''': The data is _grouped and ordered_

{{{
A = load 'data';
B = group A by $1;
C = foreach B {
    D = order A by $3, $4;
    generate D;
}
E = stream C through `stream.pl`;
}}}

[[Anchor(Job_Management)]]
=== 2 Job Management ===

==== 2.1 Ability to ship computation and support data ====

We need to be able to ship the streaming binary and supporting files, if any, from the client machine to the compute nodes.

A user can specify the files to ship via the `ship` clause in the define. For instance:

{{{
define X `stream.pl foo.txt` ship('stream.pl', 'foo.txt')
}}}

If the `ship` and `cache` options are not specified, pig will attempt to ship the binary in the following way:

   * If the first word of the streaming command is `perl` or `python`, pig assumes that the binary is the first string it encounters that does not start with a dash (see the sketch after this list).
   * Otherwise, pig will attempt to ship the first string from the command line, as long as it does not come from `/bin`, `/usr/bin`, `/usr/local/bin`, or `/home/y/bin`. It will determine this by scanning the path if an absolute path is provided, or by executing `which`.
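A hedged sketch of the first rule; the `-n 5` arguments are hypothetical, and pig would infer that `stream.pl` should be shipped since it is the first string after `perl` that does not start with a dash:

{{{
define X `perl stream.pl -n 5`;
Y = stream A through X;
}}}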

To prevent a command from being shipped, an empty list can be passed to the `ship` clause, as shown in the sketch below.
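A minimal sketch, assuming `myprog` (hypothetical) is already installed on every compute node:

{{{
define X `myprog` ship();
Y = stream A through X;
}}}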

==== 2.2 Ability to cache data ====

The approach described above works fine for binaries/jars and small datasets. For larger datasets, loading them at run time for every execution can have serious performance consequences.

Similarly to 2.1, a user will be able to specify files to cache via the `cache` clause in the define statement. For instance:

{{{
define X `stream.pl foo.gz` ship('stream.pl') cache('foo.gz')
}}}

[[Anchor(Input/Output_Handling)]]
=== 3 Input/Output Handling ===

==== 3.1 Data serialization/deserialization ====

By default, the data going into the streaming command and the data coming out of it are assumed to be tab-delimited.

{{{
S = stream A through `stream.pl`;
}}}

In the example above, the fields of each tuple of A are concatenated with tabs and passed to `stream.pl`. The output of streaming is processed one line at a time and split on tabs. The user will be able to provide an alternative delimiter to the default (de)serializer via the `define` command:

{{{
define X `stream.pl` input(stdin using DefaultSerializer('^A')) output (stdout using DefaultDeserializer('^A'));
S = stream A through X;
}}}

Users will be able to provide custom serializers and deserializers. This would look something like the example below and is similar to the load/store commands. In fact, a load function is identical to a deserializer, and a store function to a serializer.

{{{
define X `stream.pl` input(stdin using MySerializer) output (stdout using MyDeserializer);
S = stream A through X;
}}}

The following serializers/deserializers will be part of the pig distribution:

 1. !DefaultSerializer, !DefaultDeserializer as described above
 2. !FlattenSerializer - it would take a bag and flatten it before passing it to the streaming application.
 3. !PythonSerializer, !PythonDeserializer 

==== 3.2 Ability to declare schema for streaming output ====

Just like with load, rather than relying on position, the user can give names to the output columns:

{{{
S = stream A through `stream.pl` as (a, b, c);
}}}
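The declared names can then be referenced downstream; a minimal sketch:

{{{
T = foreach S generate a, c;
}}}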

==== 3.3 Ability to let streaming read from file(s) on disk ====

This is useful in two cases:

 1. To use existing binaries that expect input in a file rather than stdin.
 2. To provide multiple inputs.

In the initial release, we would only address the first case. Support for multiple inputs would be delayed until later versions.

In the case of a single input coming from a file, the command would look as follows:

{{{
define Y `stream.pl foo` input('foo' using MySerializer) output (stdout using MyDeserializer);
Z = stream X through Y;
}}}

This statement will cause the content of `X` to be written to file `foo` using `MySerializer`.

==== 3.4 Ability to write output(s) to the disk ====

The motivation here is similar to that for reading data from disk, described in 3.3:

 1. To accommodate existing applications that write output to disk rather than stdout.
 2. To provide the ability to create multiple outputs.

In the initial release, only limited support for multiple outputs would be provided, as described below.

To accommodate the first case, the following command can be used:

{{{
define Z `stream.pl outputfile` output('outputfile' using MyDeserializer);
Y = stream X through Z;
}}}

This tells pig that the streaming application stores its complete output in a file called _outputfile_ and that the content of that file should be deserialized into Y using !MyDeserializer. (The `outputfile` is written by `stream.pl` using standard open/write/close functions. Pig might need to capture the data and store it as part files in DFS.)

A user can specify multiple outputs, but only the first one will be automatically loaded; the rest will be stored in DFS using the file names specified in the output spec as absolute paths:

{{{
define A `stream.pl` output('output1', '/data/mydata/output2' using MyDeserializer);
Y = stream X through A;
}}}

In this example, output1 would be loaded into Y, while the second output would be stored in DFS in the file '/data/mydata/output2'.
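Since a deserializer is identical to a load function (see 3.1), the second output could later be loaded explicitly; a sketch, assuming !MyDeserializer can also serve as a load function:

{{{
W = load '/data/mydata/output2' using MyDeserializer();
}}}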

[[Anchor(Non-streaming_Features)]]
=== 4 Non-streaming Features ===

This section describes a set of features that are not directly related to streaming but would be very useful in the context of streaming.

==== 4.1 Logging/Debugging/Error handling ====

===== 4.1.1 Logging =====

The stderr of the streaming application needs to be captured and presented to the user in an easily digestible format. The user will be presented with the output of each streaming task separately, with a header that includes the following information: task name, task result code, start time, and end time.

===== 4.1.2 Error Handling =====

If the streaming application fails, pig will report the error, including the return code and any stderr output produced by the streaming application.

If available, it would also report the amount of data consumed so far.

==== 4.2 Ability to templatize pig scripts ====

See https://issues.apache.org/jira/browse/PIG-58 for details.

==== 4.3 Ability to process binary data ====

Sometimes, applications need to consume the entire data file without any parsing. All we would need in this case is a custom loader function that reads the entire data as-is.

{{{
A = load 'data' using AsIsLoader();
B = stream A through `stream.pl`;
}}}

==== 4.4 Working with scalars ====

Several users need a way to compute a scalar value with pig and use it in later computations.

They want to be able to do something like this:

{{{
A = load 'data1';
B = group A all;
C = foreach B generate COUNT(A);
D = load 'data2';
E = foreach D generate $1/C;
}}}

This does not work in pig because C is a relation not a scalar. Proposed short term solution is to do the following:

{{{
A = load 'data1';
B = group A all;
C = foreach B generate COUNT(A);
store C into 'count';
D = load 'data2';
E = foreach D generate $1/GetScalar('count');
}}}

`GetScalar` is a UDF that reads the given file on its first invocation and produces its content in the form of a scalar value. This UDF will be provided as part of the pig distribution until a better solution is available.

[[Anchor(Performance)]]
=== 5 Performance ===

We should have a performance target in mind as compared to Hadoop streaming. I think for the initial release it would make sense to aim for at most '''30%''' overhead for streaming in Pig.

The following optimizations were suggested: