Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2014/07/01 14:57:51 UTC

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hey Krzysztof!

Everything looks standard there.

Let me ask a few questions to make sure I understand the setup correctly:
 - You are running a two-node setup. Does one node run the master and a
worker, and the other one a worker only? Or do you have a dedicated master node?
 - Are the examples on small data and on large data strictly the same,
except for differently sized input files?

Most importantly:
 - If the input file is available on the workers, is it available under the
path "/home/krzysztof/stratosphere05/generatedFrequencies2.txt"?

My guess right now is still that the workers do not see the file
properly.

Greetings,
Stephan




On Sun, Jun 29, 2014 at 4:50 PM, Krzysztof Pasierbinski <
Krzysztof.Pasierbinski@dfki.de> wrote:

> Hi Stephan,
> I have a 2-node configuration. The first node is a master and a worker,
> the second node is a worker. The file path, Flink (Stratosphere) version and
> operating system are the same on both nodes.
> My test program is in the attachment (a simple modification of the "Word count"
> example).
> The execution plan looks like this:
> {
>     "nodes": [
>
>     {
>         "id": 4,
>         "type": "source",
>         "pact": "Data Source",
>         "contents": "TextInputFormat
> (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "3..6 GB" },
>             { "name": "Est. Cardinality", "value": "98.43 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "3..6 GB" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 3,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 2,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 3, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 1,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 2, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 0,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 1, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 8,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 7,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 8, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 6,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 7, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 5,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 6, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     }
>     ]
> }
>
> -----Original Message-----
> From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of
> Stephan Ewen
> Sent: Sunday, 29 June 2014 16:33
> To: dev@flink.incubator.apache.org
> Subject: Re: Cluster execution of an example program ("Word count") and a
> problem related to the modificated example
>
> Hi Krzysztof!
>
> Indeed, the input size should not matter. Can you tell us the details of
> the setup that worked?
>
> The built-in examples work without distributed file system, because they
> do not depend on files. The example programs set a Java Collection as the
> input, which gets distributed as part of the program.
>
> Stephan
>

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Posted by Stephan Ewen <se...@apache.org>.
Yes, I thought about that the same way. It will still generate two splits,
but they may end up at the same worker, if that worker is fast enough.

This sounds almost like a case for an "auto-dop" option for inputs.

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Posted by Fabian Hueske <fh...@apache.org>.
I thought about why it had worked for small input files.

Apparently, all input splits were read by the (local) worker that is
on the same machine as the master (otherwise the remote worker would have
raised a FileNotFoundException). Since the input splits were very small,
reading the first input split might have been faster than the request of
the remote worker to the JobManager to assign an input split. Therefore,
the local worker could request the next input split before the remote
worker and read the full file.

Once the input was larger, reading the first split took longer than the
request of the remote worker, which got a split assigned and failed to read
the file.
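
The race described above can be illustrated with a small simulation. This is only a sketch of pull-based split assignment, not the actual JobManager code: the function name, the worker names and all timing values are made up for illustration, and the only behaviour it assumes is that the master hands the next split to whichever worker asks first.

```python
import heapq

def simulate_split_assignment(num_splits, read_time, local_latency, remote_latency):
    """Return, in assignment order, which worker receives each input split.

    Hypothetical model: every worker sends a request to the master, waits
    `latency` time units for it to arrive, reads the split for `read_time`
    units, and then asks again. The master serves requests in arrival order.
    """
    latency = {"local": local_latency, "remote": remote_latency}
    # Each heap entry is (time the request reaches the master, worker name).
    requests = [(local_latency, "local"), (remote_latency, "remote")]
    heapq.heapify(requests)

    assignment = []
    while len(assignment) < num_splits:
        arrival, worker = heapq.heappop(requests)
        assignment.append(worker)
        # The worker reads its split and then requests the next one.
        heapq.heappush(requests, (arrival + read_time + latency[worker], worker))
    return assignment

# Small file: both splits are read before the remote request arrives.
small = simulate_split_assignment(num_splits=2, read_time=5,
                                  local_latency=1, remote_latency=50)
# Large file: the first read is still running when the remote worker asks.
large = simulate_split_assignment(num_splits=2, read_time=500,
                                  local_latency=1, remote_latency=50)
print(small, large)
```

With these made-up numbers the small input is read entirely by the local worker, while the large input sends one split to the remote worker, which is the point where a missing local file would surface as a FileNotFoundException.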




2014-07-07 12:16 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> Okay, good to hear you solved the issue. HDFS is a good way to go in large
> setups, though shared filesystems / SANs that are mounted on all machines
> work as well (using Amazon EBS is an example for that).
>
> Stephan
>

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Okay, good to hear you solved the issue. HDFS is a good way to go in large
setups, though shared filesystems / SANs that are mounted on all machines
work as well (using Amazon EBS is an example for that).

Stephan
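
As a rough illustration of the difference between the two options, the sketch below checks whether an input URI is resolved separately on each machine. It uses only the Python standard library. The helper name and the "hdfs://namenode:9000/..." address are hypothetical. A `file:` path on a filesystem that is mounted on all machines is still fine; the check only flags paths that every worker has to be able to open locally.

```python
from urllib.parse import urlparse

def is_resolved_per_machine(input_uri):
    """Return True if each worker opens the path on its own local filesystem.

    Illustrative helper (not part of Flink/Stratosphere): a URI without a
    scheme is treated like a `file:` URI.
    """
    scheme = urlparse(input_uri).scheme or "file"
    return scheme == "file"

# Path taken from the execution plan in this thread.
local_input = "file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt"
# Hypothetical HDFS location; host, port and directory are assumptions.
hdfs_input = "hdfs://namenode:9000/data/generatedFrequencies2.txt"

for uri in (local_input, hdfs_input):
    if is_resolved_per_machine(uri):
        print(uri, "-> must exist under this path on every worker (or on a shared mount)")
    else:
        print(uri, "-> served by the distributed file system")
```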

Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Posted by Krzysztof Pasierbinski <Kr...@dfki.de>.
Hi Stephan,
thank you for your support.
I don't have a dedicated master node. My master node is a worker at the same time. The second node is a worker only. The small and big files are exactly alike (type, path, structure, even the name; only the size and values change). At first, the input file was only available on the master node (and for small files it worked!). After copying the input file to the second worker node it works fine, but I don't think that is an effective way to go. So I am going to switch to HDFS.


-----Original Message-----
From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of Stephan Ewen
Sent: Tuesday, 1 July 2014 14:58
To: dev@flink.incubator.apache.org
Subject: Re: Cluster execution of an example program ("Word count") and a problem related to the modificated example

Hey Krzysztof!

Everything looks standard there.

Let me ask a few questions to make sure I understand the setup correctly:
 - You are running a two-node setup. Does one node run the master and a worker, and the other one a worker only? Or do you have a dedicated master node?
 - Are the examples on small data and on large data strictly the same, except for differently sized input files?

Most importantly:
 - If the input file is available on the workers, is it available under the path "/home/krzysztof/stratosphere05/generatedFrequencies2.txt"?

My guess right now is still that the workers do not see the file properly.

Greetings,
Stephan




On Sun, Jun 29, 2014 at 4:50 PM, Krzysztof Pasierbinski < Krzysztof.Pasierbinski@dfki.de> wrote:

> Hi Stephan,
> I have a 2-node configuration. The first node is a master and a
> worker, the second node is a worker. The file path, Flink (Stratosphere)
> version and operating system are the same on both nodes.
> My test program is in the attachment (a simple modification of the "Word count"
> example).
> The execution plan looks like this:
> {
>     "nodes": [
>
>     {
>         "id": 4,
>         "type": "source",
>         "pact": "Data Source",
>         "contents": "TextInputFormat
> (file:/home/krzysztof/stratosphere05/generatedFrequencies2.txt) - UTF-8",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "3..6 GB" },
>             { "name": "Est. Cardinality", "value": "98.43 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "3..6 GB" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "3..6 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 3,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 2,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 3, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 1,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MAX(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 2, "ship_strategy": "Hash Partition on [0]",
> "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 0,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@5bf734",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 1, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 8,
>         "type": "pact",
>         "pact": "FlatMap",
>         "contents":
> "eu.stratosphere.example.java.wordcount.WordCount$Tokenizer2",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 4, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Map",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)"
> },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 7,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 8, "ship_strategy": "Forward"}
>         ],
>         "driver_strategy": "Sorted Combine",
>         "global_properties": [
>             { "name": "Partitioning", "value": "RANDOM" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "(none)" },
>             { "name": "Grouping", "value": "not grouped" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "492.19 M" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "0.0 B" },
>             { "name": "Cumulative Disk I/O", "value": "1..82 GB" },
>             { "name": "Cumulative CPU", "value": "0.0 " }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 6,
>         "type": "pact",
>         "pact": "GroupReduce",
>         "contents": "SUM(2),MIN(1)",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
>         ],
>         "driver_strategy": "Sorted Group Reduce",
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "(unknown)" },
>             { "name": "Disk I/O", "value": "(unknown)" },
>             { "name": "CPU", "value": "(unknown)" },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     },
>     {
>         "id": 5,
>         "type": "sink",
>         "pact": "Data Sink",
>         "contents": "eu.stratosphere.api.java.io.CsvOutputFormat@1ced484",
>         "parallelism": "2",
>         "subtasks_per_instance": "1",
>         "predecessors": [
>             {"id": 6, "ship_strategy": "Forward"}
>         ],
>         "global_properties": [
>             { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>             { "name": "Partitioned on", "value": "[0]" },
>             { "name": "Partitioning Order", "value": "(none)" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "local_properties": [
>             { "name": "Order", "value": "[0:ASC]" },
>             { "name": "Grouped on", "value": "[0]" },
>             { "name": "Uniqueness", "value": "not unique" }
>         ],
>         "estimates": [
>             { "name": "Est. Output Size", "value": "(unknown)" },
>             { "name": "Est. Cardinality", "value": "(unknown)" }        ],
>         "costs": [
>             { "name": "Network", "value": "0.0 B" },
>             { "name": "Disk I/O", "value": "0.0 B" },
>             { "name": "CPU", "value": "0.0 " },
>             { "name": "Cumulative Network", "value": "(unknown)" },
>             { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>             { "name": "Cumulative CPU", "value": "(unknown)" }
>         ],
>         "compiler_hints": [
>             { "name": "Output Size (bytes)", "value": "(none)" },
>             { "name": "Output Cardinality", "value": "(none)" },
>             { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
>             { "name": "Filter Factor", "value": "(none)" }        ]
>     }
>     ]
> }
>
> -----Original Message-----
> From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of
> Stephan Ewen
> Sent: Sunday, June 29, 2014 16:33
> To: dev@flink.incubator.apache.org
> Subject: Re: Cluster execution of an example program ("Word count")
> and a problem related to the modificated example
>
> Hi Krzysztof!
>
> Indeed, the input size should not matter. Can you tell us the details 
> of the setup that worked?
>
> The built-in examples work without a distributed file system because
> they do not depend on files. The example programs use a Java
> Collection as the input, which gets distributed as part of the program.
>
> Stephan
>