Posted to dev@beam.apache.org by Hemali Sutaria <hs...@paloaltonetworks.com> on 2021/02/18 14:53:42 UTC
BEAM-6855
Hi,
I have a question. This is *kind of a blocker for our upcoming release*, so
it would be great if you could reply at your earliest convenience.
My Dataflow pipeline is stateful. I am using the Beam SDK's stateful
processing (StateId, ValueState), and I have also implemented OnTimer for my
stateful transform. On every Dataflow job start, I want to read from
CloudSQL and build a cache. To do that, I need to provide the pre-built
cache as a side input to my current transform. However, there seems to be
an issue when I add a side input to my stateful transform. I think I am
hitting BEAM-6855 (https://issues.apache.org/jira/browse/BEAM-6855).
Is there any workaround? Any help would be appreciated.
The following is my transform definition. I am using Beam SDK 2.23.0 and
GlobalWindow.
private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
    @TimerId("tunnelStatusExpiryTimer")
    private final TimerSpec tunnelStatusExpiryTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @StateId("tunnelStatus")
    private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
        StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));

    @ProcessElement
    public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
                        MultiOutputReceiver out,
                        @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
                        @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
                        ProcessContext c)
Thanks,
Hemali Sutaria
Re: BEAM-6855
Posted by Kenneth Knowles <kl...@google.com>.
Another workaround might be to create a PCollection that is the tagged
union of the main input and the side input. I think you can avoid the
per-element overhead of checking which input each element came from by
setting some sort of timer or threshold after which you switch a hardcoded
lambda to the "main input only" path.
Kenn
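The tagged-union idea above can be illustrated with a minimal plain-Java sketch. It is not Beam code and not taken from this thread: the names Tagged, UnionProcessor, and expectedSideElements are hypothetical, and the "threshold" is assumed to be a known count of cache entries. In a real pipeline the two inputs would presumably be mapped to a common union type and merged (for example with Flatten) before the stateful DoFn, and main-input elements that arrive before the cache is complete would need to be buffered rather than processed immediately as they are here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** One element of the merged stream: either a cache entry (SIDE) or a main-input record (MAIN). */
final class Tagged {
  enum Tag { SIDE, MAIN }

  final Tag tag;
  final String key;
  final String value;

  private Tagged(Tag tag, String key, String value) {
    this.tag = tag;
    this.key = key;
    this.value = value;
  }

  static Tagged ofSide(String key, String value) {
    return new Tagged(Tag.SIDE, key, value);
  }

  static Tagged ofMain(String key, String value) {
    return new Tagged(Tag.MAIN, key, value);
  }
}

/** Consumes the merged stream and stops checking tags once the cache is assumed complete. */
final class UnionProcessor {
  private final Map<String, String> cache = new HashMap<>();
  final List<String> output = new ArrayList<>();

  // Assumption: the number of side-input (cache) elements is known up front.
  private final int expectedSideElements;
  private int sideSeen = 0;
  private boolean mainOnly = false;

  // Current handler; starts on the tag-checking path and is swapped exactly once.
  private Consumer<Tagged> handler;

  UnionProcessor(int expectedSideElements) {
    this.expectedSideElements = expectedSideElements;
    this.handler = this::handleEither;
  }

  void process(Tagged element) {
    handler.accept(element);
  }

  boolean isMainOnly() {
    return mainOnly;
  }

  // Tag-checking path, used only while cache entries may still arrive.
  private void handleEither(Tagged e) {
    if (e.tag == Tagged.Tag.SIDE) {
      cache.put(e.key, e.value);
      sideSeen++;
      if (sideSeen >= expectedSideElements) {
        // Threshold reached: later elements skip the tag check entirely.
        handler = this::handleMainOnly;
        mainOnly = true;
      }
    } else {
      handleMainOnly(e);
    }
  }

  // Main-input-only path: looks up the cache without inspecting the tag.
  private void handleMainOnly(Tagged e) {
    output.add(e.key + ":" + e.value + " (cached=" + cache.get(e.key) + ")");
  }
}
```

After the switch, any late SIDE element would be treated as main input, so this only makes sense if the threshold or timer really does mark the end of the cache data.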
On Tue, Feb 23, 2021 at 5:07 PM Ahmet Altay <al...@google.com> wrote:
> Hemali, would this be a reasonable workaround for your problem?
>
> /cc +Kenneth Knowles <kl...@google.com> - In case there is an alternative
> workaround to BEAM-6855.
> /cc +Cosmin Arad <ca...@google.com>
Re: BEAM-6855
Posted by Ahmet Altay <al...@google.com>.
Hemali, would this be a reasonable workaround for your problem?
/cc +Kenneth Knowles <kl...@google.com> - In case there is an alternative
workaround to BEAM-6855.
/cc +Cosmin Arad <ca...@google.com>
On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <bh...@google.com> wrote:
> I added JvmInitializer [1] to do some one-time initialization per JVM
> before processing starts. It might be useful here... the intended use-case
> was to perform quick configuration functions, but I suppose you could use
> it to pull some data that you can reference later.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
Re: BEAM-6855
Posted by Brian Hulette <bh...@google.com>.
I added JvmInitializer [1] to do some one-time initialization per JVM
before processing starts. It might be useful here... the intended use-case
was to perform quick configuration functions, but I suppose you could use
it to pull some data that you can reference later.
[1]
https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pa...@google.com> wrote:
> +Brian Hulette <bh...@google.com> I believe you worked on a way to
> load data on worker startup?
Re: BEAM-6855
Posted by Pablo Estrada <pa...@google.com>.
+Brian Hulette <bh...@google.com> I believe you worked on a way to load
data on worker startup?
On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dp...@google.com> wrote:
> The getState function should be static, sorry. "synchronized static
> @NotNull MyState getState()"
Re: BEAM-6855
Posted by Daniel Collins <dp...@google.com>.
The getState function should be static, sorry. "synchronized static
@NotNull MyState getState()"
On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dp...@google.com> wrote:
> > On every dataflow start, I want to read from CloudSQL and build the cache
>
> If you do this outside of dataflow, you can use a static to do this on
> every worker start. Is that what you're looking for? For example:
>
> final class StateLoader {
> private StateLoader() {}
>
> @GuardedBy("this")
> private static @Nullable MyState state;
>
> synchronized @NotNull MyState getState() {
> if (state == null) {
> state = LoadStateFromSQL();
> }
> return state;
> }
> }
Re: BEAM-6855
Posted by Daniel Collins <dp...@google.com>.
> On every dataflow start, I want to read from CloudSQL and build the cache
If you do this outside of dataflow, you can use a static to do this on
every worker start. Is that what you're looking for? For example:
final class StateLoader {
  private StateLoader() {}

  @GuardedBy("this")
  private static @Nullable MyState state;

  synchronized @NotNull MyState getState() {
    if (state == null) {
      state = LoadStateFromSQL();
    }
    return state;
  }
}
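For reference, here is a self-contained sketch of the same lazy static holder, with getState() made static as the follow-up message in this thread notes. Several things are assumptions for illustration only: MyState is replaced by a Map<String, String>, loadStateFromSql() is a hypothetical stand-in that returns fixed data instead of querying CloudSQL, loadCount exists only to make the once-per-JVM behavior observable, and the nullness and @GuardedBy annotations are omitted to avoid extra dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Lazily loads a cache once per JVM and shares it with every caller on that worker. */
final class StateLoader {
  private StateLoader() {}

  // Counts how many times the load ran; for illustration only.
  static final AtomicInteger loadCount = new AtomicInteger();

  // Guarded by the StateLoader.class monitor, which the static synchronized method acquires.
  private static Map<String, String> state;

  /** Returns the shared cache, loading it on the first call. */
  static synchronized Map<String, String> getState() {
    if (state == null) {
      state = loadStateFromSql();
    }
    return state;
  }

  // Hypothetical stand-in for the CloudSQL read; a real job would run a JDBC query here.
  private static Map<String, String> loadStateFromSql() {
    loadCount.incrementAndGet();
    Map<String, String> loaded = new ConcurrentHashMap<>();
    loaded.put("tunnel-1", "UP");
    loaded.put("tunnel-2", "DOWN");
    return loaded;
  }
}
```

A DoFn could then call StateLoader.getState() from its @Setup or @ProcessElement method. Because the field is static, each worker JVM loads its own copy, so the data is only as fresh as that worker's start.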