Posted to user@impala.apache.org by Shuhao Tan <jo...@gmail.com> on 2020/06/05 18:05:26 UTC

Are GetFunctionState/SetFunctionState well-defined in UDA?

Hi all,

I recently wrote some UDAs and noticed that in be/src/udf/udf.h the
comment for SetFunctionState starts with
> Methods for maintaining state across UDF/UDA function calls.
so I presume GetFunctionState/SetFunctionState should work for UDAs as well.

I first tried to find an example in the repo, but the functions are only
used by UDFs.
I then implemented a simple UDA just to test their behaviour. My current
findings are:
Even when SetFunctionState(THREAD_LOCAL, some_ptr) is called only in Init,
other threads (presumably in the same fragment) can still see the pointer
with GetFunctionState(THREAD_LOCAL) if their Init is invoked later.
Currently it seems that threads in the same fragment call Init
sequentially, without a race condition on the FunctionState.

My questions: Are GetFunctionState/SetFunctionState well-defined in UDAs?
If so, what are the semantics and execution guarantees? How does passing
a different FunctionStateScope change the behavior? Are they guaranteed
to be thread-safe?

Thanks.

Re: Are GetFunctionState/SetFunctionState well-defined in UDA?

Posted by Tim Armstrong <ta...@cloudera.com>.
Yeah, it'll be basically the same for both: Init() is called for each
intermediate value in both cases, before the update or merge function is
called.
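A toy model of the two phases may make this concrete. The sketch below is a hypothetical, self-contained C++ simulation of a COUNT-like aggregate; InitCounter, UpdatePhase and MergePhase are illustrative names, not Impala's executor or UDA API. Each update-side pre-aggregation calls Init() once per new grouping key before the first Update() for that key, and the merge side again calls Init() once per new key before the first Merge().

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical two-phase aggregation, used only to count Init() calls.
// This is an illustration, not Impala's real aggregation code.
struct InitCounter {
  int init_calls = 0;
};

// Update phase: one pre-aggregation per input partition (e.g. per thread).
// Init() runs once per new key, before the first Update() for that key.
std::map<std::string, long> UpdatePhase(InitCounter* c,
                                        const std::vector<std::string>& rows) {
  std::map<std::string, long> groups;
  for (const auto& key : rows) {
    auto it = groups.find(key);
    if (it == groups.end()) {
      ++c->init_calls;  // Init(): fresh intermediate value for this group
      it = groups.emplace(key, 0).first;
    }
    ++it->second;  // Update()
  }
  return groups;  // Serialize() would run on each intermediate value here
}

// Merge phase: Init() again runs once per new key, before the first Merge().
std::map<std::string, long> MergePhase(
    InitCounter* c, const std::vector<std::map<std::string, long>>& partials) {
  std::map<std::string, long> merged;
  for (const auto& partial : partials) {
    for (const auto& kv : partial) {
      auto it = merged.find(kv.first);
      if (it == merged.end()) {
        ++c->init_calls;  // Init()
        it = merged.emplace(kv.first, 0).first;
      }
      it->second += kv.second;  // Merge()
    }
  }
  return merged;
}
```

For example, two update partitions with keys {a, b, a} and {b, c} give four Init() calls on the update side and three on the merge side.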

On Fri, Jun 5, 2020 at 4:47 PM Shuhao Tan <jo...@gmail.com> wrote:

> Thank you. This helps a lot.
>
> One last question: Is this behavior the same for both the update phase and
> the merge phase?

Re: Are GetFunctionState/SetFunctionState well-defined in UDA?

Posted by Shuhao Tan <jo...@gmail.com>.
Thank you. This helps a lot.

One last question: Is this behavior the same for both the update phase and
the merge phase?

On Fri, Jun 5, 2020 at 6:47 PM Tim Armstrong <ta...@cloudera.com>
wrote:

> I think what you're seeing is expected.
>
> There is a different 'context' per aggregate function, per aggregation
> operator, per thread. So it's expected there are multiple FunctionContext
> objects.
>
> Init() initializes the intermediate value, so if you have an aggregation
> with a group by, it will be called multiple times for each FunctionContext.

Re: Are GetFunctionState/SetFunctionState well-defined in UDA?

Posted by Tim Armstrong <ta...@cloudera.com>.
I think what you're seeing is expected.

There is a different 'context' per aggregate function, per aggregation
operator, per thread. So it's expected there are multiple FunctionContext
objects.

Init() initializes the intermediate value, so if you have an aggregation
with a group by, it will be called multiple times for each FunctionContext.
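The following hypothetical sketch models why one thread can see a pointer stored by an earlier Init(). MockContext stands in for a single FunctionContext and models only its THREAD_LOCAL slot and an Allocate()-like call; it is not the real impala_udf::FunctionContext. InitLikeThread repeats the read, allocate, store sequence from the Init() in this thread, and RunGroupBy calls it once per new grouping key, as a grouping aggregation does for each intermediate value.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for one FunctionContext, modelling only the
// THREAD_LOCAL state slot and an Allocate()-like call. Not the real class.
class MockContext {
 public:
  void* GetThreadLocalState() const { return state_; }
  void SetThreadLocalState(void* p) { state_ = p; }
  // Memory stays valid for the lifetime of the context.
  void* Allocate(std::size_t n) {
    buffers_.push_back(std::make_unique<char[]>(n));
    return buffers_.back().get();
  }

 private:
  void* state_ = nullptr;
  std::vector<std::unique_ptr<char[]>> buffers_;
};

// Same sequence as the Init() in this thread: read the slot, allocate, store.
// Returns the pointer that was read before storing the new one.
void* InitLikeThread(MockContext* ctx) {
  void* previous = ctx->GetThreadLocalState();
  ctx->SetThreadLocalState(ctx->Allocate(4));
  return previous;
}

// Hypothetical grouping aggregation on one context: Init() runs once per new
// grouping key (one intermediate value per group), not once per input row.
std::vector<void*> RunGroupBy(MockContext* ctx,
                              const std::vector<std::string>& keys) {
  std::unordered_set<std::string> seen;
  std::vector<void*> observed;  // state read by each Init() call, in order
  for (const auto& key : keys) {
    if (seen.insert(key).second) observed.push_back(InitLikeThread(ctx));
  }
  return observed;
}
```

With keys {x, y, x, z} on one context, the three Init() calls read null, then the pointer stored by the first call, then the one stored by the second; a second context starts again from null. That is the pattern visible in the log, assuming each chain of pointers belongs to one FunctionContext.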

On Fri, Jun 5, 2020 at 12:11 PM Shuhao Tan <jo...@gmail.com> wrote:

> Sorry for a typo in the code

Re: Are GetFunctionState/SetFunctionState well-defined in UDA?

Posted by Shuhao Tan <jo...@gmail.com>.
Sorry, there was a typo in the code. It should be:
// DebugPrint is a printf-style helper that logs to a temporary file (not shown).
void Init(FunctionContext* context, StringVal* result) {
  DebugPrint("Init: current state 0x%zx",
      reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
  auto ptr = context->Allocate(4);
  context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
  DebugPrint("Init: store 0x%zx", reinterpret_cast<std::size_t>(ptr));
}

On Fri, Jun 5, 2020 at 3:09 PM Shuhao Tan <jo...@gmail.com> wrote:

> Thanks. Knowing that Init() would be called many times per thread really
> helps.

Re: Are GetFunctionState/SetFunctionState well-defined in UDA?

Posted by Shuhao Tan <jo...@gmail.com>.
Thanks. Knowing that Init() would be called many times per thread really
helps.

Basically I did something like this:

// DebugPrint is a printf-style helper that logs to a temporary file (not shown).
void Init(FunctionContext* context, StringVal* result) {
  DebugPrint("Init: current state 0x%zx",
      reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
  auto ptr = context->Allocate(4);
  DebugPrint("Init: store 0x%zx",
      reinterpret_cast<std::size_t>(context->GetFunctionState(FunctionContext::THREAD_LOCAL)));
  context->SetFunctionState(FunctionContext::THREAD_LOCAL, ptr);
}
Update, Serialize, Merge, and Finalize are just no-ops.

DebugPrint writes to a temporary file, prefixing each line with a timestamp
and the thread ID from syscall(SYS_gettid).
The output in the file looks like:
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xf474008
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xea7d008
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xea7c008
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xe9f6008
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xe9f7008
2020-06-05 16:13:07[26318]: Init: current state 0xea7d008
2020-06-05 16:13:07[26318]: Init: store 0xea7d0c0
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xf475008
2020-06-05 16:13:07[26318]: Init: current state 0xea7c008
2020-06-05 16:13:07[26318]: Init: store 0xea7c0c0
2020-06-05 16:13:07[26318]: Init: current state 0x0
2020-06-05 16:13:07[26318]: Init: store 0xff28008
2020-06-05 16:13:07[26318]: Init: current state 0xea7c0c0
2020-06-05 16:13:07[26318]: Init: store 0xea7c178
2020-06-05 16:13:07[26318]: Init: current state 0xff28008

It does seem that the same thread is calling Init many times, sometimes with
the same FunctionState and sometimes with a different one. In other words,
the Init() calls appear to be grouped: different groups have different
THREAD_LOCAL FunctionState storage, and calls within the same group share
the same THREAD_LOCAL FunctionState storage.

Does my observation make any sense?
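One way to check this grouping against the log is to chain each non-zero "current state" value back to the Init() call that stored it. The hypothetical helper ChainStores below does that. It assumes the log alternates "current state X" and "store Y" lines as above, treats each "store" value as the pointer passed to SetFunctionState (as in the corrected code), and assumes addresses are not reused while a chain is still active.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical helper: groups the "store" pointers from a DebugPrint log into
// chains. An Init() that reads a non-zero "current state" continues the chain
// that stored that pointer; reading 0x0 (or an unknown pointer) starts a new one.
std::vector<std::vector<std::uint64_t>> ChainStores(const std::string& log) {
  const std::string kCurrent = "current state ";
  const std::string kStore = "store ";
  std::vector<std::vector<std::uint64_t>> chains;
  std::unordered_map<std::uint64_t, std::size_t> chain_of;  // pointer -> chain index
  std::size_t current = 0;
  std::istringstream in(log);
  std::string line;
  while (std::getline(in, line)) {
    std::size_t pos = line.find(kCurrent);
    if (pos != std::string::npos) {
      // std::stoull with base 16 accepts the "0x" prefix.
      std::uint64_t seen = std::stoull(line.substr(pos + kCurrent.size()), nullptr, 16);
      auto it = chain_of.find(seen);
      if (seen == 0 || it == chain_of.end()) {
        chains.emplace_back();
        current = chains.size() - 1;
      } else {
        current = it->second;
      }
      continue;
    }
    pos = line.find(kStore);
    if (pos != std::string::npos && !chains.empty()) {
      std::uint64_t stored = std::stoull(line.substr(pos + kStore.size()), nullptr, 16);
      chains[current].push_back(stored);
      chain_of[stored] = current;
    }
  }
  return chains;
}
```

On the log above it finds seven chains; the longest is 0xea7c008, then 0xea7c0c0, then 0xea7c178. That is consistent with several FunctionContexts being used on the same thread, each with its own THREAD_LOCAL slot.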

On Fri, Jun 5, 2020 at 2:55 PM Tim Armstrong <ta...@cloudera.com>
wrote:

> I think it would be easier to understand what you're seeing if you
> provided an example of what the code for your aggregate function looks
> like. If you call SetFunctionState(THREAD_LOCAL), I don't see a way that
> the pointer you set would be returned from GetFunctionState(THREAD_LOCAL)
> in a different thread.
>
> Init() is called for every aggregate tuple, so it can be called many times
> per thread for aggregations with a grouping key.
>
> Setting the FRAGMENT_LOCAL state only really makes sense for UDFs when
> Prepare(FRAGMENT_LOCAL) is called. After that the state is copied to any
> thread-local FunctionContexts. Calling SetFunctionState(FRAGMENT_LOCAL)
> later on is only going to modify the thread-local FunctionContext anyway.

Re: Are GetFunctionState/SetFunctionState well-defined in UDA?

Posted by Tim Armstrong <ta...@cloudera.com>.
I think it would be easier to understand what you're seeing if you provided
an example of what the code for your aggregate function looks like. If you
call SetFunctionState(THREAD_LOCAL), I don't see a way that the pointer you
set would be returned from GetFunctionState(THREAD_LOCAL) in a different
thread.

Init() is called for every aggregate tuple, so it can be called many times
per thread for aggregations with a grouping key.

Setting the FRAGMENT_LOCAL state only really makes sense for UDFs when
Prepare(FRAGMENT_LOCAL) is called. After that the state is copied to any
thread-local FunctionContexts. Calling SetFunctionState(FRAGMENT_LOCAL)
later on is only going to modify the thread-local FunctionContext anyway.
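A minimal model of the FRAGMENT_LOCAL behaviour described above, assuming (as stated) that the fragment-local pointer is copied into each thread-local context when that context is created. MockFragment, MockThreadContext and Scope are hypothetical names for illustration only; in Impala the corresponding calls are FunctionContext::SetFunctionState and FunctionContext::GetFunctionState with a FunctionStateScope argument.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical mirror of the two scopes (Impala's real enum is
// FunctionContext::FunctionStateScope).
enum class Scope { FRAGMENT_LOCAL, THREAD_LOCAL };

// One thread's context. The fragment-local pointer is a copy taken at creation.
class MockThreadContext {
 public:
  explicit MockThreadContext(void* fragment_state) : fragment_state_(fragment_state) {}
  void* Get(Scope scope) const {
    return scope == Scope::FRAGMENT_LOCAL ? fragment_state_ : thread_state_;
  }
  // Setting either scope only changes this context's own copy.
  void Set(Scope scope, void* ptr) {
    if (scope == Scope::FRAGMENT_LOCAL) {
      fragment_state_ = ptr;
    } else {
      thread_state_ = ptr;
    }
  }

 private:
  void* fragment_state_;
  void* thread_state_ = nullptr;
};

class MockFragment {
 public:
  // Plays the role of Prepare() being called with FRAGMENT_LOCAL scope.
  void PrepareFragmentLocal(void* state) { fragment_state_ = state; }
  // Contexts created afterwards receive a copy of the fragment-local pointer.
  MockThreadContext* NewThreadContext() {
    contexts_.push_back(std::make_unique<MockThreadContext>(fragment_state_));
    return contexts_.back().get();
  }

 private:
  void* fragment_state_ = nullptr;
  std::vector<std::unique_ptr<MockThreadContext>> contexts_;
};
```

After PrepareFragmentLocal(&cfg), both thread contexts return &cfg for FRAGMENT_LOCAL; if the first thread later sets FRAGMENT_LOCAL to another pointer, only its own copy changes and the second thread still sees &cfg.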

On Fri, Jun 5, 2020 at 11:06 AM Shuhao Tan <jo...@gmail.com> wrote:

> My questions: Are GetFunctionState/SetFunctionState well-defined in UDAs?
> If so, what are the semantics and execution guarantees? How does passing
> a different FunctionStateScope change the behavior? Are they guaranteed
> to be thread-safe?
>
> Thanks.
>