Posted to user@beam.apache.org by Jesse Anderson <je...@smokinghand.com> on 2016/05/18 00:40:41 UTC

DoFN Lambda

Is there a way to create a DoFn with a lambda function? The DoFn class
itself should support it, but the overloads of ParDo.of make a lambda
argument ambiguous. Is there a different way to accomplish this?

To explain why I want this:

  static class EnrichWithTimestampFN
      extends DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>> {
    @Override
    public void processElement(
        DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>>.ProcessContext context)
        throws Exception {
      context.output(
          KV.of(
              context.element().getKey(),
              KV.of(context.timestamp(), context.element().getValue())));
    }
  }

This is a DoFn that I wrote to enrich a PCollection with each element's
timestamp (Instant). I need access to the ProcessContext to get the
timestamp. This would be much easier to express with a lambda.

Thanks,

Jesse

Re: DoFN Lambda

Posted by Frances Perry <fj...@google.com>.
On Wed, May 18, 2016 at 8:03 AM, Jesse Anderson <je...@smokinghand.com>
wrote:

> The use case is from my thread yesterday on removing windowing. The output
> I wanted was:
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
> 54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
> 54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
> 107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
> 107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
> 190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z
>
> In order to add the timestamps, I had to use a DoFn instead of
> a FlatMapElements. I needed to access the ProcessContext.
>

WithTimestamps?
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java


>
> In order to output the timestamps after the GroupByKey, I had to enrich
> the PCollection with the timestamps using a DoFn instead of a
> FlatMapElements. I needed to access the ProcessContext.
>

Yup -- so that's why I was suggesting we add ExtractTimestamps or something
as the dual of WithTimestamps, or perhaps generalize both bits of
functionality into Timestamps.set() and Timestamps.get().


>
> On Tue, May 17, 2016 at 9:19 PM Frances Perry <fj...@google.com> wrote:
>
>> Might make sense to generalize the WithTimestamps transform into
>> Timestamps.set(lambda) and Timestamps.extract()? Though I'm not sure what
>> container to use for the result of extracting the timestamp. It's kind of a
>> misuse of KV, but I'm not sure there's a better option in Java. What kinds
>> of things do you want to do with the timestamp once you extract it?
>>
>> On Tue, May 17, 2016 at 6:08 PM, Jesse Anderson <je...@smokinghand.com>
>> wrote:
>>
>>> Good point. Do you think there's any value to adding a transform that
>>> enriches a PCollection with the timestamp? The transform could also take a
>>> PCollection and make one of its Instant members the timestamp.
>>>
>>> On Tue, May 17, 2016 at 5:51 PM Ben Chambers <bc...@google.com>
>>> wrote:
>>>
>>>> DoFn is not a functional interface, so lambdas won't work with it. If
>>>> you look at MapElements and FlatMapElements, these are transforms built on
>>>> top of ParDo that allow passing a lambda. Unfortunately, due to issues with
>>>> type erasure and Java generics, when using a lambda it is necessary to
>>>> specify the output types as well. You can see an example of this in the
>>>> java8tests:
>>>>
>>>>
>>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java#L49
>>>>
>>>> Unfortunately, this still won't work well for your use case since the
>>>> lambda doesn't have access to the timestamp.
>>>>
>>>> On Tue, May 17, 2016 at 5:41 PM Jesse Anderson <je...@smokinghand.com>
>>>> wrote:
>>>>
>>>>> Is there a way to create a DoFn with a lambda function? The DoFn class
>>>>> itself should support it, but the overloads of ParDo.of make a lambda
>>>>> argument ambiguous. Is there a different way to accomplish this?
>>>>>
>>>>> To explain why I want this:
>>>>>
>>>>>   static class EnrichWithTimestampFN
>>>>>       extends DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>> {
>>>>>     @Override
>>>>>     public void processElement(
>>>>>         DoFn<KV<String, Long>, KV<String, KV<Instant, Long>>>.ProcessContext context)
>>>>>         throws Exception {
>>>>>       context.output(
>>>>>           KV.of(
>>>>>               context.element().getKey(),
>>>>>               KV.of(context.timestamp(), context.element().getValue())));
>>>>>     }
>>>>>   }
>>>>>
>>>>> This is a DoFn that I wrote to enrich a PCollection with each element's
>>>>> timestamp (Instant). I need access to the ProcessContext to get the
>>>>> timestamp. This would be much easier to express with a lambda.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jesse
>>>>>
>>>>
>>

Re: DoFN Lambda

Posted by Jesse Anderson <je...@smokinghand.com>.
The use case is from my thread yesterday on removing windowing. The output
I wanted was:
54.148.33.jdj Hits:44 At:2015-03-31T04:00:29.999Z
54.148.33.jdj Hits:44 At:2015-03-31T04:00:59.999Z
54.148.33.jdj Hits:2 At:2015-03-31T04:01:29.999Z
107.22.225.dea Hits:18 At:2015-03-31T04:00:29.999Z
107.22.225.dea Hits:18 At:2015-03-31T04:00:59.999Z
107.22.225.dea Hits:1 At:2015-03-31T04:01:29.999Z
190.29.67.djc Hits:1 At:2015-03-31T04:00:29.999Z
190.29.67.djc Hits:1 At:2015-03-31T04:00:59.999Z

In order to add the timestamps, I had to use a DoFn instead of
a FlatMapElements. I needed to access the ProcessContext.

In order to output the timestamps after the GroupByKey, I had to enrich the
PCollection with the timestamps using a DoFn instead of a FlatMapElements. I
needed to access the ProcessContext.


Re: DoFN Lambda

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, May 17, 2016 at 9:19 PM, Frances Perry <fj...@google.com> wrote:
> Might make sense to generalize the WithTimestamps transform into
> Timestamps.set(lambda) and Timestamps.extract()? Though I'm not sure what
> container to use for the result of extracting the timestamp. It's kind of a
> misuse of KV, but I'm not sure there's a better option in Java.

IIRC, we already have a TimestampedValue class in the SDK.
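
To illustrate the idea of a dedicated container instead of KV, here is a
minimal plain-JDK sketch. Everything in it is an assumption made for
illustration: Stamped, set and describe are hypothetical names, not the
SDK's TimestampedValue or any proposed Timestamps API, plain lists stand in
for PCollections, and java.time.Instant is used instead of the Joda Instant
from the thread so the example needs no dependencies.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-ins only; none of these names come from the Beam SDK.
class TimestampsSketch {

  // A value paired with its timestamp, so KV does not have to be misused.
  static final class Stamped<T> {
    final T value;
    final Instant timestamp;

    Stamped(T value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // "set": attach a timestamp computed from each element by a lambda.
  static <T> List<Stamped<T>> set(List<T> values, Function<T, Instant> timestampFn) {
    return values.stream()
        .map(v -> new Stamped<T>(v, timestampFn.apply(v)))
        .collect(Collectors.toList());
  }

  // Once the timestamp is ordinary data, formatting it needs no context object.
  static <T> List<String> describe(List<Stamped<T>> stamped) {
    return stamped.stream()
        .map(s -> s.value + " At:" + s.timestamp)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Instant windowEnd = Instant.parse("2015-03-31T04:00:29.999Z");
    List<Stamped<String>> stamped =
        set(Arrays.asList("54.148.33.jdj Hits:44"), v -> windowEnd);
    describe(stamped).forEach(System.out::println);
  }
}
```

The point of the sketch is only that a small value-plus-timestamp type makes
the downstream formatting step a plain mapping function.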


Re: DoFN Lambda

Posted by Frances Perry <fj...@google.com>.
Might make sense to generalize the WithTimestamps transform into
Timestamps.set(lambda) and Timestamps.extract()? Though I'm not sure what
container to use for the result of extracting the timestamp. It's kind of a
misuse of KV, but I'm not sure there's a better option in Java. What kinds
of things do you want to do with the timestamp once you extract it?


Re: DoFN Lambda

Posted by Jesse Anderson <je...@smokinghand.com>.
Good point. Do you think there's any value to adding a transform that
enriches a PCollection with the timestamp? The transform could also take a
PCollection and make one of its Instant members the timestamp.


Re: DoFN Lambda

Posted by Ben Chambers <bc...@google.com>.
DoFn is not a functional interface, so lambdas won't work with it. If you
look at MapElements and FlatMapElements, these are transforms built on top
of ParDo that allow passing a lambda. Unfortunately, due to issues with
type erasure and Java generics, when using a lambda it is necessary to
specify the output types as well. You can see an example of this in the
java8tests:

https://github.com/apache/incubator-beam/blob/master/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java#L49
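
The type-erasure point can be seen without any Beam classes. The sketch
below uses only the JDK and is not taken from the SDK; the class and method
names are made up for illustration. It shows that an anonymous class records
its generic type arguments where reflection can find them, while the class
generated for a lambda typically does not, which is why a library has to be
told the output type explicitly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Plain-JDK illustration; nothing here is part of the Beam SDK.
class LambdaTypeInfoDemo {

  // True if the object's class records generic type arguments for the
  // first interface it implements.
  static boolean recordsTypeArguments(Object fn) {
    Type[] interfaces = fn.getClass().getGenericInterfaces();
    return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
  }

  // Anonymous class: compiled to a class file that keeps the signature
  // Function<String, Integer>.
  static Function<String, Integer> anonymousLength() {
    return new Function<String, Integer>() {
      @Override
      public Integer apply(String s) {
        return s.length();
      }
    };
  }

  // Lambda: the runtime-generated class implements the raw Function
  // interface, so the type arguments are not recoverable this way.
  static Function<String, Integer> lambdaLength() {
    return s -> s.length();
  }

  public static void main(String[] args) {
    System.out.println("anonymous class: " + recordsTypeArguments(anonymousLength()));
    System.out.println("lambda:          " + recordsTypeArguments(lambdaLength()));
  }
}
```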

Unfortunately, this still won't work well for your use case since the
lambda doesn't have access to the timestamp.
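
To make the functional-interface point concrete, here is a small sketch with
stand-in types. MiniDoFn, Context, ContextFn and fromLambda are all
hypothetical and only mirror the shape discussed in this thread; they are
not Beam classes, and the timestamp is a plain epoch-millis long to keep the
example dependency-free. An abstract class cannot be the target of a lambda,
but a single-method interface can, and an adapter can bridge the two while
still giving the lambda access to the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT Beam's real classes.
class LambdaAdapterSketch {

  // What the function sees for each element: the value, its timestamp,
  // and a way to emit output.
  static final class Context<I, O> {
    final I element;
    final long timestampMillis;
    private final List<O> outputs;

    Context(I element, long timestampMillis, List<O> outputs) {
      this.element = element;
      this.timestampMillis = timestampMillis;
      this.outputs = outputs;
    }

    void output(O value) {
      outputs.add(value);
    }
  }

  // Abstract class: `MiniDoFn<String, String> f = c -> ...;` would not
  // compile, because lambdas can only target functional interfaces.
  abstract static class MiniDoFn<I, O> {
    abstract void processElement(Context<I, O> context);
  }

  // Single-abstract-method interface: a valid lambda target.
  @FunctionalInterface
  interface ContextFn<I, O> {
    void apply(Context<I, O> context);
  }

  // Adapter: wraps a lambda in the abstract class.
  static <I, O> MiniDoFn<I, O> fromLambda(ContextFn<I, O> fn) {
    return new MiniDoFn<I, O>() {
      @Override
      void processElement(Context<I, O> context) {
        fn.apply(context);
      }
    };
  }

  // Runs the function over one element and returns what it emitted.
  static <I, O> List<O> runOne(MiniDoFn<I, O> fn, I element, long timestampMillis) {
    List<O> out = new ArrayList<>();
    fn.processElement(new Context<>(element, timestampMillis, out));
    return out;
  }

  public static void main(String[] args) {
    MiniDoFn<String, String> enrich =
        LambdaAdapterSketch.<String, String>fromLambda(
            c -> c.output(c.element + " At:" + c.timestampMillis));
    System.out.println(runOne(enrich, "Hits:44", 1000L));
  }
}
```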
