Posted to user@beam.apache.org by Jesse Anderson <je...@smokinghand.com> on 2016/05/18 00:40:41 UTC
DoFN Lambda
Is there a way to create a DoFn with a lambda function? The DoFn class
itself should support it, but the overloading of ParDo.of causes
ambiguity for a lambda function. Is there another way to accomplish
this?
To answer why you would want this:
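import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Nested inside the pipeline's outer class.
// Re-keys each (key, count) as (key, (timestamp, count)).
static class EnrichWithTimestampFN
    extends DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>> {
  @Override
  public void processElement(ProcessContext context) throws Exception {
    // The element's timestamp is only reachable through the ProcessContext.
    context.output(KV.of(context.element().getKey(),
        KV.of(context.timestamp(), context.element().getValue())));
  }
}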
This is a DoFn that I wrote to enrich a PCollection with the time
(Instant). I need access to the ProcessContext to get the timestamp.
This would be much easier to express with a lambda.
Thanks,
Jesse
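Java only lets a lambda stand in for an interface with a single abstract method, and DoFn is an abstract class, so a lambda cannot be passed where a DoFn is expected. Below is a minimal plain-Java sketch of one way around that, assuming you control the API: a one-method functional interface that receives the whole per-element context (so the timestamp stays reachable), wrapped in an anonymous subclass. `MiniDoFn`, `Context`, `ProcessFn`, `of`, `ENRICH` and `run` are hypothetical stand-ins, not Beam API, and `Map.Entry` plays the role of KV.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class LambdaDoFnSketch {

  // Hypothetical per-element view: the element, its timestamp, and an output sink.
  interface Context<InT, OutT> {
    InT element();

    Instant timestamp();

    void output(OutT value);
  }

  // Hypothetical stand-in for an abstract DoFn: an abstract class, so not a lambda target.
  abstract static class MiniDoFn<InT, OutT> {
    abstract void processElement(Context<InT, OutT> c);
  }

  // One abstract method, so a lambda can implement it, and it still sees the Context.
  @FunctionalInterface
  interface ProcessFn<InT, OutT> {
    void process(Context<InT, OutT> c);
  }

  // Adapter: wraps a lambda in an anonymous MiniDoFn subclass.
  static <InT, OutT> MiniDoFn<InT, OutT> of(ProcessFn<InT, OutT> fn) {
    return new MiniDoFn<InT, OutT>() {
      @Override
      void processElement(Context<InT, OutT> c) {
        fn.process(c);
      }
    };
  }

  // The enrichment as a lambda: (key, count) becomes (key, (timestamp, count)).
  static final MiniDoFn<Map.Entry<String, Long>, Map.Entry<String, Map.Entry<Instant, Long>>>
      ENRICH =
          of(c -> c.output(Map.entry(c.element().getKey(),
              Map.entry(c.timestamp(), c.element().getValue()))));

  // Feeds one timestamped element through a MiniDoFn and collects what it outputs.
  static <InT, OutT> List<OutT> run(MiniDoFn<InT, OutT> doFn, InT element, Instant ts) {
    List<OutT> out = new ArrayList<>();
    doFn.processElement(new Context<InT, OutT>() {
      @Override
      public InT element() {
        return element;
      }

      @Override
      public Instant timestamp() {
        return ts;
      }

      @Override
      public void output(OutT value) {
        out.add(value);
      }
    });
    return out;
  }
}
```

With this shape the enrichment shrinks to the one-expression `ENRICH` lambda; `run` exists only to exercise the sketch outside a pipeline.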
Re: DoFN Lambda
Posted by Frances Perry <fj...@google.com>.
On Wed, May 18, 2016 at 8:03 AM, Jesse Anderson <je...@smokinghand.com>
wrote:
> The use case is from my thread yesterday on removing windowing. The output
> I wanted was:
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
> 54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
> 107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z
>
> In order to add the timestamps, I had to use a DoFn instead of
> a FlatMapElements. I needed to access the ProcessContext.
>
WithTimestamps?
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
>
> In order to output the timestamps after the GroupByKey, I had to enrich
> the PCollection with the timestamps using a DoFn instead of a
> FlatMapElements. I needed to access the ProcessContext.
>
Yup -- so that's why I was suggesting we add ExtractTimestamps or something
as the dual of WithTimestamps, or perhaps generalize both bits of
functionality into Timestamps.set() and Timestamps.get().
Re: DoFN Lambda
Posted by Jesse Anderson <je...@smokinghand.com>.
The use case is from my thread yesterday on removing windowing. The output
I wanted was:
54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z
In order to add the timestamps, I had to use a DoFn instead of
a FlatMapElements. I needed to access the ProcessContext.
In order to output the timestamps after the GroupByKey, I had to enrich the
PCollection with the timestamps using a DoFn instead of a FlatMapElements. I
needed to access the ProcessContext.
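Every timestamp in the sample output ends in :29.999 or :59.999, which matches each count carrying the last millisecond of its window (Beam's IntervalWindow reports maxTimestamp() as the window end minus one millisecond). A stdlib-only sketch of that arithmetic and of the output line format, assuming fixed, epoch-aligned 30-second windows; the size is read off the sample, since the thread does not say how the windows were defined, and `windowStart`, `windowMaxTimestamp` and `formatLine` are hypothetical helpers.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowedOutputSketch {

  // Assumed window size, inferred from the :29.999 / :59.999 pattern in the sample.
  static final Duration WINDOW_SIZE = Duration.ofSeconds(30);

  // Start of the fixed, epoch-aligned window that contains ts.
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMillis = size.toMillis();
    long startMillis = Math.floorDiv(ts.toEpochMilli(), sizeMillis) * sizeMillis;
    return Instant.ofEpochMilli(startMillis);
  }

  // Last millisecond of that window: its end minus one millisecond.
  static Instant windowMaxTimestamp(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size).minusMillis(1);
  }

  // Renders one line in the "IP Hits:N At:timestamp" format used above.
  static String formatLine(String ip, long hits, Instant at) {
    return ip + " Hits:" + hits + " At:" + at;
  }
}
```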
Re: DoFN Lambda
Posted by Robert Bradshaw <ro...@google.com>.
On Tue, May 17, 2016 at 9:19 PM, Frances Perry <fj...@google.com> wrote:
> Might make sense to generalize the WithTimestamps transform into
> Timestamps.set(lambda) and Timestamps.extract()? Though I'm not sure what
> container to use for the result of extracting the timestamp. It's kind of a
> misuse of KV, but I'm not sure there's a better option in Java.
IIRC, we already have a TimestampedValue class in the SDK.
> What kinds
> of things do you want to do with the timestamp once you extract it?
Re: DoFN Lambda
Posted by Frances Perry <fj...@google.com>.
Might make sense to generalize the WithTimestamps transform into
Timestamps.set(lambda) and Timestamps.extract()? Though I'm not sure what
container to use for the result of extracting the timestamp. It's kind of a
misuse of KV, but I'm not sure there's a better option in Java. What kinds
of things do you want to do with the timestamp once you extract it?
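The proposed set/extract pair can be made concrete with a small in-memory model, run over a list instead of a PCollection. In this sketch `set` takes a lambda over the value (for example, one that promotes an Instant field), and `extract` copies the implicit timestamp into a dedicated container type rather than nesting it in a KV. `TimestampsSketch`, `Element`, `Stamped`, `set` and `extract` are hypothetical names for illustration only; `Stamped` plays the role a class like the SDK's TimestampedValue could fill.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class TimestampsSketch {

  // Hypothetical model of a collection element: a value plus its implicit timestamp.
  static final class Element<T> {
    final T value;
    final Instant timestamp;

    Element(T value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Hypothetical user-visible container for an extracted timestamp.
  static final class Stamped<T> {
    final T value;
    final Instant timestamp;

    Stamped(T value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // set(fn): give each element a timestamp computed from its value.
  static <T> List<Element<T>> set(List<Element<T>> in, Function<? super T, Instant> fn) {
    return in.stream()
        .map(e -> new Element<>(e.value, fn.apply(e.value)))
        .collect(Collectors.toList());
  }

  // extract(): copy the implicit timestamp into the value; the element keeps its timestamp.
  static <T> List<Element<Stamped<T>>> extract(List<Element<T>> in) {
    return in.stream()
        .map(e -> new Element<>(new Stamped<>(e.value, e.timestamp), e.timestamp))
        .collect(Collectors.toList());
  }
}
```

Once the timestamp is part of the value, later steps that assign new timestamps no longer lose it, which is the situation described earlier in the thread for output after a GroupByKey.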
Re: DoFN Lambda
Posted by Jesse Anderson <je...@smokinghand.com>.
Good point. Do you think there's any value in adding a transform that
enriches a PCollection with the timestamp? The transform could also take a
PCollection and make one of its Instant members the timestamp.
Re: DoFN Lambda
Posted by Ben Chambers <bc...@google.com>.
DoFn is not a functional interface, so lambdas won't work with it. If you
look at MapElements and FlatMapElements, these are transforms built on top
of ParDo that allow passing a lambda. Unfortunately, due to issues with
type erasure and Java generics, when using a lambda it is necessary to
specify the output types as well. You can see an example of this in the
java8tests:
https://github.com/apache/incubator-beam/blob/master/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java#L49
Unfortunately, this still won't work well for your use case since the
lambda doesn't have access to the timestamp.
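The type-erasure point can be seen with plain reflection: an anonymous subclass records its type arguments in class metadata that can be read back at runtime, while a lambda's runtime class records only the raw interface. This stdlib-only demonstration illustrates the general erasure problem, not Beam's own type-inference code; `ErasureSketch`, `Fn` and `declaredOutputType` are hypothetical names. The same effect is why type tokens such as Beam's TypeDescriptor are usually created as anonymous subclasses (`new TypeDescriptor<Integer>() {}`).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class ErasureSketch {

  // A generic single-method interface, usable as a lambda target.
  @FunctionalInterface
  interface Fn<InT, OutT> {
    OutT apply(InT input);
  }

  // Returns the declared OutT of fn's runtime class, or null if it is not recorded.
  static Type declaredOutputType(Fn<?, ?> fn) {
    for (Type iface : fn.getClass().getGenericInterfaces()) {
      if (iface instanceof ParameterizedType) {
        ParameterizedType p = (ParameterizedType) iface;
        if (p.getRawType() == Fn.class) {
          return p.getActualTypeArguments()[1];
        }
      }
    }
    // A lambda's class lists only the raw Fn interface, so nothing can be recovered.
    return null;
  }
}
```

An anonymous `new Fn<String, Integer>() { ... }` reports `Integer`, while the equivalent lambda `s -> s.length()` reports nothing, so the caller has to supply the output type separately.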