Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2021/03/02 00:11:52 UTC

Re: BEAM-6855

Another workaround might be to create a PCollection that is the tagged
union of the main input and the side input. I think you can avoid the
per-element overhead of checking which input each element came from by
setting some sort of timer or threshold, after which you switch a hardcoded
lambda to the "main input only" path.

Kenn
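
A minimal sketch of the tagged-union idea above, in plain Java with no Beam
dependency so it can be run standalone. Every name here (TaggedUnionSketch,
Tagged, expectedSideEntries) is hypothetical and not part of any Beam API.
It assumes the number of side (cache) entries is known up front and that
they all arrive before the threshold is reached; main-input elements that
arrive earlier are buffered. In a real stateful DoFn the cache and buffer
would presumably live in Beam state rather than in fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a transform that consumes a tagged union of two inputs. */
final class TaggedUnionSketch {

  /** One element of the merged stream: either a side (cache) entry or a main-input record. */
  static final class Tagged {
    final boolean isSide;
    final String key;
    final String value;

    private Tagged(boolean isSide, String key, String value) {
      this.isSide = isSide;
      this.key = key;
      this.value = value;
    }

    static Tagged ofSide(String key, String value) { return new Tagged(true, key, value); }
    static Tagged ofMain(String key, String value) { return new Tagged(false, key, value); }
  }

  private final Map<String, String> cache = new HashMap<>();
  private final List<Tagged> pending = new ArrayList<>();
  private final List<String> output = new ArrayList<>();
  private final int expectedSideEntries; // assumption: known in advance, at least 1
  private int sideSeen = 0;
  private Consumer<Tagged> handler;

  TaggedUnionSketch(int expectedSideEntries) {
    this.expectedSideEntries = expectedSideEntries;
    this.handler = this::handleWhileLoading; // start on the path that inspects the tag
  }

  /** Loading path: checks the tag, fills the cache, buffers early main-input elements. */
  private void handleWhileLoading(Tagged t) {
    if (!t.isSide) {
      pending.add(t);
      return;
    }
    cache.put(t.key, t.value);
    sideSeen++;
    if (sideSeen >= expectedSideEntries) {
      handler = this::handleMainOnly; // threshold reached: stop checking tags
      pending.forEach(this::handleMainOnly);
      pending.clear();
    }
  }

  /** Main-input-only path: no tag check, just a cache lookup. */
  private void handleMainOnly(Tagged t) {
    output.add(t.value + "@" + cache.getOrDefault(t.key, "unknown"));
  }

  void process(Tagged t) {
    handler.accept(t);
  }

  /** Feeds a merged stream through a fresh instance and returns what it emitted. */
  static List<String> run(int expectedSideEntries, List<Tagged> merged) {
    TaggedUnionSketch fn = new TaggedUnionSketch(expectedSideEntries);
    merged.forEach(fn::process);
    return fn.output;
  }

  public static void main(String[] args) {
    List<Tagged> merged = Arrays.asList(
        Tagged.ofMain("a", "m1"), // arrives before the cache is ready, so it is buffered
        Tagged.ofSide("a", "A"),
        Tagged.ofSide("b", "B"),  // second side entry reaches the threshold
        Tagged.ofMain("b", "m2"));
    System.out.println(run(2, merged)); // prints [m1@A, m2@B]
  }
}
```

Swapping the handler once, rather than testing a flag on every element,
is what keeps the steady-state path free of the tag check. The cost is that
any side entry arriving after the switch would be treated as main input, so
the threshold has to be chosen so that cannot happen.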

On Tue, Feb 23, 2021 at 5:07 PM Ahmet Altay <al...@google.com> wrote:

> Hemali, would this be a reasonable workaround for your problem?
>
> /cc +Kenneth Knowles <kl...@google.com> - In case there is an alternative
> workaround to BEAM-6855.
> /cc +Cosmin Arad <ca...@google.com>
>
> On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <bh...@google.com> wrote:
>
>> I added JvmInitializer [1] to do some one-time initialization per JVM
>> before processing starts. It might be useful here... the intended use-case
>> was to perform quick configuration functions, but I suppose you could use
>> it to pull some data that you can reference later.
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
>>
>> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pa...@google.com> wrote:
>>
>>> +Brian Hulette <bh...@google.com> I believe you worked on a way to
>>> load data on worker startup?
>>>
>>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dp...@google.com>
>>> wrote:
>>>
>>>> The getState function should be static, sorry. "synchronized static
>>>> @NotNull MyState getState()"
>>>>
>>>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dp...@google.com>
>>>> wrote:
>>>>
>>>>> > On every dataflow start, I want to read from CloudSQL and build the
>>>>> cache
>>>>>
>>>>> If you do this outside of dataflow, you can use a static to do this on
>>>>> every worker start. Is that what you're looking for? For example:
>>>>>
>>>>> final class StateLoader {
>>>>>   private StateLoader() {}
>>>>>
>>>>>   // Loaded lazily, at most once per JVM.
>>>>>   @GuardedBy("StateLoader.class")
>>>>>   private static @Nullable MyState state;
>>>>>
>>>>>   // Static, so the lock is StateLoader.class and callers need no instance.
>>>>>   static synchronized @NotNull MyState getState() {
>>>>>     if (state == null) {
>>>>>       state = LoadStateFromSQL();
>>>>>     }
>>>>>     return state;
>>>>>   }
>>>>> }
>>>>>
>>>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>>>>> hsutaria@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have one question. This is *kind of a blocker for our upcoming
>>>>>> release*. It would be great if you could reply at your earliest
>>>>>> convenience.
>>>>>>
>>>>>> My dataflow pipeline is stateful. I am using the Beam SDK for stateful
>>>>>> processing (StateId, ValueState). I have also implemented OnTimer for my
>>>>>> stateful transformation. On every dataflow start, I want to read from
>>>>>> CloudSQL and build the cache. For that, I need to provide the pre-built
>>>>>> cache as a side input to my current transform. But it looks like there is
>>>>>> an issue when I add a side input to my stateful transform. I think I am
>>>>>> hitting the BEAM-6855 issue (
>>>>>> https://issues.apache.org/jira/browse/BEAM-6855). Is there any
>>>>>> workaround? Any help would be appreciated.
>>>>>>
>>>>>> Following is the definition of my transform. I am using Beam SDK 2.23.0
>>>>>> and the GlobalWindow.
>>>>>>
>>>>>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>>>>>>     @TimerId("tunnelStatusExpiryTimer")
>>>>>>     private final TimerSpec tunnelStatusExpiryTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>>
>>>>>>     @StateId("tunnelStatus")
>>>>>>     private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>>>>>             StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>>>>>                         MultiOutputReceiver out,
>>>>>>                         @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>>>>>                         @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>>>>>                         ProcessContext c)
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Hemali Sutaria
>>>>>>
>>>>>>