Posted to dev@beam.apache.org by Hemali Sutaria <hs...@paloaltonetworks.com> on 2021/02/18 14:53:42 UTC

BEAM-6855

Hi,

I have one question. This is *kind of a blocker for our upcoming release*.
It would be great if you could reply at your earliest convenience.

My Dataflow pipeline is stateful. I am using the Beam SDK's stateful
processing (StateId, ValueState), and I have also implemented OnTimer for my
stateful transform. Every time the Dataflow job starts, I want to read from
Cloud SQL and build a cache. To do that, I need to pass the pre-built cache
as a side input to my current transform, but adding a side input to my
stateful transform seems to cause a problem. I think I am hitting BEAM-6855
(https://issues.apache.org/jira/browse/BEAM-6855).
Is there any workaround? Any help would be appreciated.

Below is the definition of my transform. I am using Beam SDK 2.23.0 and the
GlobalWindow.

// static, so serializing the DoFn does not also serialize the enclosing instance
private static class GetLatestState
    extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
  // Event-time timer used to expire the cached tunnel status.
  @TimerId("tunnelStatusExpiryTimer")
  private final TimerSpec tunnelStatusExpiryTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  // Per-key state holding the latest DataTunnelStatus.
  @StateId("tunnelStatus")
  private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
      StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));

  @ProcessElement
  public void process(
      @Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
      MultiOutputReceiver out,
      @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
      @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
      ProcessContext c) {
    // ...
  }
}

Thanks,
Hemali Sutaria

Re: BEAM-6855

Posted by Kenneth Knowles <kl...@google.com>.
Another workaround might be to create a PCollection that is the tagged
union of the main input and the side input. I think you can avoid the
per-element overhead of checking which input each element came from by
setting a timer or threshold after which you switch a hard-coded lambda to
the "main input only" path.
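A minimal, Beam-free model of this idea, assuming the Cloud SQL rows and the main-input records share the same key and have already been merged into one stream. `TaggedUnionSketch`, `Tagged`, `UnionProcessor` and the element-count threshold are all hypothetical; in a real pipeline the union would be a PCollection built by mapping both inputs to one type and applying `Flatten`, the cache would live in per-key state rather than a `HashMap`, and the threshold might be a timer rather than a count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class TaggedUnionSketch {

  // Hypothetical tagged-union element: either a side-input row or a main-input record.
  static final class Tagged {
    final boolean fromSideInput;
    final String key;
    final String value;

    private Tagged(boolean fromSideInput, String key, String value) {
      this.fromSideInput = fromSideInput;
      this.key = key;
      this.value = value;
    }

    static Tagged side(String key, String value) {
      return new Tagged(true, key, value);
    }

    static Tagged main(String key, String value) {
      return new Tagged(false, key, value);
    }
  }

  // Consumes the merged stream and switches to a main-only path after a threshold.
  static final class UnionProcessor {
    private final Map<String, String> cache = new HashMap<>();
    final List<String> output = new ArrayList<>();
    private final int threshold;
    private int seen = 0;
    // Starts on the checking path; replaced once the threshold is reached.
    private Consumer<Tagged> handler = this::checkingPath;

    UnionProcessor(int threshold) {
      this.threshold = threshold;
    }

    void process(Tagged element) {
      handler.accept(element);
    }

    private void checkingPath(Tagged element) {
      if (element.fromSideInput) {
        cache.put(element.key, element.value);
      } else {
        emit(element);
      }
      // Assumption: after `threshold` elements the side input has been fully seen.
      if (++seen >= threshold) {
        handler = this::mainOnlyPath;
      }
    }

    // No per-element tag check any more.
    private void mainOnlyPath(Tagged element) {
      emit(element);
    }

    private void emit(Tagged element) {
      output.add(element.value + ":" + cache.getOrDefault(element.key, "unknown"));
    }
  }
}
```

The switch trades safety for speed: any side-input row that arrives after the threshold is treated as a main-input record, so the threshold has to be chosen so that the side input is known to be complete by then.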

Kenn

On Tue, Feb 23, 2021 at 5:07 PM Ahmet Altay <al...@google.com> wrote:

> Hemali, would this be a reasonable workaround for your problem?
>
> /cc +Kenneth Knowles <kl...@google.com> - In case there is an alternative
> workaround to BEAM-6855.
> /cc +Cosmin Arad <ca...@google.com>

Re: BEAM-6855

Posted by Ahmet Altay <al...@google.com>.
Hemali, would this be a reasonable workaround for your problem?

/cc +Kenneth Knowles <kl...@google.com> - In case there is an alternative
workaround to BEAM-6855.
/cc +Cosmin Arad <ca...@google.com>

On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <bh...@google.com> wrote:

> I added JvmInitializer [1] to do some one-time initialization per JVM
> before processing starts. It might be useful here... the intended use-case
> was to perform quick configuration functions, but I suppose you could use
> it to pull some data that you can reference later.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html

Re: BEAM-6855

Posted by Brian Hulette <bh...@google.com>.
I added JvmInitializer [1] to do some one-time initialization per JVM
before processing starts. It might be useful here. The intended use case
was to perform quick configuration functions, but I suppose you could use
it to pull some data that you can reference later.

[1]
https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
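A minimal sketch of the JvmInitializer approach, assuming the cache fits in memory as a `Map<String, String>` and can be loaded in one query. `TunnelStatusCacheInitializer`, `cache()` and `loadFromCloudSql` are hypothetical names, and the placeholder row stands in for the real Cloud SQL read. Implementations are discovered through `java.util.ServiceLoader`, so the class must be public and listed in `META-INF/services/org.apache.beam.sdk.harness.JvmInitializer` (or annotated with `@AutoService(JvmInitializer.class)`); this also assumes the runner's worker harness invokes registered initializers as the Javadoc describes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical initializer that warms a per-JVM cache before processing starts.
// Register via META-INF/services/org.apache.beam.sdk.harness.JvmInitializer;
// ServiceLoader needs a public class with a public no-arg constructor.
public class TunnelStatusCacheInitializer implements JvmInitializer {

  // Read by DoFns on the same worker; volatile so the published map is visible to all threads.
  private static volatile Map<String, String> cache = Collections.emptyMap();

  public static Map<String, String> cache() {
    return cache;
  }

  @Override
  public void beforeProcessing(PipelineOptions options) {
    // Called once per JVM, with the pipeline options, before processing starts.
    cache = Collections.unmodifiableMap(loadFromCloudSql(options));
  }

  // Hypothetical stand-in for the real Cloud SQL query (for example over JDBC).
  static Map<String, String> loadFromCloudSql(PipelineOptions options) {
    Map<String, String> rows = new HashMap<>();
    rows.put("tunnel-1", "UP"); // placeholder row, not real data
    return rows;
  }
}
```

A DoFn could then read `TunnelStatusCacheInitializer.cache()` from `@Setup` or `@ProcessElement`. The cache is loaded once per JVM and is not refreshed while the job runs, and a slow query delays the start of processing on each worker.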

On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pa...@google.com> wrote:

> +Brian Hulette <bh...@google.com> I believe you worked on a way to
> load data on worker startup?

Re: BEAM-6855

Posted by Pablo Estrada <pa...@google.com>.
+Brian Hulette <bh...@google.com> I believe you worked on a way to load
data on worker startup?

On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dp...@google.com> wrote:

> The getState function should be static, sorry. "synchronized static
> @NotNull MyState getState()"

Re: BEAM-6855

Posted by Daniel Collins <dp...@google.com>.
The getState function should be static, sorry. "synchronized static
@NotNull MyState getState()"

On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dp...@google.com> wrote:

> > On every dataflow start, I want to read from CloudSQL and build the cache
>
> If you do this outside of dataflow, you can use a static to do this on
> every worker start. Is that what you're looking for? For example:
>
> final class StateLoader {
>   private StateLoader() {}
>
>   @GuardedBy("this")
>   private static @Nullable MyState state;
>
>   synchronized @NotNull MyState getState() {
>     if (state == null) {
>       state = LoadStateFromSQL();
>     }
>     return state;
>   }
> }

Re: BEAM-6855

Posted by Daniel Collins <dp...@google.com>.
> On every dataflow start, I want to read from CloudSQL and build the cache

If you do this outside of the pipeline itself, you can use a static field so
that it runs once per worker JVM, the first time it is needed. Is that what
you're looking for? For example:

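import javax.annotation.concurrent.GuardedBy;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

final class StateLoader {
  private StateLoader() {}

  // A static synchronized method locks StateLoader.class, so that is the guard.
  @GuardedBy("StateLoader.class")
  private static @Nullable MyState state;

  // Loads the state once per JVM (worker) on first use, then returns the cached copy.
  static synchronized @NonNull MyState getState() {
    if (state == null) {
      state = loadStateFromSql();
    }
    return state;
  }

  private static MyState loadStateFromSql() {
    // Placeholder: query Cloud SQL and build the cache here.
    throw new UnsupportedOperationException("not implemented");
  }
}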
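An alternative sketch using the initialization-on-demand holder idiom, which avoids taking a lock on every call: the JVM runs `Holder`'s static initializer only on first access, and class initialization is thread-safe. `LazyStateHolder`, `LOADS` and `loadStateFromSql` are hypothetical, and a `Map<String, String>` stands in for `MyState`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyStateHolder {
  private LazyStateHolder() {}

  // Hypothetical counter so the single load can be observed.
  static final AtomicInteger LOADS = new AtomicInteger();

  // Initialized by the JVM on first access to Holder.STATE; class init is thread-safe.
  private static final class Holder {
    static final Map<String, String> STATE = loadStateFromSql();
  }

  static Map<String, String> getState() {
    return Holder.STATE;
  }

  // Hypothetical stand-in for the Cloud SQL read.
  private static Map<String, String> loadStateFromSql() {
    LOADS.incrementAndGet();
    return Collections.unmodifiableMap(new HashMap<>());
  }
}
```

One design difference: if the load throws, `Holder` fails to initialize and every later call fails with `NoClassDefFoundError`, whereas the synchronized version simply retries on the next call.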
