You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Jesse Anderson <je...@smokinghand.com> on 2016/05/18 17:48:39 UTC

Collecting Results to Executing Process

Is there a way to get the PCollection's results in the executing process?
In Spark this is using the collect method on an RDD.

This would be for small amounts of data stored in a PCollection like:

PCollection<Long> count = partition.apply(Count.globally());
System.out.println(valuesincount);

Thanks,

Jesse

Re: Collecting Results to Executing Process

Posted by Jesse Anderson <je...@smokinghand.com>.
I did this with a lambda. Code in case someone's searching lands them here:
      merged.apply(MapElements.<KV<String, Long>, Long>via((KV<String,
Long> input) -> {
        System.out.println(input);
        return 0L;
      }).withOutputType(TypeDescriptors.longs()));

Caveats: use only on small datasets. This code only works locally like on
an IDE.

On Fri, May 20, 2016 at 11:21 AM Davor Bonaci <da...@google.com> wrote:

> The question of displaying contents of a (small) PCollection has been
> asked several times. There are various reasons why this is challenging,
> primarily centered around leading users into a sub-optimal outcome /
> experience. On the other hand, usefulness and educational value is
> certainly there.
>
> This is probably something we should rethink -- but, in the meanwhile,
> writing a debugging DoFn is the way to go.
>
> On Fri, May 20, 2016 at 7:37 AM, Ismaël Mejía <ie...@gmail.com> wrote:
>
>> This reminds me of my first email to this list when I created the
>> Log.Print transform to print intermidiate steps Jesse, maybe you can use
>> something like that:
>>
>>
>> https://github.com/iemejia/beam-playground/blob/master/src/test/java/org/apache/beam/contrib/transforms/DebugTest.java
>>
>> Frances is right for a real case scenario, but the case that Jesse
>> mentions (similar to mine) is the development time use where commonly we
>> play with a local sample and we need to see what happened after the
>> PTransforms. This also can be useful for the integration with notebooks
>> (Zeppelin, Jupyter, etc).
>>
>> Ismael
>>
>>
>> On Fri, May 20, 2016 at 3:55 PM, Jesse Anderson <je...@smokinghand.com>
>> wrote:
>>
>>> @amit, yes that's the reason I was looking for one. Students often want
>>> a quick sanity check for their processing. The workaround now is to write
>>> out the collection to a file and then open it.
>>>
>>> On Fri, May 20, 2016, 12:15 AM Amit Sela <am...@gmail.com> wrote:
>>>
>>>> For the Spark runner this will print to the driver stdout, which in
>>>> some cases is the console you're running the job from (depends on execution
>>>> modes - standalone/yarn-client), and in others it's the driver's stdout log
>>>> (yarn-cluster, not sure about Mesos).
>>>>
>>>> Generally, if such PTransform is considered for the SDK, I think it
>>>> should print to stdout in a centralized location (in Spark it's the driver
>>>> node) and it should be capped by N to avoid OOM. I can definitely see this
>>>> useful for development, even if it's mostly for IDE use, and I think this
>>>> is what Jesse was looking for because debugging a PCollection's content in
>>>> the IDE is useless unless you apply the runner's (implementation of)
>>>> "collect" method to see what it contains.
>>>>
>>>> If we're looking for something to print the content of PCollection's
>>>> partitions on the nodes, this could simply be a composite transform that
>>>> iterates over the first N elements in the partition and logs their values.
>>>>
>>>> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <al...@apache.org>
>>>> wrote:
>>>>
>>>>> I removed ConsoleIO in my latest PR since it was specific to Flink. It
>>>>> was just a wrapper for a DoFn that prints to System.out, so it would be
>>>>> super easy to implement in core Beam. I don't how, however, if this is
>>>>> useful since for most runners this would print to some file on the worker
>>>>> where the DoFn is running. This is only useful when running inside an IDE.
>>>>>
>>>>> On Fri, 20 May 2016 at 06:06 Frances Perry <fj...@google.com> wrote:
>>>>>
>>>>>> Looks like Flink and Spark have their own implementations of
>>>>>> ConsoleIO but there's no SDK level version. Generally that goes against the
>>>>>> goal of having generally useful IO connectors, with potential
>>>>>> runner-specific overrides. Is there a generalized version we could add?
>>>>>>
>>>>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'll join Frances and add that even Spark warns about using
>>>>>>> collect() as it might not fit into Driver memory, and like you said it's
>>>>>>> only for small datasets - but small is relative...
>>>>>>> I you want some "printout" during development you could use
>>>>>>> ConsoleIO (for streaming) - both Flink an Spark runner support this (and
>>>>>>> Beam doesn't AFAIK).
>>>>>>>
>>>>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>>>>>>> can't ask to access the whole collection, given you'll be waiting forever
>>>>>>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>>>>>>> assertions though, so it'll be able to handle that case in the future.
>>>>>>>>
>>>>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>>>>> (1) Graph construction -- from Pipeline.create() until
>>>>>>>>> Pipeline.run(). During this phase PCollection are just placeholders --
>>>>>>>>> their contents don't exist.
>>>>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner
>>>>>>>>> for execution.
>>>>>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>>>>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>>>>>>>> materialized during execution, but others may not (for example, if the
>>>>>>>>> runner chooses to fuse or optimize them away).
>>>>>>>>>
>>>>>>>>> So in other words, there's no place in your main program when a
>>>>>>>>> PCollection is guaranteed to have its contents available.
>>>>>>>>>
>>>>>>>>> If you are attempting to debug a job, you might look into PAssert
>>>>>>>>> [1], which gives you a place to inspect the entire contents of a
>>>>>>>>> PCollection and do something with it. In the DirectPipelineRunner that code
>>>>>>>>> will run locally, so you'll see the output. But of course, in other runners
>>>>>>>>> it may helpfully print on some random worker somewhere. But you can also
>>>>>>>>> write tests that assert on its contents, which will work on any runner.
>>>>>>>>> (This is how the @RunnableOnService tests work that we are setting up for
>>>>>>>>> all runners.)
>>>>>>>>>
>>>>>>>>> We should get some documentation on testing put together on the
>>>>>>>>> Beam site, but in the meantime the info on the Dataflow site [2] roughly
>>>>>>>>> applies (DataflowAssert was renamed PAssert in Beam).
>>>>>>>>>
>>>>>>>>> Hope that helps!
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>>>>> [2]
>>>>>>>>> https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>>>>
>>>>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <
>>>>>>>>> jesse@smokinghand.com> wrote:
>>>>>>>>>
>>>>>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>>>>>
>>>>>>>>>> This would be for small amounts of data stored in a PCollection
>>>>>>>>>> like:
>>>>>>>>>>
>>>>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>>>>> System.out.println(valuesincount);
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jesse
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>
>

Re: Collecting Results to Executing Process

Posted by Davor Bonaci <da...@google.com>.
The question of displaying contents of a (small) PCollection has been asked
several times. There are various reasons why this is challenging, primarily
centered around leading users into a sub-optimal outcome / experience. On
the other hand, usefulness and educational value is certainly there.

This is probably something we should rethink -- but, in the meanwhile,
writing a debugging DoFn is the way to go.

On Fri, May 20, 2016 at 7:37 AM, Ismaël Mejía <ie...@gmail.com> wrote:

> This reminds me of my first email to this list when I created the
> Log.Print transform to print intermidiate steps Jesse, maybe you can use
> something like that:
>
>
> https://github.com/iemejia/beam-playground/blob/master/src/test/java/org/apache/beam/contrib/transforms/DebugTest.java
>
> Frances is right for a real case scenario, but the case that Jesse
> mentions (similar to mine) is the development time use where commonly we
> play with a local sample and we need to see what happened after the
> PTransforms. This also can be useful for the integration with notebooks
> (Zeppelin, Jupyter, etc).
>
> Ismael
>
>
> On Fri, May 20, 2016 at 3:55 PM, Jesse Anderson <je...@smokinghand.com>
> wrote:
>
>> @amit, yes that's the reason I was looking for one. Students often want a
>> quick sanity check for their processing. The workaround now is to write out
>> the collection to a file and then open it.
>>
>> On Fri, May 20, 2016, 12:15 AM Amit Sela <am...@gmail.com> wrote:
>>
>>> For the Spark runner this will print to the driver stdout, which in some
>>> cases is the console you're running the job from (depends on execution
>>> modes - standalone/yarn-client), and in others it's the driver's stdout log
>>> (yarn-cluster, not sure about Mesos).
>>>
>>> Generally, if such PTransform is considered for the SDK, I think it
>>> should print to stdout in a centralized location (in Spark it's the driver
>>> node) and it should be capped by N to avoid OOM. I can definitely see this
>>> useful for development, even if it's mostly for IDE use, and I think this
>>> is what Jesse was looking for because debugging a PCollection's content in
>>> the IDE is useless unless you apply the runner's (implementation of)
>>> "collect" method to see what it contains.
>>>
>>> If we're looking for something to print the content of PCollection's
>>> partitions on the nodes, this could simply be a composite transform that
>>> iterates over the first N elements in the partition and logs their values.
>>>
>>> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> I removed ConsoleIO in my latest PR since it was specific to Flink. It
>>>> was just a wrapper for a DoFn that prints to System.out, so it would be
>>>> super easy to implement in core Beam. I don't how, however, if this is
>>>> useful since for most runners this would print to some file on the worker
>>>> where the DoFn is running. This is only useful when running inside an IDE.
>>>>
>>>> On Fri, 20 May 2016 at 06:06 Frances Perry <fj...@google.com> wrote:
>>>>
>>>>> Looks like Flink and Spark have their own implementations of ConsoleIO
>>>>> but there's no SDK level version. Generally that goes against the goal of
>>>>> having generally useful IO connectors, with potential runner-specific
>>>>> overrides. Is there a generalized version we could add?
>>>>>
>>>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'll join Frances and add that even Spark warns about using collect()
>>>>>> as it might not fit into Driver memory, and like you said it's only for
>>>>>> small datasets - but small is relative...
>>>>>> I you want some "printout" during development you could use ConsoleIO
>>>>>> (for streaming) - both Flink an Spark runner support this (and Beam doesn't
>>>>>> AFAIK).
>>>>>>
>>>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:
>>>>>>
>>>>>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>>>>>> can't ask to access the whole collection, given you'll be waiting forever
>>>>>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>>>>>> assertions though, so it'll be able to handle that case in the future.
>>>>>>>
>>>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>>>> (1) Graph construction -- from Pipeline.create() until
>>>>>>>> Pipeline.run(). During this phase PCollection are just placeholders --
>>>>>>>> their contents don't exist.
>>>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner
>>>>>>>> for execution.
>>>>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>>>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>>>>>>> materialized during execution, but others may not (for example, if the
>>>>>>>> runner chooses to fuse or optimize them away).
>>>>>>>>
>>>>>>>> So in other words, there's no place in your main program when a
>>>>>>>> PCollection is guaranteed to have its contents available.
>>>>>>>>
>>>>>>>> If you are attempting to debug a job, you might look into PAssert
>>>>>>>> [1], which gives you a place to inspect the entire contents of a
>>>>>>>> PCollection and do something with it. In the DirectPipelineRunner that code
>>>>>>>> will run locally, so you'll see the output. But of course, in other runners
>>>>>>>> it may helpfully print on some random worker somewhere. But you can also
>>>>>>>> write tests that assert on its contents, which will work on any runner.
>>>>>>>> (This is how the @RunnableOnService tests work that we are setting up for
>>>>>>>> all runners.)
>>>>>>>>
>>>>>>>> We should get some documentation on testing put together on the
>>>>>>>> Beam site, but in the meantime the info on the Dataflow site [2] roughly
>>>>>>>> applies (DataflowAssert was renamed PAssert in Beam).
>>>>>>>>
>>>>>>>> Hope that helps!
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>>>> [2]
>>>>>>>> https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>>>
>>>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <
>>>>>>>> jesse@smokinghand.com> wrote:
>>>>>>>>
>>>>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>>>>
>>>>>>>>> This would be for small amounts of data stored in a PCollection
>>>>>>>>> like:
>>>>>>>>>
>>>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>>>> System.out.println(valuesincount);
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jesse
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>

Re: Collecting Results to Executing Process

Posted by Ismaël Mejía <ie...@gmail.com>.
This reminds me of my first email to this list when I created the Log.Print
transform to print intermidiate steps Jesse, maybe you can use something
like that:

https://github.com/iemejia/beam-playground/blob/master/src/test/java/org/apache/beam/contrib/transforms/DebugTest.java

Frances is right for a real case scenario, but the case that Jesse mentions
(similar to mine) is the development time use where commonly we play with a
local sample and we need to see what happened after the PTransforms. This
also can be useful for the integration with notebooks (Zeppelin, Jupyter,
etc).

Ismael


On Fri, May 20, 2016 at 3:55 PM, Jesse Anderson <je...@smokinghand.com>
wrote:

> @amit, yes that's the reason I was looking for one. Students often want a
> quick sanity check for their processing. The workaround now is to write out
> the collection to a file and then open it.
>
> On Fri, May 20, 2016, 12:15 AM Amit Sela <am...@gmail.com> wrote:
>
>> For the Spark runner this will print to the driver stdout, which in some
>> cases is the console you're running the job from (depends on execution
>> modes - standalone/yarn-client), and in others it's the driver's stdout log
>> (yarn-cluster, not sure about Mesos).
>>
>> Generally, if such PTransform is considered for the SDK, I think it
>> should print to stdout in a centralized location (in Spark it's the driver
>> node) and it should be capped by N to avoid OOM. I can definitely see this
>> useful for development, even if it's mostly for IDE use, and I think this
>> is what Jesse was looking for because debugging a PCollection's content in
>> the IDE is useless unless you apply the runner's (implementation of)
>> "collect" method to see what it contains.
>>
>> If we're looking for something to print the content of PCollection's
>> partitions on the nodes, this could simply be a composite transform that
>> iterates over the first N elements in the partition and logs their values.
>>
>> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> I removed ConsoleIO in my latest PR since it was specific to Flink. It
>>> was just a wrapper for a DoFn that prints to System.out, so it would be
>>> super easy to implement in core Beam. I don't how, however, if this is
>>> useful since for most runners this would print to some file on the worker
>>> where the DoFn is running. This is only useful when running inside an IDE.
>>>
>>> On Fri, 20 May 2016 at 06:06 Frances Perry <fj...@google.com> wrote:
>>>
>>>> Looks like Flink and Spark have their own implementations of ConsoleIO
>>>> but there's no SDK level version. Generally that goes against the goal of
>>>> having generally useful IO connectors, with potential runner-specific
>>>> overrides. Is there a generalized version we could add?
>>>>
>>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'll join Frances and add that even Spark warns about using collect()
>>>>> as it might not fit into Driver memory, and like you said it's only for
>>>>> small datasets - but small is relative...
>>>>> I you want some "printout" during development you could use ConsoleIO
>>>>> (for streaming) - both Flink an Spark runner support this (and Beam doesn't
>>>>> AFAIK).
>>>>>
>>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:
>>>>>
>>>>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>>>>> can't ask to access the whole collection, given you'll be waiting forever
>>>>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>>>>> assertions though, so it'll be able to handle that case in the future.
>>>>>>
>>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>>> (1) Graph construction -- from Pipeline.create() until
>>>>>>> Pipeline.run(). During this phase PCollection are just placeholders --
>>>>>>> their contents don't exist.
>>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner
>>>>>>> for execution.
>>>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>>>>>> materialized during execution, but others may not (for example, if the
>>>>>>> runner chooses to fuse or optimize them away).
>>>>>>>
>>>>>>> So in other words, there's no place in your main program when a
>>>>>>> PCollection is guaranteed to have its contents available.
>>>>>>>
>>>>>>> If you are attempting to debug a job, you might look into PAssert
>>>>>>> [1], which gives you a place to inspect the entire contents of a
>>>>>>> PCollection and do something with it. In the DirectPipelineRunner that code
>>>>>>> will run locally, so you'll see the output. But of course, in other runners
>>>>>>> it may helpfully print on some random worker somewhere. But you can also
>>>>>>> write tests that assert on its contents, which will work on any runner.
>>>>>>> (This is how the @RunnableOnService tests work that we are setting up for
>>>>>>> all runners.)
>>>>>>>
>>>>>>> We should get some documentation on testing put together on the Beam
>>>>>>> site, but in the meantime the info on the Dataflow site [2] roughly applies
>>>>>>> (DataflowAssert was renamed PAssert in Beam).
>>>>>>>
>>>>>>> Hope that helps!
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>>> [2]
>>>>>>> https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>>
>>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <
>>>>>>> jesse@smokinghand.com> wrote:
>>>>>>>
>>>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>>>
>>>>>>>> This would be for small amounts of data stored in a PCollection
>>>>>>>> like:
>>>>>>>>
>>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>>> System.out.println(valuesincount);
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Jesse
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>

Re: Collecting Results to Executing Process

Posted by Jesse Anderson <je...@smokinghand.com>.
@amit, yes that's the reason I was looking for one. Students often want a
quick sanity check for their processing. The workaround now is to write out
the collection to a file and then open it.

On Fri, May 20, 2016, 12:15 AM Amit Sela <am...@gmail.com> wrote:

> For the Spark runner this will print to the driver stdout, which in some
> cases is the console you're running the job from (depends on execution
> modes - standalone/yarn-client), and in others it's the driver's stdout log
> (yarn-cluster, not sure about Mesos).
>
> Generally, if such PTransform is considered for the SDK, I think it should
> print to stdout in a centralized location (in Spark it's the driver node)
> and it should be capped by N to avoid OOM. I can definitely see this useful
> for development, even if it's mostly for IDE use, and I think this is what
> Jesse was looking for because debugging a PCollection's content in the IDE
> is useless unless you apply the runner's (implementation of) "collect"
> method to see what it contains.
>
> If we're looking for something to print the content of PCollection's
> partitions on the nodes, this could simply be a composite transform that
> iterates over the first N elements in the partition and logs their values.
>
> On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> I removed ConsoleIO in my latest PR since it was specific to Flink. It
>> was just a wrapper for a DoFn that prints to System.out, so it would be
>> super easy to implement in core Beam. I don't how, however, if this is
>> useful since for most runners this would print to some file on the worker
>> where the DoFn is running. This is only useful when running inside an IDE.
>>
>> On Fri, 20 May 2016 at 06:06 Frances Perry <fj...@google.com> wrote:
>>
>>> Looks like Flink and Spark have their own implementations of ConsoleIO
>>> but there's no SDK level version. Generally that goes against the goal of
>>> having generally useful IO connectors, with potential runner-specific
>>> overrides. Is there a generalized version we could add?
>>>
>>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com>
>>> wrote:
>>>
>>>> I'll join Frances and add that even Spark warns about using collect()
>>>> as it might not fit into Driver memory, and like you said it's only for
>>>> small datasets - but small is relative...
>>>> I you want some "printout" during development you could use ConsoleIO
>>>> (for streaming) - both Flink an Spark runner support this (and Beam doesn't
>>>> AFAIK).
>>>>
>>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:
>>>>
>>>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>>>> can't ask to access the whole collection, given you'll be waiting forever
>>>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>>>> assertions though, so it'll be able to handle that case in the future.
>>>>>
>>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>>> (1) Graph construction -- from Pipeline.create() until
>>>>>> Pipeline.run(). During this phase PCollection are just placeholders --
>>>>>> their contents don't exist.
>>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner for
>>>>>> execution.
>>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>>>>> materialized during execution, but others may not (for example, if the
>>>>>> runner chooses to fuse or optimize them away).
>>>>>>
>>>>>> So in other words, there's no place in your main program when a
>>>>>> PCollection is guaranteed to have its contents available.
>>>>>>
>>>>>> If you are attempting to debug a job, you might look into PAssert
>>>>>> [1], which gives you a place to inspect the entire contents of a
>>>>>> PCollection and do something with it. In the DirectPipelineRunner that code
>>>>>> will run locally, so you'll see the output. But of course, in other runners
>>>>>> it may helpfully print on some random worker somewhere. But you can also
>>>>>> write tests that assert on its contents, which will work on any runner.
>>>>>> (This is how the @RunnableOnService tests work that we are setting up for
>>>>>> all runners.)
>>>>>>
>>>>>> We should get some documentation on testing put together on the Beam
>>>>>> site, but in the meantime the info on the Dataflow site [2] roughly applies
>>>>>> (DataflowAssert was renamed PAssert in Beam).
>>>>>>
>>>>>> Hope that helps!
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>>> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>>
>>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <
>>>>>> jesse@smokinghand.com> wrote:
>>>>>>
>>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>>
>>>>>>> This would be for small amounts of data stored in a PCollection like:
>>>>>>>
>>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>>> System.out.println(valuesincount);
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jesse
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>

Re: Collecting Results to Executing Process

Posted by Amit Sela <am...@gmail.com>.
For the Spark runner this will print to the driver stdout, which in some
cases is the console you're running the job from (depends on execution
modes - standalone/yarn-client), and in others it's the driver's stdout log
(yarn-cluster, not sure about Mesos).

Generally, if such PTransform is considered for the SDK, I think it should
print to stdout in a centralized location (in Spark it's the driver node)
and it should be capped by N to avoid OOM. I can definitely see this useful
for development, even if it's mostly for IDE use, and I think this is what
Jesse was looking for because debugging a PCollection's content in the IDE
is useless unless you apply the runner's (implementation of) "collect"
method to see what it contains.

If we're looking for something to print the content of PCollection's
partitions on the nodes, this could simply be a composite transform that
iterates over the first N elements in the partition and logs their values.

On Fri, May 20, 2016 at 8:56 AM Aljoscha Krettek <al...@apache.org>
wrote:

> I removed ConsoleIO in my latest PR since it was specific to Flink. It was
> just a wrapper for a DoFn that prints to System.out, so it would be super
> easy to implement in core Beam. I don't how, however, if this is useful
> since for most runners this would print to some file on the worker where
> the DoFn is running. This is only useful when running inside an IDE.
>
> On Fri, 20 May 2016 at 06:06 Frances Perry <fj...@google.com> wrote:
>
>> Looks like Flink and Spark have their own implementations of ConsoleIO
>> but there's no SDK level version. Generally that goes against the goal of
>> having generally useful IO connectors, with potential runner-specific
>> overrides. Is there a generalized version we could add?
>>
>> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com> wrote:
>>
>>> I'll join Frances and add that even Spark warns about using collect() as
>>> it might not fit into Driver memory, and like you said it's only for small
>>> datasets - but small is relative...
>>> I you want some "printout" during development you could use ConsoleIO
>>> (for streaming) - both Flink an Spark runner support this (and Beam doesn't
>>> AFAIK).
>>>
>>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:
>>>
>>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>>> can't ask to access the whole collection, given you'll be waiting forever
>>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>>> assertions though, so it'll be able to handle that case in the future.
>>>>
>>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com> wrote:
>>>>
>>>>> Nope. A Beam pipeline goes through three different phases:
>>>>> (1) Graph construction -- from Pipeline.create() until Pipeline.run().
>>>>> During this phase PCollection are just placeholders -- their contents don't
>>>>> exist.
>>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner for
>>>>> execution.
>>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>>>> materialized during execution, but others may not (for example, if the
>>>>> runner chooses to fuse or optimize them away).
>>>>>
>>>>> So in other words, there's no place in your main program when a
>>>>> PCollection is guaranteed to have its contents available.
>>>>>
>>>>> If you are attempting to debug a job, you might look into PAssert [1],
>>>>> which gives you a place to inspect the entire contents of a PCollection and
>>>>> do something with it. In the DirectPipelineRunner that code will run
>>>>> locally, so you'll see the output. But of course, in other runners it may
>>>>> helpfully print on some random worker somewhere. But you can also write
>>>>> tests that assert on its contents, which will work on any runner. (This is
>>>>> how the @RunnableOnService tests work that we are setting up for all
>>>>> runners.)
>>>>>
>>>>> We should get some documentation on testing put together on the Beam
>>>>> site, but in the meantime the info on the Dataflow site [2] roughly applies
>>>>> (DataflowAssert was renamed PAssert in Beam).
>>>>>
>>>>> Hope that helps!
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>>> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>>
>>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <
>>>>> jesse@smokinghand.com> wrote:
>>>>>
>>>>>> Is there a way to get the PCollection's results in the executing
>>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>>
>>>>>> This would be for small amounts of data stored in a PCollection like:
>>>>>>
>>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>>> System.out.println(valuesincount);
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jesse
>>>>>>
>>>>>
>>>>>
>>>>
>>

Re: Collecting Results to Executing Process

Posted by Aljoscha Krettek <al...@apache.org>.
I removed ConsoleIO in my latest PR since it was specific to Flink. It was
just a wrapper for a DoFn that prints to System.out, so it would be super
easy to implement in core Beam. I don't how, however, if this is useful
since for most runners this would print to some file on the worker where
the DoFn is running. This is only useful when running inside an IDE.

On Fri, 20 May 2016 at 06:06 Frances Perry <fj...@google.com> wrote:

> Looks like Flink and Spark have their own implementations of ConsoleIO but
> there's no SDK level version. Generally that goes against the goal of
> having generally useful IO connectors, with potential runner-specific
> overrides. Is there a generalized version we could add?
>
> On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com> wrote:
>
>> I'll join Frances and add that even Spark warns about using collect() as
>> it might not fit into Driver memory, and like you said it's only for small
>> datasets - but small is relative...
>> I you want some "printout" during development you could use ConsoleIO
>> (for streaming) - both Flink an Spark runner support this (and Beam doesn't
>> AFAIK).
>>
>> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:
>>
>>> Oh, and I forgot the unified model!  For unbounded collections, you
>>> can't ask to access the whole collection, given you'll be waiting forever
>>> ;-) Thomas has some work in progress on making PAssert support per-window
>>> assertions though, so it'll be able to handle that case in the future.
>>>
>>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com> wrote:
>>>
>>>> Nope. A Beam pipeline goes through three different phases:
>>>> (1) Graph construction -- from Pipeline.create() until Pipeline.run().
>>>> During this phase PCollection are just placeholders -- their contents don't
>>>> exist.
>>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner for
>>>> execution.
>>>> (3) Execution -- The runner runs the pipeline. May continue after
>>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>>> materialized during execution, but others may not (for example, if the
>>>> runner chooses to fuse or optimize them away).
>>>>
>>>> So in other words, there's no place in your main program when a
>>>> PCollection is guaranteed to have its contents available.
>>>>
>>>> If you are attempting to debug a job, you might look into PAssert [1],
>>>> which gives you a place to inspect the entire contents of a PCollection and
>>>> do something with it. In the DirectPipelineRunner that code will run
>>>> locally, so you'll see the output. But of course, in other runners it may
>>>> helpfully print on some random worker somewhere. But you can also write
>>>> tests that assert on its contents, which will work on any runner. (This is
>>>> how the @RunnableOnService tests work that we are setting up for all
>>>> runners.)
>>>>
>>>> We should get some documentation on testing put together on the Beam
>>>> site, but in the meantime the info on the Dataflow site [2] roughly applies
>>>> (DataflowAssert was renamed PAssert in Beam).
>>>>
>>>> Hope that helps!
>>>>
>>>> [1]
>>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>>> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>>
>>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <jesse@smokinghand.com
>>>> > wrote:
>>>>
>>>>> Is there a way to get the PCollection's results in the executing
>>>>> process? In Spark this is using the collect method on an RDD.
>>>>>
>>>>> This would be for small amounts of data stored in a PCollection like:
>>>>>
>>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>>> System.out.println(valuesincount);
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jesse
>>>>>
>>>>
>>>>
>>>
>

Re: Collecting Results to Executing Process

Posted by Frances Perry <fj...@google.com>.
Looks like Flink and Spark have their own implementations of ConsoleIO but
there's no SDK level version. Generally that goes against the goal of
having generally useful IO connectors, with potential runner-specific
overrides. Is there a generalized version we could add?

On Thu, May 19, 2016 at 12:51 PM, Amit Sela <am...@gmail.com> wrote:

> I'll join Frances and add that even Spark warns about using collect() as
> it might not fit into Driver memory, and like you said it's only for small
> datasets - but small is relative...
> I you want some "printout" during development you could use ConsoleIO (for
> streaming) - both Flink an Spark runner support this (and Beam doesn't
> AFAIK).
>
> On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:
>
>> Oh, and I forgot the unified model!  For unbounded collections, you can't
>> ask to access the whole collection, given you'll be waiting forever ;-)
>> Thomas has some work in progress on making PAssert support per-window
>> assertions though, so it'll be able to handle that case in the future.
>>
>> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com> wrote:
>>
>>> Nope. A Beam pipeline goes through three different phases:
>>> (1) Graph construction -- from Pipeline.create() until Pipeline.run().
>>> During this phase PCollection are just placeholders -- their contents don't
>>> exist.
>>> (2) Submission -- Pipeline.run() submits the pipeline to a runner for
>>> execution.
>>> (3) Execution -- The runner runs the pipeline. May continue after
>>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>>> materialized during execution, but others may not (for example, if the
>>> runner chooses to fuse or optimize them away).
>>>
>>> So in other words, there's no place in your main program when a
>>> PCollection is guaranteed to have its contents available.
>>>
>>> If you are attempting to debug a job, you might look into PAssert [1],
>>> which gives you a place to inspect the entire contents of a PCollection and
>>> do something with it. In the DirectPipelineRunner that code will run
>>> locally, so you'll see the output. But of course, in other runners it may
>>> helpfully print on some random worker somewhere. But you can also write
>>> tests that assert on its contents, which will work on any runner. (This is
>>> how the @RunnableOnService tests work that we are setting up for all
>>> runners.)
>>>
>>> We should get some documentation on testing put together on the Beam
>>> site, but in the meantime the info on the Dataflow site [2] roughly applies
>>> (DataflowAssert was renamed PAssert in Beam).
>>>
>>> Hope that helps!
>>>
>>> [1]
>>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>>> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>>
>>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <je...@smokinghand.com>
>>> wrote:
>>>
>>>> Is there a way to get the PCollection's results in the executing
>>>> process? In Spark this is using the collect method on an RDD.
>>>>
>>>> This would be for small amounts of data stored in a PCollection like:
>>>>
>>>> PCollection<Long> count = partition.apply(Count.globally());
>>>> System.out.println(valuesincount);
>>>>
>>>> Thanks,
>>>>
>>>> Jesse
>>>>
>>>
>>>
>>

Re: Collecting Results to Executing Process

Posted by Amit Sela <am...@gmail.com>.
I'll join Frances and add that even Spark warns about using collect() as it
might not fit into Driver memory, and like you said it's only for small
datasets - but small is relative...
I you want some "printout" during development you could use ConsoleIO (for
streaming) - both Flink an Spark runner support this (and Beam doesn't
AFAIK).

On Thu, May 19, 2016 at 9:49 PM Frances Perry <fj...@google.com> wrote:

> Oh, and I forgot the unified model!  For unbounded collections, you can't
> ask to access the whole collection, given you'll be waiting forever ;-)
> Thomas has some work in progress on making PAssert support per-window
> assertions though, so it'll be able to handle that case in the future.
>
> On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com> wrote:
>
>> Nope. A Beam pipeline goes through three different phases:
>> (1) Graph construction -- from Pipeline.create() until Pipeline.run().
>> During this phase PCollection are just placeholders -- their contents don't
>> exist.
>> (2) Submission -- Pipeline.run() submits the pipeline to a runner for
>> execution.
>> (3) Execution -- The runner runs the pipeline. May continue after
>> Pipeline.run() returns, depending on the runner. Some PCollections may get
>> materialized during execution, but others may not (for example, if the
>> runner chooses to fuse or optimize them away).
>>
>> So in other words, there's no place in your main program when a
>> PCollection is guaranteed to have its contents available.
>>
>> If you are attempting to debug a job, you might look into PAssert [1],
>> which gives you a place to inspect the entire contents of a PCollection and
>> do something with it. In the DirectPipelineRunner that code will run
>> locally, so you'll see the output. But of course, in other runners it may
>> helpfully print on some random worker somewhere. But you can also write
>> tests that assert on its contents, which will work on any runner. (This is
>> how the @RunnableOnService tests work that we are setting up for all
>> runners.)
>>
>> We should get some documentation on testing put together on the Beam
>> site, but in the meantime the info on the Dataflow site [2] roughly applies
>> (DataflowAssert was renamed PAssert in Beam).
>>
>> Hope that helps!
>>
>> [1]
>> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
>> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>>
>> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <je...@smokinghand.com>
>> wrote:
>>
>>> Is there a way to get the PCollection's results in the executing
>>> process? In Spark this is using the collect method on an RDD.
>>>
>>> This would be for small amounts of data stored in a PCollection like:
>>>
>>> PCollection<Long> count = partition.apply(Count.globally());
>>> System.out.println(valuesincount);
>>>
>>> Thanks,
>>>
>>> Jesse
>>>
>>
>>
>

Re: Collecting Results to Executing Process

Posted by Frances Perry <fj...@google.com>.
Oh, and I forgot the unified model!  For unbounded collections, you can't
ask to access the whole collection, given you'll be waiting forever ;-)
Thomas has some work in progress on making PAssert support per-window
assertions though, so it'll be able to handle that case in the future.

On Thu, May 19, 2016 at 11:26 AM, Frances Perry <fj...@google.com> wrote:

> Nope. A Beam pipeline goes through three different phases:
> (1) Graph construction -- from Pipeline.create() until Pipeline.run().
> During this phase PCollection are just placeholders -- their contents don't
> exist.
> (2) Submission -- Pipeline.run() submits the pipeline to a runner for
> execution.
> (3) Execution -- The runner runs the pipeline. May continue after
> Pipeline.run() returns, depending on the runner. Some PCollections may get
> materialized during execution, but others may not (for example, if the
> runner chooses to fuse or optimize them away).
>
> So in other words, there's no place in your main program when a
> PCollection is guaranteed to have its contents available.
>
> If you are attempting to debug a job, you might look into PAssert [1],
> which gives you a place to inspect the entire contents of a PCollection and
> do something with it. In the DirectPipelineRunner that code will run
> locally, so you'll see the output. But of course, in other runners it may
> helpfully print on some random worker somewhere. But you can also write
> tests that assert on its contents, which will work on any runner. (This is
> how the @RunnableOnService tests work that we are setting up for all
> runners.)
>
> We should get some documentation on testing put together on the Beam site,
> but in the meantime the info on the Dataflow site [2] roughly applies
> (DataflowAssert was renamed PAssert in Beam).
>
> Hope that helps!
>
> [1]
> https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
> [2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline
>
> On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <je...@smokinghand.com>
> wrote:
>
>> Is there a way to get the PCollection's results in the executing process?
>> In Spark this is using the collect method on an RDD.
>>
>> This would be for small amounts of data stored in a PCollection like:
>>
>> PCollection<Long> count = partition.apply(Count.globally());
>> System.out.println(valuesincount);
>>
>> Thanks,
>>
>> Jesse
>>
>
>

Re: Collecting Results to Executing Process

Posted by Frances Perry <fj...@google.com>.
Nope. A Beam pipeline goes through three different phases:
(1) Graph construction -- from Pipeline.create() until Pipeline.run().
During this phase PCollection are just placeholders -- their contents don't
exist.
(2) Submission -- Pipeline.run() submits the pipeline to a runner for
execution.
(3) Execution -- The runner runs the pipeline. May continue after
Pipeline.run() returns, depending on the runner. Some PCollections may get
materialized during execution, but others may not (for example, if the
runner chooses to fuse or optimize them away).

So in other words, there's no place in your main program when a PCollection
is guaranteed to have its contents available.

If you are attempting to debug a job, you might look into PAssert [1],
which gives you a place to inspect the entire contents of a PCollection and
do something with it. In the DirectPipelineRunner that code will run
locally, so you'll see the output. But of course, in other runners it may
helpfully print on some random worker somewhere. But you can also write
tests that assert on its contents, which will work on any runner. (This is
how the @RunnableOnService tests work that we are setting up for all
runners.)

We should get some documentation on testing put together on the Beam site,
but in the meantime the info on the Dataflow site [2] roughly applies
(DataflowAssert was renamed PAssert in Beam).

Hope that helps!

[1]
https://github.com/apache/incubator-beam/blob/eb682a80c4091dce5242ebc8272f43f3461a6fc5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
[2] https://cloud.google.com/dataflow/pipelines/testing-your-pipeline

On Wed, May 18, 2016 at 10:48 AM, Jesse Anderson <je...@smokinghand.com>
wrote:

> Is there a way to get the PCollection's results in the executing process?
> In Spark this is using the collect method on an RDD.
>
> This would be for small amounts of data stored in a PCollection like:
>
> PCollection<Long> count = partition.apply(Count.globally());
> System.out.println(valuesincount);
>
> Thanks,
>
> Jesse
>