You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Tobias Feldhaus <To...@localsearch.ch> on 2017/02/17 15:51:28 UTC

Documentation: Side Input and Outputs

Hello,

It seems like the documentation [0] about side in- and outputs has missed some 
refactoring of the code. I can’t find Max.MaxIntFn() for example, but there is a 
new Max.ofIntegers() method that looks like it matches it. 

I would love to improve the documentation via a pull-request, but I don’t 
understand this concept well enough to feel confident doing so, yet.

My intention is to get a 

	PCollectionView<Long> numOfDays

Which holds the number of elements (days) of a given PCollection<DateTime> dates, 
and building it via a sum of counts (as a SingletonView).

Something that corresponds in my head to *wrong*:

	dates.apply("Count", Count.globally().asSingletonView());

Is it possible to access the side input outside of a DoFn?


Tobias

[0] https://beam.apache.org/documentation/programming-guide/#transforms-sideio


Re: Documentation: Side Input and Outputs

Posted by Tobias Feldhaus <To...@localsearch.ch>.
According to this [0] Stackoverflow answer, keying by day and grouping by key and then using that to infer the table name is not possible.
The only thing left would be using per window tables, which need an attached timestamp. But the TimeStamping seems to have changed as this
Approach [1] is not working in the Beam SDK for me anymore.

Is there any way to get gzipped JSON files from GCS through Dataflow into BQ into a partitioned or even date stamped table?

[0] http://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey#31165682
[1] https://cloud.google.com/dataflow/model/windowing#TimeStamping

On 19.02.17, 23:09, "Tobias Feldhaus" <To...@localsearch.ch>> wrote:


That would mean attaching a (date) timestamp (which is right now an open question for me, see “Assigning Timestamps” problem) to each element and windowing by day, or building a KV with the date and grouping by the key of the KV?

On 19.02.17, 22:24, "Ben Chambers" <bc...@apache.org>> wrote:

Could you instead key by day and then use per key computations and/or groups by key? Often it is easier to compute per key than to partition. The first is a simpler pipeline structure while partition requires more typically duplicated transform nodes.

On Sun, Feb 19, 2017, 1:20 PM Tobias Feldhaus <To...@localsearch.ch>> wrote:
And I think my approach will never work. I just RTFM again and it clearly states:

“You can, for example, pass the number of partitions as a command-line option at runtime
(which will then be used to build your pipeline graph), but you cannot determine the number
of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).” [0]

Meaning that I cannot read my data in my pipeline to determine the number of days I want to partition it by to
write it to dated tables in BQ, or at least to write AVRO files (one per day) to GCS.

I guess I need two pipelines. This was something I wanted to avoid as it is making the thing more complicated, since
I need to chain the two and check if all from the first part are successfully done, before I can start phase two.

Or am I missing something here?

Tobi

[0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On 19.02.17, 21:41, "Tobias Feldhaus" <To...@localsearch.ch>> wrote:


Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially.

I am referring to this example [0] and tried to mimic the behaviour.

The code looks like this:



        Pipeline p = Pipeline.create(options);



        PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                .apply("Repartition", Repartition.of())

                .apply("Parse JSON", ParDo.of(new ReadObjects()))

                .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



        PCollection<TableRow> output  = logLines.apply("Format Output", ParDo.of(new Outputter()));



        output.apply("Write to BigQueryService", BigQueryIO.Write

                .to("somedataset."+tableName)

                .withSchema(schema)

                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));



        PCollection<KV<Instant, Instant>> dates = logLines

                .apply("Extract Instant", ParDo.of(new GetDateFunction()))

                .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {

                    public Instant apply(Instant instant) { return instant; } }));



        final PCollectionView<Long> numOfDays =

                dates

                        .apply("GetDateTimes", Keys.<Instant>create())

                        .apply("DistinctDocs", Distinct.<Instant>create())

                        .apply("Count", Count.<Instant>globally())

                        .apply("View", View.<Long>asSingleton());



//       PCollectionList pCollectionPerDay = logLines.apply("Partition per Day", Partition.of ( I need to know the number of days here)



I was using Integer before in the PCollectionView and that was the issue, but it works with Long. Also,

I was using DateTime, which does not come with a DefaultCoder predefined, so I simply switched to
Instants (using the TlDf example [1], and WithKeys.of [2]).



I have two big open questions. In my code I was using c.outputWithTimestamp(itLogLine, timestampInstant), in a step, where I parsed

a string from a ItLogLine field into an instant and attached it to the element to use it later in the chain again.



Now, after switching to Beam from Dataflow, this stopped working. I assume that I was using it wrongly, since it says that



“If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”



so I want to change my code to do the right thing - however, in the current documentation I cannot find the text that describes how to

assign timestamps, as the link is broken (“see Assigning Timestamps<https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.” – the link goes to #Windowing).

My second question is about the PCollectionView – Dan said that it’s possible to use it as a SideInput, but is it also possible to use a sideinput when doing partitioning?
Intuitively I would like to get a single Long (numOfDays) and then use that to the Partition.of().





Have a nice Sunday,
Tobi



[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209

[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181

[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys



On 19.02.17, 14:45, "Tobias Feldhaus" <To...@localsearch.ch>> wrote:



    Hi Ken,



    thank you for your answer!







    On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <To...@localsearch.ch>> wrote:



    My intention is to get a



            PCollectionView<Long> numOfDays



    Which holds the number of elements (days) of a given PCollection<DateTime> dates,

    and building it via a sum of counts (as a SingletonView).



    Something that corresponds in my head to *wrong*:



            dates.apply("Count", Count.globally().asSingletonView());



    Seems about right.



    It seems about right to me too, but apparently I have a misconception about what is happening here in my head:



            PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                    .apply("Repartition", Repartition.of())

                    .apply("Parse JSON", ParDo.of(new ReadObjects()))

                    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



            PCollection<DateTime> dates = logLines

                    .apply("Get Dates", ParDo.of(new GetDateFunction()))

                    .apply("Get distinct Dates", Distinct.<DateTime>create());



            final PCollectionView<Long> numOfDays = dates.apply("Count", Count.globally().asSingletonView());



    This ends up in a type mismatch, is Count only applicable to certain types like String, Long, Integer, TableRow?

    Considering the code this would be counterintuitive as everything seems to be implemented using generic types in the SDK.



    I am still getting used to the Beam Programming Model and it confuses me still from time to time, sorry.



    Is it possible to access the side input outside of a DoFn?



    You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).



    So getting a single calculated value to partition a PCollection (number of days in this case), should be done via the extractOutput [0]

    Method in this case?



    Best,

    Tobi



    [0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-







Re: Documentation: Side Input and Outputs

Posted by Tobias Feldhaus <To...@localsearch.ch>.
That would mean attaching a (date) timestamp (which is right now an open question for me, see “Assigning Timestamps” problem) to each element and windowing by day, or building a KV with the date and grouping by the key of the KV?

On 19.02.17, 22:24, "Ben Chambers" <bc...@apache.org>> wrote:

Could you instead key by day and then use per key computations and/or groups by key? Often it is easier to compute per key than to partition. The first is a simpler pipeline structure while partition requires more typically duplicated transform nodes.

On Sun, Feb 19, 2017, 1:20 PM Tobias Feldhaus <To...@localsearch.ch>> wrote:
And I think my approach will never work. I just RTFM again and it clearly states:

“You can, for example, pass the number of partitions as a command-line option at runtime
(which will then be used to build your pipeline graph), but you cannot determine the number
of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).” [0]

Meaning that I cannot read my data in my pipeline to determine the number of days I want to partition it by to
write it to dated tables in BQ, or at least to write AVRO files (one per day) to GCS.

I guess I need two pipelines. This was something I wanted to avoid as it is making the thing more complicated, since
I need to chain the two and check if all from the first part are successfully done, before I can start phase two.

Or am I missing something here?

Tobi

[0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On 19.02.17, 21:41, "Tobias Feldhaus" <To...@localsearch.ch>> wrote:


Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially.

I am referring to this example [0] and tried to mimic the behaviour.

The code looks like this:



        Pipeline p = Pipeline.create(options);



        PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                .apply("Repartition", Repartition.of())

                .apply("Parse JSON", ParDo.of(new ReadObjects()))

                .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



        PCollection<TableRow> output  = logLines.apply("Format Output", ParDo.of(new Outputter()));



        output.apply("Write to BigQueryService", BigQueryIO.Write

                .to("somedataset."+tableName)

                .withSchema(schema)

                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));



        PCollection<KV<Instant, Instant>> dates = logLines

                .apply("Extract Instant", ParDo.of(new GetDateFunction()))

                .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {

                    public Instant apply(Instant instant) { return instant; } }));



        final PCollectionView<Long> numOfDays =

                dates

                        .apply("GetDateTimes", Keys.<Instant>create())

                        .apply("DistinctDocs", Distinct.<Instant>create())

                        .apply("Count", Count.<Instant>globally())

                        .apply("View", View.<Long>asSingleton());



//       PCollectionList pCollectionPerDay = logLines.apply("Partition per Day", Partition.of ( I need to know the number of days here)



I was using Integer before in the PCollectionView and that was the issue, but it works with Long. Also,

I was using DateTime, which does not come with a DefaultCoder predefined, so I simply switched to
Instants (using the TlDf example [1], and WithKeys.of [2]).



I have two big open questions. In my code I was using c.outputWithTimestamp(itLogLine, timestampInstant), in a step, where I parsed

a string from a ItLogLine field into an instant and attached it to the element to use it later in the chain again.



Now, after switching to Beam from Dataflow, this stopped working. I assume that I was using it wrongly, since it says that



“If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”



so I want to change my code to do the right thing - however, in the current documentation I cannot find the text that describes how to

assign timestamps, as the link is broken (“see Assigning Timestamps<https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.” – the link goes to #Windowing).

My second question is about the PCollectionView – Dan said that it’s possible to use it as a SideInput, but is it also possible to use a sideinput when doing partitioning?
Intuitively I would like to get a single Long (numOfDays) and then use that to the Partition.of().





Have a nice Sunday,
Tobi



[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209

[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181

[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys



On 19.02.17, 14:45, "Tobias Feldhaus" <To...@localsearch.ch>> wrote:



    Hi Ken,



    thank you for your answer!







    On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <To...@localsearch.ch>> wrote:



    My intention is to get a



            PCollectionView<Long> numOfDays



    Which holds the number of elements (days) of a given PCollection<DateTime> dates,

    and building it via a sum of counts (as a SingletonView).



    Something that corresponds in my head to *wrong*:



            dates.apply("Count", Count.globally().asSingletonView());



    Seems about right.



    It seems about right to me too, but apparently I have a misconception about what is happening here in my head:



            PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                    .apply("Repartition", Repartition.of())

                    .apply("Parse JSON", ParDo.of(new ReadObjects()))

                    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



            PCollection<DateTime> dates = logLines

                    .apply("Get Dates", ParDo.of(new GetDateFunction()))

                    .apply("Get distinct Dates", Distinct.<DateTime>create());



            final PCollectionView<Long> numOfDays = dates.apply("Count", Count.globally().asSingletonView());



    This ends up in a type mismatch, is Count only applicable to certain types like String, Long, Integer, TableRow?

    Considering the code this would be counterintuitive as everything seems to be implemented using generic types in the SDK.



    I am still getting used to the Beam Programming Model and it confuses me still from time to time, sorry.



    Is it possible to access the side input outside of a DoFn?



    You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).



    So getting a single calculated value to partition a PCollection (number of days in this case), should be done via the extractOutput [0]

    Method in this case?



    Best,

    Tobi



    [0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-







Re: Documentation: Side Input and Outputs

Posted by Ben Chambers <bc...@apache.org>.
Could you instead key by day and then use per key computations and/or
groups by key? Often it is easier to compute per key than to partition. The
first is a simpler pipeline structure while partition requires more
typically duplicated transform nodes.

On Sun, Feb 19, 2017, 1:20 PM Tobias Feldhaus <
Tobias.Feldhaus@localsearch.ch> wrote:

> And I think my approach will never work. I just RTFM again and it clearly
> states:
>
> “You can, for example, pass the number of partitions as a command-line
> option at runtime
>
> (which will then be used to build your pipeline graph), but you cannot
> determine the number
>
> of partitions in mid-pipeline (based on data calculated after your
> pipeline graph is constructed, for instance).” [0]
>
>
>
> Meaning that I cannot read my data in my pipeline to determine the number
> of days I want to partition it by to
>
> write it to dated tables in BQ, or at least to write AVRO files (one per
> day) to GCS.
>
>
>
> I guess I need two pipelines. This was something I wanted to avoid as it
> is making the thing more complicated, since
>
> I need to chain the two and check if all from the first part are
> successfully done, before I can start phase two.
>
>
>
> Or am I missing something here?
>
> Tobi
>
>
>
> [0]
> https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition
>
>
>
> On 19.02.17, 21:41, "Tobias Feldhaus" <To...@localsearch.ch>
> wrote:
>
>
>
> Sorry for the formatting of the last mail, I basically killed the quoting.
>
> I think I figured it out partially.
>
> I am referring to this example [0] and tried to mimic the behaviour.
>
> The code looks like this:
>
>
>
>         Pipeline p = Pipeline.create(options);
>
>
>
>         PCollection<ItLogLine> logLines = p.apply("Read logfile",
> TextIO.Read.from(bucket))
>
>                 .apply("Repartition", Repartition.of())
>
>                 .apply("Parse JSON", ParDo.of(new ReadObjects()))
>
>                 .apply("Extract timestamp", ParDo.of(new
> ExtractTimestamps()));
>
>
>
>         PCollection<TableRow> output  = logLines.apply("Format Output",
> ParDo.of(new Outputter()));
>
>
>
>         output.apply("Write to BigQueryService", BigQueryIO.Write
>
>                 .to("somedataset."+tableName)
>
>                 .withSchema(schema)
>
>
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>
>
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
>
>
>         PCollection<KV<Instant, Instant>> dates = logLines
>
>                 .apply("Extract Instant", ParDo.of(new GetDateFunction()))
>
>                 .apply(WithKeys.of(new SerializableFunction<Instant,
> Instant>() {
>
>                     public Instant apply(Instant instant) { return
> instant; } }));
>
>
>
>         final PCollectionView<Long> numOfDays =
>
>                 dates
>
>                         .apply("GetDateTimes", Keys.<Instant>create())
>
>                         .apply("DistinctDocs", Distinct.<Instant>create())
>
>                         .apply("Count", Count.<Instant>globally())
>
>                         .apply("View", View.<Long>asSingleton());
>
>
>
> //       PCollectionList pCollectionPerDay = logLines.apply("Partition per
> Day", Partition.of ( I need to know the number of days here)
>
>
>
> I was using Integer before in the PCollectionView and that was the issue,
> but it works with Long. Also,
>
> I was using DateTime, which does not come with a DefaultCoder predefined,
> so I simply switched to
> Instants (using the TlDf example [1], and WithKeys.of [2]).
>
>
>
> I have two big open questions. In my code I was using
> c.outputWithTimestamp(itLogLine, timestampInstant), in a step, where I
> parsed
>
> a string from a ItLogLine field into an instant and attached it to the
> element to use it later in the chain again.
>
>
>
> Now, after switching to Beam from Dataflow, this stopped working. I assume
> that I was using it wrongly, since it says that
>
>
>
> “*If invoked from ProcessElement, the timestamp must not be older than
> the input element's timestamp minus DoFn#getAllowedTimestampSkew”*
>
>
>
> so I want to change my code to do the right thing - however, in the
> current documentation I cannot find the text that describes how to
>
> assign timestamps, as the link is broken (“see Assigning Timestamps
> <https://beam.apache.org/documentation/programming-guide/#windowing> for
> more information on how to do so.” – the link goes to #Windowing).
>
> My second question is about the PCollectionView – Dan said that it’s
> possible to use it as a SideInput, but is it also possible to use a
> sideinput when doing partitioning?
> Intuitively I would like to get a single Long (numOfDays) and then use
> that to the Partition.of().
>
>
>
>
>
> Have a nice Sunday,
> Tobi
>
>
>
> [0]
> https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209
>
> [1]
> https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181
>
> [2]
> https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys
>
>
>
> On 19.02.17, 14:45, "Tobias Feldhaus" <To...@localsearch.ch>
> wrote:
>
>
>
>     Hi Ken,
>
>
>
>     thank you for your answer!
>
>
>
>
>
>
>
>     On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <
> Tobias.Feldhaus@localsearch.ch> wrote:
>
>
>
>     My intention is to get a
>
>
>
>             PCollectionView<Long> numOfDays
>
>
>
>     Which holds the number of elements (days) of a given
> PCollection<DateTime> dates,
>
>     and building it via a sum of counts (as a SingletonView).
>
>
>
>     Something that corresponds in my head to *wrong*:
>
>
>
>             dates.apply("Count", Count.globally().asSingletonView());
>
>
>
>     Seems about right.
>
>
>
>     It seems about right to me too, but apparently I have a misconception
> about what is happening here in my head:
>
>
>
>             PCollection<ItLogLine> logLines = p.apply("Read logfile",
> TextIO.Read.from(bucket))
>
>                     .apply("Repartition", Repartition.of())
>
>                     .apply("Parse JSON", ParDo.of(new ReadObjects()))
>
>                     .apply("Extract timestamp", ParDo.of(new
> ExtractTimestamps()));
>
>
>
>             PCollection<DateTime> dates = logLines
>
>                     .apply("Get Dates", ParDo.of(new GetDateFunction()))
>
>                     .apply("Get distinct Dates",
> Distinct.<DateTime>create());
>
>
>
>             final PCollectionView<Long> numOfDays = dates.apply("Count",
> Count.globally().asSingletonView());
>
>
>
>     This ends up in a type mismatch, is Count only applicable to certain
> types like String, Long, Integer, TableRow?
>
>     Considering the code this would be counterintuitive as everything
> seems to be implemented using generic types in the SDK.
>
>
>
>     I am still getting used to the Beam Programming Model and it confuses
> me still from time to time, sorry.
>
>
>
>     Is it possible to access the side input outside of a DoFn?
>
>
>
>     You can access a side input from a DoFn and now also from a
> CombineFnWithContext (a bit of an advanced feature).
>
>
>
>     So getting a single calculated value to partition a PCollection
> (number of days in this case), should be done via the extractOutput [0]
>
>     Method in this case?
>
>
>
>     Best,
>
>     Tobi
>
>
>
>     [0]
> https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-
>
>
>
>
>
>
>

Re: Documentation: Side Input and Outputs

Posted by Tobias Feldhaus <To...@localsearch.ch>.
And I think my approach will never work. I just RTFM again and it clearly states:

“You can, for example, pass the number of partitions as a command-line option at runtime
(which will then be used to build your pipeline graph), but you cannot determine the number
of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).” [0]

Meaning that I cannot read my data in my pipeline to determine the number of days I want to partition it by to
write it to dated tables in BQ, or at least to write AVRO files (one per day) to GCS.

I guess I need two pipelines. This was something I wanted to avoid as it is making the thing more complicated, since
I need to chain the two and check if all from the first part are successfully done, before I can start phase two.

Or am I missing something here?

Tobi

[0] https://beam.apache.org/documentation/programming-guide/#transforms-flatten-partition

On 19.02.17, 21:41, "Tobias Feldhaus" <To...@localsearch.ch>> wrote:


Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially.

I am referring to this example [0] and tried to mimic the behaviour.

The code looks like this:



        Pipeline p = Pipeline.create(options);



        PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                .apply("Repartition", Repartition.of())

                .apply("Parse JSON", ParDo.of(new ReadObjects()))

                .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



        PCollection<TableRow> output  = logLines.apply("Format Output", ParDo.of(new Outputter()));



        output.apply("Write to BigQueryService", BigQueryIO.Write

                .to("somedataset."+tableName)

                .withSchema(schema)

                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));



        PCollection<KV<Instant, Instant>> dates = logLines

                .apply("Extract Instant", ParDo.of(new GetDateFunction()))

                .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {

                    public Instant apply(Instant instant) { return instant; } }));



        final PCollectionView<Long> numOfDays =

                dates

                        .apply("GetDateTimes", Keys.<Instant>create())

                        .apply("DistinctDocs", Distinct.<Instant>create())

                        .apply("Count", Count.<Instant>globally())

                        .apply("View", View.<Long>asSingleton());



//       PCollectionList pCollectionPerDay = logLines.apply("Partition per Day", Partition.of ( I need to know the number of days here)



I was using Integer before in the PCollectionView and that was the issue, but it works with Long. Also,

I was using DateTime, which does not come with a DefaultCoder predefined, so I simply switched to
Instants (using the TlDf example [1], and WithKeys.of [2]).



I have two big open questions. In my code I was using c.outputWithTimestamp(itLogLine, timestampInstant), in a step, where I parsed

a string from a ItLogLine field into an instant and attached it to the element to use it later in the chain again.



Now, after switching to Beam from Dataflow, this stopped working. I assume that I was using it wrongly, since it says that



“If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”



so I want to change my code to do the right thing - however, in the current documentation I cannot find the text that describes how to

assign timestamps, as the link is broken (“see Assigning Timestamps<https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.” – the link goes to #Windowing).

My second question is about the PCollectionView – Dan said that it’s possible to use it as a SideInput, but is it also possible to use a sideinput when doing partitioning?
Intuitively I would like to get a single Long (numOfDays) and then use that to the Partition.of().





Have a nice Sunday,
Tobi



[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209

[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181

[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys



On 19.02.17, 14:45, "Tobias Feldhaus" <To...@localsearch.ch> wrote:



    Hi Ken,



    thank you for your answer!







    On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <To...@localsearch.ch> wrote:



    My intention is to get a



            PCollectionView<Long> numOfDays



    Which holds the number of elements (days) of a given PCollection<DateTime> dates,

    and building it via a sum of counts (as a SingletonView).



    Something that corresponds in my head to *wrong*:



            dates.apply("Count", Count.globally().asSingletonView());



    Seems about right.



    It seems about right to me too, but apparently I have a misconception about what is happening here in my head:



            PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                    .apply("Repartition", Repartition.of())

                    .apply("Parse JSON", ParDo.of(new ReadObjects()))

                    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



            PCollection<DateTime> dates = logLines

                    .apply("Get Dates", ParDo.of(new GetDateFunction()))

                    .apply("Get distinct Dates", Distinct.<DateTime>create());



            final PCollectionView<Long> numOfDays = dates.apply("Count", Count.globally().asSingletonView());



    This ends up in a type mismatch, is Count only applicable to certain types like String, Long, Integer, TableRow?

    Considering the code this would be counterintuitive as everything seems to be implemented using generic types in the SDK.



    I am still getting used to the Beam Programming Model and it confuses me still from time to time, sorry.



    Is it possible to access the side input outside of a DoFn?



    You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).



    So getting a single calculated value to partition a PCollection (number of days in this case), should be done via the extractOutput [0]

    Method in this case?



    Best,

    Tobi



    [0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-







Re: Documentation: Side Input and Outputs

Posted by Tobias Feldhaus <To...@localsearch.ch>.
Sorry for the formatting of the last mail, I basically killed the quoting.

I think I figured it out partially.

I am referring to this example [0] and tried to mimic the behaviour.

The code looks like this:



        Pipeline p = Pipeline.create(options);



        PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                .apply("Repartition", Repartition.of())

                .apply("Parse JSON", ParDo.of(new ReadObjects()))

                .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



        PCollection<TableRow> output  = logLines.apply("Format Output", ParDo.of(new Outputter()));



        output.apply("Write to BigQueryService", BigQueryIO.Write

                .to("somedataset."+tableName)

                .withSchema(schema)

                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));



        PCollection<KV<Instant, Instant>> dates = logLines

                .apply("Extract Instant", ParDo.of(new GetDateFunction()))

                .apply(WithKeys.of(new SerializableFunction<Instant, Instant>() {

                    public Instant apply(Instant instant) { return instant; } }));



        final PCollectionView<Long> numOfDays =

                dates

                        .apply("GetDateTimes", Keys.<Instant>create())

                        .apply("DistinctDocs", Distinct.<Instant>create())

                        .apply("Count", Count.<Instant>globally())

                        .apply("View", View.<Long>asSingleton());



//       PCollectionList pCollectionPerDay = logLines.apply("Partition per Day", Partition.of ( I need to know the number of days here)



I was using Integer before in the PCollectionView and that was the issue, but it works with Long. Also,

I was using DateTime, which does not come with a DefaultCoder predefined, so I simply switched to
Instants (using the TlDf example [1], and WithKeys.of [2]).



I have two big open questions. In my code I was using c.outputWithTimestamp(itLogLine, timestampInstant), in a step, where I parsed

a string from a ItLogLine field into an instant and attached it to the element to use it later in the chain again.



Now, after switching to Beam from Dataflow, this stopped working. I assume that I was using it wrongly, since it says that



“If invoked from ProcessElement, the timestamp must not be older than the input element's timestamp minus DoFn#getAllowedTimestampSkew”



so I want to change my code to do the right thing - however, in the current documentation I cannot find the text that describes how to

assign timestamps, as the link is broken (“see Assigning Timestamps<https://beam.apache.org/documentation/programming-guide/#windowing> for more information on how to do so.” – the link goes to #Windowing).

My second question is about the PCollectionView – Dan said that it’s possible to use it as a SideInput, but is it also possible to use a sideinput when doing partitioning?
Intuitively I would like to get a single Long (numOfDays) and then use that to the Partition.of().





Have a nice Sunday,
Tobi



[0] https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L209

[1] https://github.com/apache/beam/blob/8183ac825d506a56421f41adc2e25b544f1bb80f/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java#L181

[2] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/WithKeys



On 19.02.17, 14:45, "Tobias Feldhaus" <To...@localsearch.ch> wrote:



    Hi Ken,



    thank you for your answer!







    On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <To...@localsearch.ch> wrote:



    My intention is to get a



            PCollectionView<Long> numOfDays



    Which holds the number of elements (days) of a given PCollection<DateTime> dates,

    and building it via a sum of counts (as a SingletonView).



    Something that corresponds in my head to *wrong*:



            dates.apply("Count", Count.globally().asSingletonView());



    Seems about right.



    It seems about right to me too, but apparently I have a misconception about what is happening here in my head:



            PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))

                    .apply("Repartition", Repartition.of())

                    .apply("Parse JSON", ParDo.of(new ReadObjects()))

                    .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));



            PCollection<DateTime> dates = logLines

                    .apply("Get Dates", ParDo.of(new GetDateFunction()))

                    .apply("Get distinct Dates", Distinct.<DateTime>create());



            final PCollectionView<Long> numOfDays = dates.apply("Count", Count.globally().asSingletonView());



    This ends up in a type mismatch, is Count only applicable to certain types like String, Long, Integer, TableRow?

    Considering the code this would be counterintuitive as everything seems to be implemented using generic types in the SDK.



    I am still getting used to the Beam Programming Model and it confuses me still from time to time, sorry.



    Is it possible to access the side input outside of a DoFn?



    You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).



    So getting a single calculated value to partition a PCollection (number of days in this case), should be done via the extractOutput [0]

    Method in this case?



    Best,

    Tobi



    [0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-







Re: Documentation: Side Input and Outputs

Posted by Tobias Feldhaus <To...@localsearch.ch>.
Hi Ken,

thank you for your answer!



On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <To...@localsearch.ch> wrote:

My intention is to get a

        PCollectionView<Long> numOfDays

Which holds the number of elements (days) of a given PCollection<DateTime> dates,
and building it via a sum of counts (as a SingletonView).

Something that corresponds in my head to *wrong*:

        dates.apply("Count", Count.globally().asSingletonView());

Seems about right.

It seems about right to me too, but apparently I have a misconception about what is happening here in my head:

        PCollection<ItLogLine> logLines = p.apply("Read logfile", TextIO.Read.from(bucket))
                .apply("Repartition", Repartition.of())
                .apply("Parse JSON", ParDo.of(new ReadObjects()))
                .apply("Extract timestamp", ParDo.of(new ExtractTimestamps()));

        PCollection<DateTime> dates = logLines
                .apply("Get Dates", ParDo.of(new GetDateFunction()))
                .apply("Get distinct Dates", Distinct.<DateTime>create());
        
        final PCollectionView<Long> numOfDays = dates.apply("Count", Count.globally().asSingletonView());

This ends up in a type mismatch, is Count only applicable to certain types like String, Long, Integer, TableRow? 
Considering the code this would be counterintuitive as everything seems to be implemented using generic types in the SDK. 

I am still getting used to the Beam Programming Model and it confuses me still from time to time, sorry.
 
Is it possible to access the side input outside of a DoFn?

You can access a side input from a DoFn and now also from a CombineFnWithContext (a bit of an advanced feature).

So getting a single calculated value to partition a PCollection (number of days in this case), should be done via the extractOutput [0] 
Method in this case?

Best,
Tobi

[0] https://beam.apache.org/documentation/sdks/javadoc/0.5.0/org/apache/beam/sdk/transforms/CombineWithContext.CombineFnWithContext.html#extractOutput-AccumT-org.apache.beam.sdk.transforms.CombineWithContext.Context-



Re: Documentation: Side Input and Outputs

Posted by Kenneth Knowles <kl...@google.com>.
Hi Tobias,

On Fri, Feb 17, 2017 at 7:51 AM, Tobias Feldhaus <
Tobias.Feldhaus@localsearch.ch> wrote:

> It seems like the documentation [0] about side in- and outputs has missed
> some
> refactoring of the code. I can’t find Max.MaxIntFn() for example, but
> there is a
> new Max.ofIntegers() method that looks like it matches it.
>

You have it correct.

I would love to improve the documentation via a pull-request, but I don’t
> understand this concept well enough to feel confident doing so, yet.
>

We love documentation-improving pull requests!

My intention is to get a
>
>         PCollectionView<Long> numOfDays
>
> Which holds the number of elements (days) of a given PCollection<DateTime>
> dates,
> and building it via a sum of counts (as a SingletonView).
>
> Something that corresponds in my head to *wrong*:
>
>         dates.apply("Count", Count.globally().asSingletonView());
>

Seems about right.


> Is it possible to access the side input outside of a DoFn?
>

You can access a side input from a DoFn and now also from a
CombineFnWithContext (a bit of an advanced feature).

Kenn