Posted to user@impala.apache.org by Ravi Kanth <ra...@gmail.com> on 2017/06/20 21:15:31 UTC

Creating Impala UDA

Hi All,

We are using Impala for various kinds of processing in our systems. We
recently got a requirement to handle updates to events, i.e., we have an
'e_update' table that holds the partial updates received for various
events. The fields that are not updated are stored as NULL values.



Ex:


ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
1         0                      1        NULL        NULL
1         1                      2        Hi          NULL
1         3                      4        Hello       Hi
1         2                      5        NULL        NULL
1         4                      NULL     NULL        Zero



P.S.: Please treat the Date_Time values as valid timestamp values. For
readability, they are shown here as 0, 1, 2, 3, 4.



As seen in the table above, each event has a unique ID. When we get an
update to a particular event, we store the date_time at which the update
happened together with the partially updated values. All fields other than
the updated ones are stored as NULL.



We are planning to mimic in-place updates on the table without deleting any
data, so that the query below returns the resulting table shown after it:



> SELECT id, current_val(A,date_time) as A, current_val(B,date_time) as B,
current_val(C,date_time) as C from e_update GROUP BY ID;



where current_val is a custom Impala UDA we are planning to implement, i.e.,
one that gets the latest non-null value for the column.


ID (Int)  A (Int)  B (String)  C (String)
1         4        Hello       Zero





Implemented current_val UDA:

The code below handles only int inputs:



uda-currentval.h

//This is a sample for retrieving the current value of e_update table
//
void CurrentValueInit(FunctionContext* context, IntVal* val);
void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
const TimestampVal& ts, IntVal* val);
void CurrentValueMerge(FunctionContext* context, const IntVal& src,
IntVal* dst);
IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);

uda-currentval.cc

// -----------------------------------------------------------------------------------------------
// This is a sample for retrieving the current value of e_update table
//-----------------------------------------------------------------------------------------------
void CurrentValueInit(FunctionContext* context, IntVal* val) {
      val->is_null = false;
      val->val = 0;
}

void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
const TimestampVal& ts, IntVal* val) {
      static TimestampVal* tsTemp;
      tsTemp->date = 0;
      tsTemp->time_of_day = 0;
      if(tsTemp->date==0 && tsTemp->time_of_day==0){
        tsTemp->date = ts.date;
        tsTemp->time_of_day = ts.time_of_day;
        val->val = input.val;
        return;
      }
      if(ts.date > tsTemp->date && ts.time_of_day > tsTemp->time_of_day){
        tsTemp->date = ts.date;
        tsTemp->time_of_day = ts.time_of_day;
        val->val = input.val;
        return;
      }
}

void CurrentValueMerge(FunctionContext* context, const IntVal& src,
IntVal* dst) {
     dst->val += src.val;
}

IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val) {
     return val;
}



We are able to build the library and create the aggregate function in
Impala, but when we run a select query similar to the one above, it brings
down a couple of Impala daemons, and the query is terminated with the error
below.



WARNINGS: Cancelled due to unreachable impalad(s):
hadoop102.**.**.**.com:22000





We have impalad running on 14 instances.



Can someone help us resolve this problem and suggest a better way to achieve
a solution for the scenario explained?
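
For reference, a minimal sketch of the "latest non-null value" logic
described above, written against stand-in types rather than the Impala UDF
headers so that it compiles on its own. NullableInt, Stamp, LatestState and
the Latest* functions are hypothetical names, not part of any Impala API.
The sketch keeps the last-seen timestamp in per-group state instead of a
static pointer, compares date and time of day as a pair, and merges partial
results by keeping the later one instead of adding them:

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for a nullable INT input (hypothetical, not impala_udf::IntVal).
struct NullableInt {
  bool is_null;
  int32_t val;
};

// Stand-in for a timestamp split into a day number and a time within the day
// (hypothetical, modelled on the date/time_of_day fields used in the post).
struct Stamp {
  int32_t date;
  int64_t time_of_day;
};

// Per-group aggregation state. It is passed in explicitly rather than kept in
// a static or global variable, so separate groups never share it.
struct LatestState {
  bool has_value;  // false until the first non-null input is seen
  Stamp ts;        // timestamp of the value currently held
  int32_t value;   // latest non-null value seen so far
};

// True if a is strictly later than b, comparing (date, time_of_day) as a pair.
inline bool IsLater(const Stamp& a, const Stamp& b) {
  if (a.date != b.date) return a.date > b.date;
  return a.time_of_day > b.time_of_day;
}

inline void LatestInit(LatestState* st) {
  assert(st != nullptr);
  st->has_value = false;
  st->ts = Stamp{0, 0};
  st->value = 0;
}

// Update step: ignore NULL inputs, otherwise keep the value that carries the
// latest timestamp.
inline void LatestUpdate(const NullableInt& input, const Stamp& ts,
                         LatestState* st) {
  assert(st != nullptr);
  if (input.is_null) return;
  if (!st->has_value || IsLater(ts, st->ts)) {
    st->has_value = true;
    st->ts = ts;
    st->value = input.val;
  }
}

// Merge step: combine two partial states by keeping the later one.
inline void LatestMerge(const LatestState& src, LatestState* dst) {
  assert(dst != nullptr);
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.ts, dst->ts)) *dst = src;
}
```

With column A of the example split across two partial states, merging them
should leave the value 4 (timestamp 3); the NULL at timestamp 4 is skipped.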

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
I see, it's again a problem with my version. I need to make the
intermediate type IntVal as well if the return type of finalize is
IntVal.
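
The difference between the intermediate type and the type returned by
finalize can be illustrated without Impala at all. The sketch below is only
an illustration under stated assumptions: CurrentState, IntResult, PackState
and FinalizeState are hypothetical names, and a std::vector<uint8_t> stands
in for the byte buffer that a string-typed intermediate would carry. It uses
an explicit has_value flag, so a legitimate value of 0 is not mistaken for
"no value", and it copies the state out of the buffer before the buffer
could be released:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical per-group state; the field names are illustrative only.
struct CurrentState {
  bool has_value;       // distinguishes "no value seen" from a real 0
  int32_t date;
  int64_t time_of_day;
  int32_t value;
};

// Stand-in for a nullable INT result (not impala_udf::IntVal).
struct IntResult {
  bool is_null;
  int32_t val;
};

// Pack the state into an opaque byte buffer, the way a string-typed
// intermediate value would carry it between aggregation steps.
inline std::vector<uint8_t> PackState(const CurrentState& st) {
  std::vector<uint8_t> buf(sizeof(CurrentState));
  std::memcpy(buf.data(), &st, sizeof(CurrentState));
  return buf;
}

// Finalize step: copy the state back out of the buffer first, then build the
// integer result from the copy.
inline IntResult FinalizeState(const std::vector<uint8_t>& buf) {
  assert(buf.size() == sizeof(CurrentState));
  CurrentState st;
  std::memcpy(&st, buf.data(), sizeof(CurrentState));
  IntResult out;
  out.is_null = !st.has_value;
  out.val = st.has_value ? st.value : 0;
  return out;
}
```

Here the intermediate (a byte buffer) and the result (an integer) are
different types; finalize is the only place where one is turned into the
other.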

On Thu, Jun 22, 2017 at 22:00 Ravi Kanth <ra...@gmail.com> wrote:

> Thanks for that, Matthew. I don't mean to go against your suggestion, but I
> just wanted to try a custom Impala UDA and see whether it gives us the
> expected result. Even if it doesn't, I would learn to write our own custom
> UDAs to solve problems other than this one.
>
> I am facing issues when trying to return any type other than StringVal
> from the finalize method of the UDA. It throws an error when registering
> the handlers with Impala and creating the function.
>
> ERROR: AnalysisException: Could not find function CurrentValOneUpdate(INT,
> TIMESTAMP) returns INT in:
> hdfs://**************com:8020/dwh/impala/udf/libudasample.so
>
> Check that function name, arguments, and return type are correct.
>
> The above works fine when returning StringVal type.
> Impala version using: Impala Shell v2.5.0-cdh5.7.0
>
> Waiting for your suggestion
>
> Thanks,
> Ravi
>
> On 22 June 2017 at 11:58, Matthew Jacobs <mj...@cloudera.com> wrote:
>
>> I haven't looked at the code, but this isn't going to solve your
>> earlier issue because this is an aggregation function, this will not
>> work for analytic functions. I was saying in an earlier email that we
>> don't expose an interface for registering analytic functions, so you
>> should probably just upgrade Impala. I posted the patch of the
>> analytic function we added to illustrate that it will be difficult and
>> not a matter of registering a UDA.
>>
>> On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <ra...@gmail.com>
>> wrote:
>> > Hi All,
>> >
>> > I wrote the below lines of code to achieve this functionality:
>> >
>> > // Copyright 2012 Cloudera Inc.
>> > //
>> > // Licensed under the Apache License, Version 2.0 (the "License");
>> > // you may not use this file except in compliance with the License.
>> > // You may obtain a copy of the License at
>> > //
>> > // http://www.apache.org/licenses/LICENSE-2.0
>> > //
>> > // Unless required by applicable law or agreed to in writing, software
>> > // distributed under the License is distributed on an "AS IS" BASIS,
>> > // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> > // See the License for the specific language governing permissions and
>> > // limitations under the License.
>> >
>> > #include "uda-sample.h"
>> > #include <assert.h>
>> > #include <sstream>
>> >
>> > using namespace impala_udf;
>> > using namespace std;
>> >
>> >
>> > StringVal ToStringVal(FunctionContext* context, const IntVal& val) {
>> >   stringstream ss;
>> >   ss << val.val;
>> >   string str = ss.str();
>> >   StringVal string_val(context, str.size());
>> >   memcpy(string_val.ptr, str.c_str(), str.size());
>> >   return string_val;
>> > }
>> >
>> > // ---------------------------------------------------------------------------------------
>> > // This is an aggregate function for retrieving the latest non-null value
>> > // for the e_update table
>> > // ---------------------------------------------------------------------------------------
>> >
>> > struct CurrentValStruct {
>> >   IntVal value;
>> >   TimestampVal tsTemp;
>> > };
>> >
>> > // Initialize the StringVal intermediate to a zero'd CurrentValStruct
>> > void CurrentValInit(FunctionContext* context, StringVal* val) {
>> >   val->is_null = false;
>> >   val->len = sizeof(CurrentValStruct);
>> >   val->ptr = context->Allocate(val->len);
>> >   memset(val->ptr, 0, val->len);
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
>> >   cur->value.is_null = false;
>> >   cur->tsTemp.is_null = false;
>> > }
>> >
>> > void CurrentValUpdate(FunctionContext* context, const IntVal& input,
>> >     const TimestampVal& ts, StringVal* val) {
>> >   assert(!val->is_null);
>> >   assert(val->len == sizeof(CurrentValStruct));
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
>> >   // Skip NULL inputs so that only non-null values are considered
>> >   if (!input.is_null) {
>> >     // Compare (date, time_of_day) as a pair: a later date wins regardless
>> >     // of the time of day
>> >     if (ts.date > cur->tsTemp.date ||
>> >         (ts.date == cur->tsTemp.date &&
>> >          ts.time_of_day > cur->tsTemp.time_of_day)) {
>> >       cur->value = input.val;
>> >       cur->tsTemp.date = ts.date;
>> >       cur->tsTemp.time_of_day = ts.time_of_day;
>> >     }
>> >   }
>> > }
>> >
>> > void CurrentValMerge(FunctionContext* context, const StringVal& src,
>> >     StringVal* dst) {
>> >   if (src.is_null) return;
>> >   const CurrentValStruct* src_cur =
>> >       reinterpret_cast<const CurrentValStruct*>(src.ptr);
>> >   CurrentValStruct* dst_cur = reinterpret_cast<CurrentValStruct*>(dst->ptr);
>> >   if (dst_cur->tsTemp.is_null) {
>> >     dst_cur->value = src_cur->value;
>> >     dst_cur->tsTemp.date = src_cur->tsTemp.date;
>> >     dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
>> >     dst_cur->tsTemp.is_null = false;
>> >     dst_cur->value.is_null = false;
>> >   } else {
>> >     // Keep the source value only if its timestamp is strictly later
>> >     if (src_cur->tsTemp.date > dst_cur->tsTemp.date ||
>> >         (src_cur->tsTemp.date == dst_cur->tsTemp.date &&
>> >          src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day)) {
>> >       dst_cur->value = src_cur->value;
>> >       dst_cur->tsTemp.date = src_cur->tsTemp.date;
>> >       dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
>> >     }
>> >   }
>> > }
>> >
>> > StringVal CurrentValSerialize(FunctionContext* context, const StringVal&
>> > val) {
>> >   assert(!val.is_null);
>> >   StringVal result(context, val.len);
>> >   memcpy(result.ptr, val.ptr, val.len);
>> >   context->Free(val.ptr);
>> >   return result;
>> > }
>> >
>> > StringVal CurrentValFinalize(FunctionContext* context,
>> >     const StringVal& val) {
>> >   //IntVal intResult;
>> >   assert(!val.is_null);
>> >   assert(val.len == sizeof(CurrentValStruct));
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>> >   StringVal result;
>> >   if (cur->value == 0) {
>> >     result = StringVal::null();
>> >     //intResult = 0;
>> >   } else {
>> >     // intResult = cur->value.val;
>> >     // Copies the result to memory owned by Impala
>> >     result = ToStringVal(context, cur->value.val);
>> > //  intResult = atoi(result.c_str());
>> >   //  std::istringstream(result) >> intResult;
>> >   }
>> >   context->Free(val.ptr);
>> >   return result;
>> > }
>> >
>> > Queries:
>> >
>> > create aggregate function current_val(int,timestamp) returns string
>> > location '/impala/udf/libudasample.so' init_fn='CurrentValInit'
>> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
>> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
>> >
>> >
>> > select id, current_val(a,date_time) as a from udf_test GROUP BY id;
>> >
>> >
>> > The above is working fine and I am able to achieve my requirement. But is
>> > there any possibility that we can return an IntVal type rather than a
>> > StringVal type? If so, where can I make the changes?
>> >
>> > I tried changing the below:
>> >
>> > IntVal CurrentValSerialize(FunctionContext* context, const StringVal& val) {
>> >   assert(!val.is_null);
>> >   StringVal result(context, val.len);
>> >   memcpy(result.ptr, val.ptr, val.len);
>> >   context->Free(val.ptr);
>> >   IntVal intResult;
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>> >   intResult = cur->value.val;
>> >   return intResult;
>> > }
>> >
>> >
>> > IntVal CurrentValFinalize(FunctionContext* context, const StringVal& val) {
>> >   IntVal intResult;
>> >   assert(!val.is_null);
>> >   assert(val.len == sizeof(CurrentValStruct));
>> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>> >   //StringVal result;
>> >   if (cur->value == 0) {
>> >     //result = StringVal::null();
>> >     intResult = 0;
>> >   } else {
>> >     intResult = cur->value.val;
>> >     // Copies the result to memory owned by Impala
>> >     //result = ToStringVal(context, cur->value.val);
>> > //  intResult = atoi(result.c_str());
>> >   //  std::istringstream(result) >> intResult;
>> >   }
>> >   context->Free(val.ptr);
>> >   return intResult;
>> > }
>> >
>> >
>> > But when trying to create the aggregate function, I get the following:
>> >
>> > create aggregate function current_val(int,timestamp) returns int
>> > location '/impala/udf/libudasample.so' init_fn='CurrentValInit'
>> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
>> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
>> >
>> >
>> > Query: create aggregate function current_val(int,timestamp) returns int
>> > location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit'
>> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
>> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'
>> >
>> >
>> > ERROR: AnalysisException: Could not find function CurrentValUpdate(INT,
>> > TIMESTAMP) returns INT in:
>> > hdfs://***************:8020/impala/udf/libudasample.so
>> >
>> > Check that function name, arguments, and return type are correct.
>> >
>> >
>> > I changed the function declarations in the header file accordingly. Can
>> > someone suggest if I am missing something here?
>> >
>> > Thanks,
>> > Ravi
>> >
>> > On 21 June 2017 at 13:42, Ravi Kanth <ra...@gmail.com> wrote:
>> >>
>> >> Thanks for the suggestion, Matthew. Let me look into the patch. I am
>> >> currently working on building a custom UDA. Hopefully the information
>> >> you provided and the discussion we had will be useful to me.
>> >>
>> >> On 21 June 2017 at 13:39, Matthew Jacobs <mj...@cloudera.com> wrote:
>> >>>
>> >>> I'd strongly recommend the latter (upgrading). We don't really expose
>> >>> the analytic function interface, so you'd end up writing an Impala
>> >>> patch, and analytic functions are particularly tricky.
>> >>>
>> >>> Here's Thomas' patch to add 'ignore nulls' in first/last value:
>> >>> https://gerrit.cloudera.org/#/c/3328/
>> >>>
>> >>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ra...@gmail.com>
>> >>> wrote:
>> >>> > Thanks all. I will think about a possible solution, either by
>> >>> > implementing a custom UDA or by upgrading the version.
>> >>> >
>> >>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
>> >>> > <tm...@cloudera.com> wrote:
>> >>> >>
>> >>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <
>> ravikanth.4b0@gmail.com>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
>> >>> >>>
>> >>> >>> Unfortunately, as mentioned, I believe the version of Impala we are
>> >>> >>> using doesn't support ignore nulls.
>> >>> >>>
>> >>> >>> But my question is: would the last_value function retrieve the latest
>> >>> >>> non-null value irrespective of using ignore nulls?
>> >>> >>
>> >>> >>
>> >>> >> Not sure I follow - if you use last_value without ignore nulls, you'll
>> >>> >> get the latest value taking all values into consideration, which may
>> >>> >> or may not be null.
>> >>> >>
>> >>> >>>
>> >>> >>>
>> >>> >>> Ravi
>> >>> >>>
>> >>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj...@cloudera.com>
>> wrote:
>> >>> >>>>
>> >>> >>>> Ah I think Thomas is right. I read the expected results and the query
>> >>> >>>> too quickly, so my comment about the asc/desc is probably wrong.
>> >>> >>>> Clearly my point about analytic functions being tricky holds true :)
>> >>> >>>>
>> >>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
>> >>> >>>> <tm...@cloudera.com> wrote:
>> >>> >>>> >
>> >>> >>>> >
>> >>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth
>> >>> >>>> > <ra...@gmail.com>
>> >>> >>>> > wrote:
>> >>> >>>> >>
>> >>> >>>> >> Thomas,
>> >>> >>>> >>
>> >>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0, and
>> >>> >>>> >> I see that ignore nulls was added in Impala 2.7.0. Would adding
>> >>> >>>> >> ignore nulls make a big difference in the expected result?
>> >>> >>>> >
>> >>> >>>> >
>> >>> >>>> > That's too bad. I think that 'ignore nulls' would give you what you
>> >>> >>>> > want - the problem with the query that you posted is that it
>> >>> >>>> > eliminates rows that don't match the where clause, so for example
>> >>> >>>> > the row with "Zero" in it is eliminated because it is filtered out
>> >>> >>>> > by the "where a is not null", whereas "ignore nulls" only affects
>> >>> >>>> > the values that could be returned by the specific analytic function
>> >>> >>>> > that the ignore is applied to.
>> >>> >>>> >
>> >>> >>>> >>
>> >>> >>>> >>
>> >>> >>>> >> Ravi
>> >>> >>>> >>
>> >>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
>> >>> >>>> >> <tm...@cloudera.com>
>> >>> >>>> >> wrote:
>> >>> >>>> >>>
>> >>> >>>> >>> Ravi,
>> >>> >>>> >>>
>> >>> >>>> >>> Instead of using the "where ... is not null", have you tried
>> >>> >>>> >>> 'last_value(... ignore nulls)'
>> >>> >>>> >>>
>> >>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth
>> >>> >>>> >>> <ra...@gmail.com>
>> >>> >>>> >>> wrote:
>> >>> >>>> >>>>
>> >>> >>>> >>>> Antoni,
>> >>> >>>> >>>>
>> >>> >>>> >>>> The problem with using the last_value() function, as far as I
>> >>> >>>> >>>> have observed, is that if I use it on multiple columns in a
>> >>> >>>> >>>> single query, it does not retrieve the expected results.
>> >>> >>>> >>>>
>> >>> >>>> >>>>  Input:
>> >>> >>>> >>>>
>> >>> >>>> >>>> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
>> >>> >>>> >>>> 1         0                      1        NULL        NULL
>> >>> >>>> >>>> 1         1                      2        Hi          NULL
>> >>> >>>> >>>> 1         3                      4        Hello       Hi
>> >>> >>>> >>>> 1         2                      5        NULL        NULL
>> >>> >>>> >>>> 1         4                      NULL     NULL        Zero
>> >>> >>>> >>>>
>> >>> >>>> >>>> Expected Output:
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> ID (Int)  A (Int)  B (String)  C (String)
>> >>> >>>> >>>> 1         4        Hello       Zero
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> Query executed:
>> >>> >>>> >>>>
>> >>> >>>> >>>> select id,
>> >>> >>>> >>>>   last_value(a) over(partition by id order by date_time desc) as a,
>> >>> >>>> >>>>   last_value(b) over(partition by id order by date_time desc) as b,
>> >>> >>>> >>>>   last_value(c) over(partition by id order by date_time desc) as c
>> >>> >>>> >>>> from udf_test
>> >>> >>>> >>>> where a is not null and b is not null and c is not null;
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> Output I am getting:
>> >>> >>>> >>>>
>> >>> >>>> >>>> +----+---+-------+----+
>> >>> >>>> >>>> | id | a | b     | c  |
>> >>> >>>> >>>> +----+---+-------+----+
>> >>> >>>> >>>> | 1  | 4 | Hello | Hi |
>> >>> >>>> >>>> +----+---+-------+----+
>> >>> >>>> >>>>
>> >>> >>>> >>>>
>> >>> >>>> >>>> Hopefully, I am clear with the problem above.
>> >>> >>>> >>>>
>> >>> >>>> >>>> Thanks,
>> >>> >>>> >>>> Ravi
>> >>> >>>> >>>>
>> >>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <
>> ravikanth.4b0@gmail.com>
>> >>> >>>> >>>> wrote:
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> Antoni,
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and hopefully
>> >>> >>>> >>>>> we can use it in our use case.
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> Thanks,
>> >>> >>>> >>>>> Ravi
>> >>> >>>> >>>>>
>> >>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov
>> >>> >>>> >>>>> <ai...@vmware.com>
>> >>> >>>> >>>>> wrote:
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> Hi Ravi,
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> I am curious why you are not using the already existing
>> >>> >>>> >>>>>> last_value function in Impala to get the "latest non null
>> >>> >>>> >>>>>> value for the column", e.g.
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID order by Date_Time)
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> Thanks,
>> >>> >>>> >>>>>> Antoni
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
>> >>> >>>> >>>>>> <ta...@cloudera.com>
>> >>> >>>> >>>>>> wrote:
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> This was double-posted to
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>> >>> >>>> >>>>>> also. I'll continue the discussion here.
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> > Can we have the flexibility of declaring the variable
>> >>> >>>> >>>>>> > globally in a UDF? By globally, I mean outside the function.
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> > And the reason I am declaring a static variable is to keep
>> >>> >>>> >>>>>> > the timestamp value from one record to the next so that I can
>> >>> >>>> >>>>>> > compare the timestamps. Is there an alternative approach for
>> >>> >>>> >>>>>> > this?
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed not
>> >>> >>>> >>>>>> to do what you expect - the function can be invoked concurrently
>> >>> >>>> >>>>>> by multiple threads.
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> It seems like you probably want to store some additional state in
>> >>> >>>> >>>>>> the intermediate value. There are some sample UDAs here (see
>> >>> >>>> >>>>>> Avg()) where additional intermediate state is stored in a
>> >>> >>>> >>>>>> StringVal:
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
>> >>> >>>> >>>>>> <ra...@gmail.com>
>> >>> >>>> >>>>>> wrote:
>> >>> >>>> >>>>>>>
>> >>> >>>> >>>>>>> Thanks Bharath. Can you check whether the logic I am implementing
>> >>> >>>> >>>>>>> is correct or needs any modification as well? I am very new to
>> >>> >>>> >>>>>>> Impala UDFs and C++ and am having a hard time figuring out the
>> >>> >>>> >>>>>>> problems.
>> >>> >>>> >>>>>>>
>> >>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>> >>> >>>> >>>>>>> <bh...@cloudera.com> wrote:
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can segfault.
>> >>> >>>> >>>>>>>> That could be the issue here.
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>  static TimestampVal* tsTemp;
>> >>> >>>> >>>>>>>>       tsTemp->date = 0;
>> >>> >>>> >>>>>>>>       tsTemp->time_of_day = 0;
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>>
>> >>> >>>> >>>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>>>
>> >>> >>>> >>>>
>> >>> >>>> >>
>> >>> >>>> >
>> >>
>> >>
>> >
>>
>
>

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Thanks for that, Matthew. I don't mean to go against your suggestion, but I
just wanted to try a custom Impala UDA and see whether it gives us the
expected result. Even if it doesn't, I would learn to write our own custom
UDAs to solve problems other than this one.

I am facing issues when trying to return any type other than StringVal
from the finalize method of the UDA. It throws an error when registering
the handlers with Impala and creating the function.

ERROR: AnalysisException: Could not find function CurrentValOneUpdate(INT,
TIMESTAMP) returns INT in:
hdfs://**************com:8020/dwh/impala/udf/libudasample.so

Check that function name, arguments, and return type are correct.

The above works fine when returning StringVal type.
Impala version using: Impala Shell v2.5.0-cdh5.7.0

Waiting for your suggestion

Thanks,
Ravi

On 22 June 2017 at 11:58, Matthew Jacobs <mj...@cloudera.com> wrote:

> I haven't looked at the code, but this isn't going to solve your
> earlier issue: this is an aggregate function, and it will not work
> as an analytic function. As I said in an earlier email, we don't
> expose an interface for registering analytic functions, so you
> should probably just upgrade Impala. I posted the patch for the
> analytic function we added to illustrate that this would be difficult
> and is not just a matter of registering a UDA.
>
> On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <ra...@gmail.com>
> wrote:
> > Hi All,
> >
> > I wrote the below lines of code to achieve this functionality:
> >
> > // Copyright 2012 Cloudera Inc.
> > //
> > // Licensed under the Apache License, Version 2.0 (the "License");
> > // you may not use this file except in compliance with the License.
> > // You may obtain a copy of the License at
> > //
> > // http://www.apache.org/licenses/LICENSE-2.0
> > //
> > // Unless required by applicable law or agreed to in writing, software
> > // distributed under the License is distributed on an "AS IS" BASIS,
> > // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > // See the License for the specific language governing permissions and
> > // limitations under the License.
> >
> > #include "uda-sample.h"
> > #include <assert.h>
> > #include <sstream>
> >
> > using namespace impala_udf;
> > using namespace std;
> >
> >
> > StringVal ToStringVal(FunctionContext* context, const IntVal& val) {
> >   stringstream ss;
> >   ss << val.val;
> >   string str = ss.str();
> >   StringVal string_val(context, str.size());
> >   memcpy(string_val.ptr, str.c_str(), str.size());
> >   return string_val;
> > }
> >
> > // ---------------------------------------------------------------------------
> > // This is an aggregate function for retrieving the latest non-null value
> > // for the e_update table
> > // ---------------------------------------------------------------------------
> >
> > struct CurrentValStruct {
> >   IntVal value;
> >   TimestampVal tsTemp;
> > };
> >
> > // Initialize the StringVal intermediate to a zero'd CurrentValStruct
> > void CurrentValInit(FunctionContext* context, StringVal* val) {
> >   val->is_null = false;
> >   val->len = sizeof(CurrentValStruct);
> >   val->ptr = context->Allocate(val->len);
> >   memset(val->ptr, 0, val->len);
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
> >   cur->value.is_null = false;
> >   cur->tsTemp.is_null = false;
> > }
> >
> > void CurrentValUpdate(FunctionContext* context, const IntVal& input,
> >                       const TimestampVal& ts, StringVal* val) {
> >   assert(!val->is_null);
> >   assert(val->len == sizeof(CurrentValStruct));
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
> >   //checking if the incoming input value is null
> >   if (!input.is_null) {
> >     if (ts.date >= cur->tsTemp.date &&
> >         ts.time_of_day > cur->tsTemp.time_of_day) {
> >       cur->value = input.val;
> >       cur->tsTemp.date = ts.date;
> >       cur->tsTemp.time_of_day = ts.time_of_day;
> >     }
> >   }
> > }
> >
> > void CurrentValMerge(FunctionContext* context, const StringVal& src,
> >                      StringVal* dst) {
> >   if (src.is_null) return;
> >   const CurrentValStruct* src_cur =
> >       reinterpret_cast<const CurrentValStruct*>(src.ptr);
> >   CurrentValStruct* dst_cur = reinterpret_cast<CurrentValStruct*>(dst->ptr);
> >   if (dst_cur->tsTemp.is_null) {
> >     dst_cur->value = src_cur->value;
> >     dst_cur->tsTemp.date = src_cur->tsTemp.date;
> >     dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
> >     dst_cur->tsTemp.is_null = false;
> >     dst_cur->value.is_null = false;
> >   } else {
> >     if (src_cur->tsTemp.date >= dst_cur->tsTemp.date &&
> >         src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day) {
> >       dst_cur->value = src_cur->value;
> >       dst_cur->tsTemp.date = src_cur->tsTemp.date;
> >       dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
> >     }
> >   }
> > }
> >
> > StringVal CurrentValSerialize(FunctionContext* context,
> >                               const StringVal& val) {
> >   assert(!val.is_null);
> >   StringVal result(context, val.len);
> >   memcpy(result.ptr, val.ptr, val.len);
> >   context->Free(val.ptr);
> >   return result;
> > }
> >
> > StringVal CurrentValFinalize(FunctionContext* context,
> >                              const StringVal& val) {
> >   //IntVal intResult;
> >   assert(!val.is_null);
> >   assert(val.len == sizeof(CurrentValStruct));
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
> >   StringVal result;
> >   if (cur->value == 0) {
> >     result = StringVal::null();
> >     //intResult = 0;
> >   } else {
> >     // intResult = cur->value.val;
> >     // Copies the result to memory owned by Impala
> >     result = ToStringVal(context, cur->value.val);
> > //  intResult = atoi(result.c_str());
> >   //  std::istringstream(result) >> intResult;
> >   }
> >   context->Free(val.ptr);
> >   return result;
> > }
> >
> > Queries:
> >
> > create aggregate function current_val(int,timestamp) returns string
> > location '/impala/udf/libudasample.so' init_fn='CurrentValInit'
> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
> >
> >
> > select id, current_val(a,date_time) as a from udf_test GROUP BY id;
> >
> >
> > The above is working fine and I am able to achieve my requirement. But is
> > there any possibility that we can return an IntVal type rather than a
> > StringVal type? If so, where can I make the changes?
> >
> > I tried changing the below:
> >
> > IntVal CurrentValSerialize(FunctionContext* context, const StringVal& val) {
> >   assert(!val.is_null);
> >   StringVal result(context, val.len);
> >   memcpy(result.ptr, val.ptr, val.len);
> >   context->Free(val.ptr);
> >   IntVal intResult;
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
> >   intResult = cur->value.val;
> >   return intResult;
> > }
> >
> > IntVal CurrentValFinalize(FunctionContext* context, const StringVal& val) {
> >   IntVal intResult;
> >   assert(!val.is_null);
> >   assert(val.len == sizeof(CurrentValStruct));
> >   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
> >   //StringVal result;
> >   if (cur->value == 0) {
> >     //result = StringVal::null();
> >     intResult = 0;
> >   } else {
> >     intResult = cur->value.val;
> >     // Copies the result to memory owned by Impala
> >     //result = ToStringVal(context, cur->value.val);
> >     //intResult = atoi(result.c_str());
> >     //std::istringstream(result) >> intResult;
> >   }
> >   context->Free(val.ptr);
> >   return intResult;
> > }
> >
> >
> > But when trying to create the aggregate function, I get the following:
> >
> > create aggregate function current_val(int,timestamp) returns int location
> > '/impala/udf/libudasample.so' init_fn='CurrentValInit'
> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
> >
> >
> > Query: create aggregate function current_val(int,timestamp) returns int
> > location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit'
> > update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> > serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'
> >
> >
> > ERROR: AnalysisException: Could not find function CurrentValUpdate(INT,
> > TIMESTAMP) returns INT in:
> > hdfs://***************:8020/impala/udf/libudasample.so
> >
> > Check that function name, arguments, and return type are correct.
> >
> >
> > I also changed the function declarations in the header file accordingly.
> > Can someone suggest what I am missing here?
> >
> > Thanks,
> > Ravi
> >
> > On 21 June 2017 at 13:42, Ravi Kanth <ra...@gmail.com> wrote:
> >>
> >> Thanks for the suggestion Matthew. Let me look into the patch. I am
> >> currently working on building a custom UDA. Hopefully the information you
> >> provided and the discussion we had might be useful to me.
> >>
> >> On 21 June 2017 at 13:39, Matthew Jacobs <mj...@cloudera.com> wrote:
> >>>
> >>> I'd strongly recommend the latter (upgrading). We don't really expose
> >>> the analytic function interface, so you'd end up writing an Impala
> >>> patch, and analytic functions are particularly tricky.
> >>>
> >>> Here's Thomas' patch to add 'ignore nulls' in first/last value:
> >>> https://gerrit.cloudera.org/#/c/3328/
> >>>
> >>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ra...@gmail.com>
> >>> wrote:
> >>> > Thanks All. I will think of a possible solution, either by
> >>> > implementing a custom UDA or by upgrading the version.
> >>> >
> >>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
> >>> > <tm...@cloudera.com> wrote:
> >>> >>
> >>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ra...@gmail.com>
> >>> >> wrote:
> >>> >>>
> >>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
> >>> >>>
> >>> >>> Unfortunately, as mentioned, I believe the version of Impala we are
> >>> >>> using doesn't support ignore nulls.
> >>> >>>
> >>> >>> But my question is: would the last_value function retrieve the latest
> >>> >>> non-null value irrespective of using ignore nulls?
> >>> >>
> >>> >>
> >>> >> Not sure I follow - if you use last_value without ignore nulls, you'll
> >>> >> get the latest value taking all values into consideration, which may
> >>> >> or may not be null.
> >>> >>
> >>> >>>
> >>> >>>
> >>> >>> Ravi
> >>> >>>
> >>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj...@cloudera.com>
> >>> >>> wrote:
> >>> >>>>
> >>> >>>> Ah I think Thomas is right. I read the expected results and the
> >>> >>>> query too quickly, so my comment about the asc/desc is probably wrong.
> >>> >>>> Clearly my point about analytic functions being tricky holds true :)
> >>> >>>>
> >>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
> >>> >>>> <tm...@cloudera.com> wrote:
> >>> >>>> >
> >>> >>>> >
> >>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth
> >>> >>>> > <ra...@gmail.com>
> >>> >>>> > wrote:
> >>> >>>> >>
> >>> >>>> >> Thomas,
> >>> >>>> >>
> >>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I
> >>> >>>> >> see ignore nulls was added in Impala 2.7.0. Would adding ignore
> >>> >>>> >> nulls make a big difference in the expected result?
> >>> >>>> >
> >>> >>>> >
> >>> >>>> > That's too bad. I think that 'ignore nulls' would give you what
> >>> >>>> > you want - the problem with the query that you posted is that it
> >>> >>>> > eliminates rows that don't match the where clause, so for example
> >>> >>>> > the row with "Zero" in it is eliminated because it is filtered out
> >>> >>>> > by the "where a is not null", whereas "ignore nulls" only affects
> >>> >>>> > the values that could be returned by the specific analytic function
> >>> >>>> > that the ignore is applied to.
> >>> >>>> >
> >>> >>>> >>
> >>> >>>> >>
> >>> >>>> >> Ravi
> >>> >>>> >>
> >>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
> >>> >>>> >> <tm...@cloudera.com>
> >>> >>>> >> wrote:
> >>> >>>> >>>
> >>> >>>> >>> Ravi,
> >>> >>>> >>>
> >>> >>>> >>> Instead of using the "where ... is not null", have you tried
> >>> >>>> >>> 'last_value(... ignore nulls)'
> >>> >>>> >>>
> >>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth
> >>> >>>> >>> <ra...@gmail.com>
> >>> >>>> >>> wrote:
> >>> >>>> >>>>
> >>> >>>> >>>> Antoni,
> >>> >>>> >>>>
> >>> >>>> >>>> The problem with using the last_value() function, as far as I
> >>> >>>> >>>> have observed, is that if I use it on multiple columns in a
> >>> >>>> >>>> single query, it does not retrieve the results as expected.
> >>> >>>> >>>>
> >>> >>>> >>>>  Input:
> >>> >>>> >>>>
> >>> >>>> >>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
> >>> >>>> >>>> 1 0 1 NULL NULL
> >>> >>>> >>>> 1 1 2 Hi NULL
> >>> >>>> >>>> 1 3 4 Hello Hi
> >>> >>>> >>>> 1 2 5 NULL NULL
> >>> >>>> >>>> 1 4 NULL NULL Zero
> >>> >>>> >>>>
> >>> >>>> >>>> Expected Output:
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> ID (Int) A (Int) B (String) C (String)
> >>> >>>> >>>> 1 4 Hello Zero
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> Query executed:
> >>> >>>> >>>>
> >>> >>>> >>>> select id,
> >>> >>>> >>>>   last_value(a) over(partition by id order by date_time desc) as a,
> >>> >>>> >>>>   last_value(b) over(partition by id order by date_time desc) as b,
> >>> >>>> >>>>   last_value(c) over(partition by id order by date_time desc) as c
> >>> >>>> >>>> from udf_test
> >>> >>>> >>>> where a is not null and b is not null and c is not null;
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> Output I am getting:
> >>> >>>> >>>>
> >>> >>>> >>>> +----+---+-------+----+
> >>> >>>> >>>> | id | a | b     | c  |
> >>> >>>> >>>> +----+---+-------+----+
> >>> >>>> >>>> | 1  | 4 | Hello | Hi |
> >>> >>>> >>>> +----+---+-------+----+
> >>> >>>> >>>>
> >>> >>>> >>>>
> >>> >>>> >>>> Hopefully, I am clear with the problem above.
> >>> >>>> >>>>
> >>> >>>> >>>> Thanks,
> >>> >>>> >>>> Ravi
> >>> >>>> >>>>
> >>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com>
> >>> >>>> >>>> wrote:
> >>> >>>> >>>>>
> >>> >>>> >>>>> Antoni,
> >>> >>>> >>>>>
> >>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and
> >>> >>>> >>>>> hopefully
> >>> >>>> >>>>> we
> >>> >>>> >>>>> can use it in our use case.
> >>> >>>> >>>>>
> >>> >>>> >>>>> Thanks,
> >>> >>>> >>>>> Ravi
> >>> >>>> >>>>>
> >>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov
> >>> >>>> >>>>> <ai...@vmware.com>
> >>> >>>> >>>>> wrote:
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> Hi Ravi,
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> I am curious why you are not using the already existing
> >>> >>>> >>>>>> last_value function in Impala to get the "latest non null
> >>> >>>> >>>>>> value for the column"
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> e.g.
> >>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID
> >>> >>>> >>>>>> order by Date_Time)
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> Thanks,
> >>> >>>> >>>>>> Antoni
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
> >>> >>>> >>>>>> <ta...@cloudera.com>
> >>> >>>> >>>>>> wrote:
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> This was double-posted to
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
> >>> >>>> >>>>>> also. I'll continue the discussion here.
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> > Can we have the flexibility of declaring the variable
> >>> >>>> >>>>>> > globally in UDF? Globally, I mean outside the function?
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> > And, the reason I am declaring a static variable is to
> >>> >>>> >>>>>> > restore the value of timestamp for every record so that I
> >>> >>>> >>>>>> > can perform a comparison of the timestamps. Is there an
> >>> >>>> >>>>>> > alternative approach for this?
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed
> >>> >>>> >>>>>> not to do what you expect - the function can be invoked
> >>> >>>> >>>>>> concurrently by multiple threads.
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> It seems like you probably want to store some additional
> >>> >>>> >>>>>> state in the intermediate value. There are some sample UDAs
> >>> >>>> >>>>>> here (see Avg()) where additional intermediate state is
> >>> >>>> >>>>>> stored in a StringVal:
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
> >>> >>>> >>>>>>
> >>> >>>> >>>>>>
> >>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
> >>> >>>> >>>>>> <ra...@gmail.com>
> >>> >>>> >>>>>> wrote:
> >>> >>>> >>>>>>>
> >>> >>>> >>>>>>> Thanks Bharath. Can you check whether the logic I am
> >>> >>>> >>>>>>> implementing is correct or needs any modification as well?
> >>> >>>> >>>>>>> I am very new to Impala UDFs and C++ and am having a hard
> >>> >>>> >>>>>>> time figuring out the problems.
> >>> >>>> >>>>>>>
> >>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
> >>> >>>> >>>>>>> <bh...@cloudera.com> wrote:
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can
> >>> >>>> >>>>>>>> segfault.
> >>> >>>> >>>>>>>> That
> >>> >>>> >>>>>>>> could be the issue here.
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>>  static TimestampVal* tsTemp;
> >>> >>>> >>>>>>>>       tsTemp->date = 0;
> >>> >>>> >>>>>>>>       tsTemp->time_of_day = 0;
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>>
> >>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
> >>> >>>> >>>>>>>> <ra...@gmail.com> wrote:
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Hi All,
> >>> >>>> >>>>>>>>> We are using Impala to do various processings in our
> >>> >>>> >>>>>>>>> systems.
> >>> >>>> >>>>>>>>> We
> >>> >>>> >>>>>>>>> have a requirement recently, wherein we have to handle
> the
> >>> >>>> >>>>>>>>> updates on the
> >>> >>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the
> >>> >>>> >>>>>>>>> partial
> >>> >>>> >>>>>>>>> updates
> >>> >>>> >>>>>>>>> received for various events. The fields that are not
> >>> >>>> >>>>>>>>> updated
> >>> >>>> >>>>>>>>> are being
> >>> >>>> >>>>>>>>> stored as NULL values.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Ex:
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C
> >>> >>>> >>>>>>>>> (String)
> >>> >>>> >>>>>>>>> 1 0 1 NULL NULL
> >>> >>>> >>>>>>>>> 1 1 2 Hi NULL
> >>> >>>> >>>>>>>>> 1 3 4 Hello Hi
> >>> >>>> >>>>>>>>> 1 2 5 NULL NULL
> >>> >>>> >>>>>>>>> 1 4 NULL NULL Zero
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type
> >>> >>>> >>>>>>>>> values.
> >>> >>>> >>>>>>>>> For
> >>> >>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id
> >>> >>>> >>>>>>>>> and as
> >>> >>>> >>>>>>>>> we
> >>> >>>> >>>>>>>>> get an update to a particular event, we are storing the
> >>> >>>> >>>>>>>>> date_time at which
> >>> >>>> >>>>>>>>> update has happened and also storing the partial updated
> >>> >>>> >>>>>>>>> values. Apart from
> >>> >>>> >>>>>>>>> the updated values, the rest are stored as NULL values.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We are planning to mimic inplace updates on the table,
> so
> >>> >>>> >>>>>>>>> that
> >>> >>>> >>>>>>>>> it
> >>> >>>> >>>>>>>>> would retrieve the resulting table as follows using the
> >>> >>>> >>>>>>>>> query
> >>> >>>> >>>>>>>>> below: We
> >>> >>>> >>>>>>>>> don't delete the data.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
> >>> >>>> >>>>>>>>> > current_val(B,date_time) as B,
> current_val(C,date_time)
> >>> >>>> >>>>>>>>> > as C
> >>> >>>> >>>>>>>>> > from e_update
> >>> >>>> >>>>>>>>> > GROUP BY ID;
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> where, current_val is a custom impala UDA we are
> planning
> >>> >>>> >>>>>>>>> to
> >>> >>>> >>>>>>>>> implement. i.e. get latest non null value for the
> column.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String)
> >>> >>>> >>>>>>>>> 1 4 Hello Zero
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Implemented current_val UDA:
> >>> >>>> >>>>>>>>> The below code is only for int type inputs:
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> uda-currentval.h
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> //This is a sample for retrieving the current value of
> >>> >>>> >>>>>>>>> e_update
> >>> >>>> >>>>>>>>> table
> >>> >>>> >>>>>>>>> //
> >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
> >>> >>>> >>>>>>>>> val);
> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val);
> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> src,
> >>> >>>> >>>>>>>>> IntVal* dst);
> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
> >>> >>>> >>>>>>>>> const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> val);
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> uda-currentval.cc
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> //
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> ------------------------------
> -----------------------------------------------------------------
> >>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of
> >>> >>>> >>>>>>>>> e_update
> >>> >>>> >>>>>>>>> table
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> //----------------------------
> -------------------------------------------------------------------
> >>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
> >>> >>>> >>>>>>>>> val) {
> >>> >>>> >>>>>>>>>       val->is_null = false;
> >>> >>>> >>>>>>>>>       val->val = 0;
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
> >>> >>>> >>>>>>>>>       static TimestampVal* tsTemp;
> >>> >>>> >>>>>>>>>       tsTemp->date = 0;
> >>> >>>> >>>>>>>>>       tsTemp->time_of_day = 0;
> >>> >>>> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
> >>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
> >>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
> >>> >>>> >>>>>>>>>         val->val = input.val;
> >>> >>>> >>>>>>>>>         return;
> >>> >>>> >>>>>>>>>       }
> >>> >>>> >>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
> >>> >>>> >>>>>>>>> tsTemp->time_of_day){
> >>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
> >>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
> >>> >>>> >>>>>>>>>         val->val = input.val;
> >>> >>>> >>>>>>>>>         return;
> >>> >>>> >>>>>>>>>       }
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> src,
> >>> >>>> >>>>>>>>> IntVal* dst) {
> >>> >>>> >>>>>>>>>      dst->val += src.val;
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
> >>> >>>> >>>>>>>>> const
> >>> >>>> >>>>>>>>> IntVal&
> >>> >>>> >>>>>>>>> val) {
> >>> >>>> >>>>>>>>>      return val;
> >>> >>>> >>>>>>>>> }
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We are able to build and create an aggregate function in
> >>> >>>> >>>>>>>>> impala,
> >>> >>>> >>>>>>>>> but when trying to run the select query similar to the
> one
> >>> >>>> >>>>>>>>> above, it is
> >>> >>>> >>>>>>>>> bringing down couple of impala deamons and throwing the
> >>> >>>> >>>>>>>>> error
> >>> >>>> >>>>>>>>> below and
> >>> >>>> >>>>>>>>> getting terminated.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
> >>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> We have impalad running on 14 instances.
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>>
> >>> >>>> >>>>>>>>> Can someone help us resolve this problem and suggest a
> >>> >>>> >>>>>>>>> better way to achieve a solution for the scenario explained?

Re: Creating Impala UDA

Posted by Matthew Jacobs <mj...@cloudera.com>.
I haven't looked at the code, but this isn't going to solve your
earlier issue: this is an aggregate function, and it will not work
as an analytic function. As I said in an earlier email, we don't
expose an interface for registering analytic functions, so you
should probably just upgrade Impala. I posted the patch for the
analytic function we added to illustrate that this would be difficult
and is not just a matter of registering a UDA.
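
Separately from the analytic-function question, one detail in the aggregate
quoted below may be worth a look: it accepts a row only when both the date and
the time of day are newer, so a row from a later date with an earlier time of
day would be skipped. A minimal sketch of a date-then-time comparison follows.
Ts and State are stand-in structs and IsLater, Update and Merge are
hypothetical names for illustration, not the real impala_udf types or the
poster's functions; a has_value flag replaces the zeroed-timestamp convention.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for the date / time_of_day fields of TimestampVal (assumed shape).
struct Ts {
  int32_t date;
  int64_t time_of_day;
};

// Per-group state: the latest non-null value seen so far and its timestamp.
struct State {
  bool has_value;
  int32_t value;
  Ts ts;
};

// Compare by date first; use time_of_day only to break ties on the same date.
bool IsLater(const Ts& a, const Ts& b) {
  if (a.date != b.date) return a.date > b.date;
  return a.time_of_day > b.time_of_day;
}

// Update step: ignore NULL inputs, keep the value with the latest timestamp.
void Update(State* st, bool input_is_null, int32_t input, const Ts& ts) {
  if (input_is_null) return;
  if (!st->has_value || IsLater(ts, st->ts)) {
    st->has_value = true;
    st->value = input;
    st->ts = ts;
  }
}

// Merge step: same rule, applied to two partial states.
void Merge(const State& src, State* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.ts, dst->ts)) *dst = src;
}
```

Fed column A of the sample table with the timestamps as time_of_day on a single
date, Update keeps 4, which matches the expected output in the original post.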

On Thu, Jun 22, 2017 at 11:25 AM, Ravi Kanth <ra...@gmail.com> wrote:
> Hi All,
>
> I wrote the below lines of code to achieve this functionality:
>
> // Copyright 2012 Cloudera Inc.
> //
> // Licensed under the Apache License, Version 2.0 (the "License");
> // you may not use this file except in compliance with the License.
> // You may obtain a copy of the License at
> //
> // http://www.apache.org/licenses/LICENSE-2.0
> //
> // Unless required by applicable law or agreed to in writing, software
> // distributed under the License is distributed on an "AS IS" BASIS,
> // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> // See the License for the specific language governing permissions and
> // limitations under the License.
>
> #include "uda-sample.h"
> #include <assert.h>
> #include <sstream>
>
> using namespace impala_udf;
> using namespace std;
>
>
> StringVal ToStringVal(FunctionContext* context, const IntVal& val) {
>   stringstream ss;
>   ss << val.val;
>   string str = ss.str();
>   StringVal string_val(context, str.size());
>   memcpy(string_val.ptr, str.c_str(), str.size());
>   return string_val;
> }
>
> // ---------------------------------------------------------------------------
> // This is an aggregate function for retrieving the latest non-null value
> // for the e_update table
> // ---------------------------------------------------------------------------
>
> struct CurrentValStruct {
>   IntVal value;
>   TimestampVal tsTemp;
> };
>
> // Initialize the StringVal intermediate to a zero'd CurrentValStruct
> void CurrentValInit(FunctionContext* context, StringVal* val) {
>   val->is_null = false;
>   val->len = sizeof(CurrentValStruct);
>   val->ptr = context->Allocate(val->len);
>   memset(val->ptr, 0, val->len);
>   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
>   cur->value.is_null = false;
>   cur->tsTemp.is_null = false;
> }
>
> void CurrentValUpdate(FunctionContext* context, const IntVal& input, const
> TimestampVal& ts, StringVal* val) {
>   assert(!val->is_null);
>   assert(val->len == sizeof(CurrentValStruct));
>   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
>   //checking if the incoming input value is null
>   if(!input.is_null){
>     if(ts.date >= cur->tsTemp.date && ts.time_of_day >
> cur->tsTemp.time_of_day){
>     cur->value = input.val;
>       cur->tsTemp.date = ts.date;
>       cur->tsTemp.time_of_day = ts.time_of_day;
>     }
>   }
> }
>
> void CurrentValMerge(FunctionContext* context, const StringVal& src,
> StringVal* dst) {
>   if (src.is_null) return;
>   const CurrentValStruct* src_cur = reinterpret_cast<const
> CurrentValStruct*>(src.ptr);
>   CurrentValStruct* dst_cur = reinterpret_cast<CurrentValStruct*>(dst->ptr);
>   if(dst_cur->tsTemp.is_null){
>     dst_cur->value = src_cur->value;
>     dst_cur->tsTemp.date = src_cur->tsTemp.date;
>     dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
>   dst_cur->tsTemp.is_null = false;
>   dst_cur->value.is_null = false;
>  }
>   else{
>     if(src_cur->tsTemp.date >= dst_cur->tsTemp.date &&
> src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day){
>     dst_cur->value = src_cur->value;
>     dst_cur->tsTemp.date = src_cur->tsTemp.date;
>       dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
>     }
>   }
> }
>
> StringVal CurrentValSerialize(FunctionContext* context, const StringVal&
> val) {
>   assert(!val.is_null);
>   StringVal result(context, val.len);
>   memcpy(result.ptr, val.ptr, val.len);
>   context->Free(val.ptr);
>   return result;
> }
>
> StringVal CurrentValFinalize(FunctionContext* context, const StringVal& val)
> {
>   //IntVal intResult;
>   assert(!val.is_null);
>   assert(val.len == sizeof(CurrentValStruct));
>   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>   StringVal result;
>   if (cur->value == 0) {
>     result = StringVal::null();
>     //intResult = 0;
>   } else {
>     // intResult = cur->value.val;
>     // Copies the result to memory owned by Impala
>     result = ToStringVal(context, cur->value.val);
> //  intResult = atoi(result.c_str());
>   //  std::istringstream(result) >> intResult;
>   }
>   context->Free(val.ptr);
>   return result;
> }
>
> Queries:
>
> create aggregate function current_val(int,timestamp) returns string location
> '/impala/udf/libudasample.so' init_fn='CurrentValInit'
> update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
>
>
> select id, current_val(a,date_time) as a from udf_test GROUP BY id;
>
>
> The above is working fine and I am able to achieve my requirement. But, is
> there any possibility that we can return an IntVal type rather than
> StringVal type? If so where can I make the changes?
>
> I tried changing the below:
>
> IntVal CurrentValSerialize(FunctionContext* context, const StringVal& val) {
>
>   assert(!val.is_null);
>
>   StringVal result(context, val.len);
>
>   memcpy(result.ptr, val.ptr, val.len);
>
>   context->Free(val.ptr);
>
>   IntVal intResult;
>
>   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>
>   intResult = cur->value.val;
>
>   return intResult;
>
> }
>
>
> IntVal CurrentValFinalize(FunctionContext* context, const StringVal& val) {
>
>   IntVal intResult;
>
>   assert(!val.is_null);
>
>   assert(val.len == sizeof(CurrentValStruct));
>
>   CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
>
>   //StringVal result;
>
>   if (cur->value == 0) {
>
>     //result = StringVal::null();
>
>     intResult = 0;
>
>   } else {
>
>     intResult = cur->value.val;
>
>     // Copies the result to memory owned by Impala
>
>     //result = ToStringVal(context, cur->value.val);
>
> //  intResult = atoi(result.c_str());
>
>   //  std::istringstream(result) >> intResult;
>
>   }
>
>   context->Free(val.ptr);
>
>   return intResult;
>
> }
>
>
> But, when trying to create aggregate function I am facing,
>
> create aggregate function current_val(int,timestamp) returns int location
> '/impala/udf/libudasample.so' init_fn='CurrentValInit'
> update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';
>
>
> Query: create aggregate function current_val(int,timestamp) returns int
> location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit'
> update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
> serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'
>
>
> ERROR: AnalysisException: Could not find function CurrentValUpdate(INT,
> TIMESTAMP) returns INT in:
> hdfs://***************:8020/impala/udf/libudasample.so
>
> Check that function name, arguments, and return type are correct.
>
>
> I also changed the function declarations in the header file accordingly. Can
> someone suggest what I am missing here?
>
> Thanks,
> Ravi
>
> On 21 June 2017 at 13:42, Ravi Kanth <ra...@gmail.com> wrote:
>>
>> Thanks for the suggestion Matthew. Let me look into the patch. I am
>> currently working on building a custom UDA. Hopefully the information you
>> provided and the discussion we had might be useful to me.
>>
>> On 21 June 2017 at 13:39, Matthew Jacobs <mj...@cloudera.com> wrote:
>>>
>>> I'd strongly recommend the latter (upgrading). We don't really expose
>>> the analytic function interface, so you'd end up writing an Impala
>>> patch, and analytic functions are particularly tricky.
>>>
>>> Here's Thomas' patch to add 'ignore nulls' in first/last value:
>>> https://gerrit.cloudera.org/#/c/3328/
>>>
>>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ra...@gmail.com>
>>> wrote:
>>> > Thanks All. I will think of a possible solution either by implementing
>>> > a
>>> > Custom UDA or would update the version.
>>> >
>>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
>>> > <tm...@cloudera.com> wrote:
>>> >>
>>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ra...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
>>> >>>
>>> >>> Unfortunately, as mentioned, I believe the version of Impala we are
>>> >>> using doesn't support ignore nulls.
>>> >>>
>>> >>> But, my question is would last_value function retrieve a latest not
>>> >>> null
>>> >>> value irrespective of using ignore nulls?
>>> >>
>>> >>
>>> >> Not sure I follow - if you use last_value without ignore nulls, you'll
>>> >> get
>>> >> the latest value taking all values into consideration, which may or
>>> >> may not
>>> >> be null.
>>> >>
>>> >>>
>>> >>>
>>> >>> Ravi
>>> >>>
>>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj...@cloudera.com> wrote:
>>> >>>>
>>> >>>> Ah I think Thomas is right. I read the expected results and the
>>> >>>> query
>>> >>>> too quickly, so my comment about the asc/desc is probably wrong.
>>> >>>> Clearly my point about analytic functions being tricky holds true :)
>>> >>>>
>>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
>>> >>>> <tm...@cloudera.com> wrote:
>>> >>>> >
>>> >>>> >
>>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth
>>> >>>> > <ra...@gmail.com>
>>> >>>> > wrote:
>>> >>>> >>
>>> >>>> >> Thomas,
>>> >>>> >>
>>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and I
>>> >>>> >> see ignore nulls has been added in Impala 2.7.0. And, would adding
>>> >>>> >> ignore nulls make a big difference in the expected result?
>>> >>>> >
>>> >>>> >
>>> >>>> > That's too bad. I think that 'ignore nulls' would give you what
>>> >>>> > you
>>> >>>> > want -
>>> >>>> > the problem with the query that you posted is that it eliminates
>>> >>>> > rows
>>> >>>> > that
>>> >>>> > don't match the where clause, so for example the row with "Zero"
>>> >>>> > in it
>>> >>>> > is
>>> >>>> > eliminated because it is filtered out by the "where a is not
>>> >>>> > null",
>>> >>>> > whereas
>>> >>>> > "ignore nulls" only affects the values that could be returned by
>>> >>>> > the
>>> >>>> > specific analytic function that the ignore is applied to.
>>> >>>> >
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> Ravi
>>> >>>> >>
>>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
>>> >>>> >> <tm...@cloudera.com>
>>> >>>> >> wrote:
>>> >>>> >>>
>>> >>>> >>> Ravi,
>>> >>>> >>>
>>> >>>> >>> Instead of using the "where ... is not null", have you tried
>>> >>>> >>> 'last_value(... ignore nulls)'
>>> >>>> >>>
>>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth
>>> >>>> >>> <ra...@gmail.com>
>>> >>>> >>> wrote:
>>> >>>> >>>>
>>> >>>> >>>> Antoni,
>>> >>>> >>>>
>>> >>>> >>>> The problem in using the last_value() function, as far as I
>>> >>>> >>>> observed, is that if I use it on multiple columns in a single
>>> >>>> >>>> query, it's not retrieving results as expected.
>>> >>>> >>>>
>>> >>>> >>>>  Input:
>>> >>>> >>>>
>>> >>>> >>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>> >>>> >>>> 1 0 1 NULL NULL
>>> >>>> >>>> 1 1 2 Hi NULL
>>> >>>> >>>> 1 3 4 Hello Hi
>>> >>>> >>>> 1 2 5 NULL NULL
>>> >>>> >>>> 1 4 NULL NULL Zero
>>> >>>> >>>>
>>> >>>> >>>> Expected Output:
>>> >>>> >>>>
>>> >>>> >>>>
>>> >>>> >>>>
>>> >>>> >>>> ID (Int) A (Int) B (String) C (String)
>>> >>>> >>>> 1 4 Hello Zero
>>> >>>> >>>>
>>> >>>> >>>>
>>> >>>> >>>> Query executed:
>>> >>>> >>>>
>>> >>>> >>>> select id, last_value(a) over(partition by id order by
>>> >>>> >>>> date_time
>>> >>>> >>>> desc)
>>> >>>> >>>> as a, last_value(b) over(partition by id order by date_time
>>> >>>> >>>> desc)
>>> >>>> >>>> as b,
>>> >>>> >>>> last_value(c) over(partition by id order by date_time desc) as
>>> >>>> >>>> c
>>> >>>> >>>> from
>>> >>>> >>>> udf_test where a is not null and b is not null and c is not
>>> >>>> >>>> null;
>>> >>>> >>>>
>>> >>>> >>>>
>>> >>>> >>>>
>>> >>>> >>>> Output I am getting:
>>> >>>> >>>>
>>> >>>> >>>> +----+---+-------+----+
>>> >>>> >>>> | id | a | b     | c  |
>>> >>>> >>>> +----+---+-------+----+
>>> >>>> >>>> | 1  | 4 | Hello | Hi |
>>> >>>> >>>> +----+---+-------+----+
>>> >>>> >>>>
>>> >>>> >>>>
>>> >>>> >>>> Hopefully, I am clear with the problem above.
>>> >>>> >>>>
>>> >>>> >>>> Thanks,
>>> >>>> >>>> Ravi
>>> >>>> >>>>
>>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com>
>>> >>>> >>>> wrote:
>>> >>>> >>>>>
>>> >>>> >>>>> Antoni,
>>> >>>> >>>>>
>>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and
>>> >>>> >>>>> hopefully
>>> >>>> >>>>> we
>>> >>>> >>>>> can use it in our use case.
>>> >>>> >>>>>
>>> >>>> >>>>> Thanks,
>>> >>>> >>>>> Ravi
>>> >>>> >>>>>
>>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov
>>> >>>> >>>>> <ai...@vmware.com>
>>> >>>> >>>>> wrote:
>>> >>>> >>>>>>
>>> >>>> >>>>>> Hi Ravi,
>>> >>>> >>>>>>
>>> >>>> >>>>>> I am curious why you are not using already existing
>>> >>>> >>>>>> last_value
>>> >>>> >>>>>> function in Impala to get "latest non null value for the
>>> >>>> >>>>>> column”
>>> >>>> >>>>>>
>>> >>>> >>>>>> e.g
>>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID  order
>>> >>>> >>>>>> by
>>> >>>> >>>>>> Date_Time)
>>> >>>> >>>>>>
>>> >>>> >>>>>> Thanks,
>>> >>>> >>>>>> Antoni
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
>>> >>>> >>>>>> <ta...@cloudera.com>
>>> >>>> >>>>>> wrote:
>>> >>>> >>>>>>
>>> >>>> >>>>>> This was double-posted to
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>> >>>> >>>>>> also. I'll continue the discussion here.
>>> >>>> >>>>>>
>>> >>>> >>>>>> > Can we have the flexibility of declaring the variable
>>> >>>> >>>>>> > globally
>>> >>>> >>>>>> > in
>>> >>>> >>>>>> > UDF? Globally, I mean outside the function?
>>> >>>> >>>>>>
>>> >>>> >>>>>> > And, the reason I am declaring a static variable is to
>>> >>>> >>>>>> > restore
>>> >>>> >>>>>> > the
>>> >>>> >>>>>> > value of timestamp for every record so that I can perform a
>>> >>>> >>>>>> > comparison of
>>> >>>> >>>>>> > the timestamps. Is there an alternative approach for this?
>>> >>>> >>>>>>
>>> >>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed
>>> >>>> >>>>>> not
>>> >>>> >>>>>> to do
>>> >>>> >>>>>> what you expect - the function can be invoked concurrently by
>>> >>>> >>>>>> multiple
>>> >>>> >>>>>> threads.
>>> >>>> >>>>>>
>>> >>>> >>>>>> It seems like you probably want to store some additional
>>> >>>> >>>>>> state in
>>> >>>> >>>>>> the
>>> >>>> >>>>>> intermediate value. There are some sample UDAs here (see
>>> >>>> >>>>>> Avg())
>>> >>>> >>>>>> where
>>> >>>> >>>>>> additional intermediate state is stored in a StringVal:
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
>>> >>>> >>>>>> <ra...@gmail.com>
>>> >>>> >>>>>> wrote:
>>> >>>> >>>>>>>
>>> >>>> >>>>>>> Thanks Bharath. Can you check if the logic I am implementing
>>> >>>> >>>>>>> is
>>> >>>> >>>>>>> correct or needed any modification in it as well? I am very
>>> >>>> >>>>>>> new
>>> >>>> >>>>>>> to Impala
>>> >>>> >>>>>>> UDF & C++ and having some hard time figuring out the
>>> >>>> >>>>>>> problems.
>>> >>>> >>>>>>>
>>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>>> >>>> >>>>>>> <bh...@cloudera.com> wrote:
>>> >>>> >>>>>>>>
>>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can
>>> >>>> >>>>>>>> segfault.
>>> >>>> >>>>>>>> That
>>> >>>> >>>>>>>> could be the issue here.
>>> >>>> >>>>>>>>
>>> >>>> >>>>>>>>  static TimestampVal* tsTemp;
>>> >>>> >>>>>>>>       tsTemp->date = 0;
>>> >>>> >>>>>>>>       tsTemp->time_of_day = 0;
>>> >>>> >>>>>>>>
>>> >>>> >>>>>>>>
>>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
>>> >>>> >>>>>>>> <ra...@gmail.com> wrote:
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> Hi All,
>>> >>>> >>>>>>>>> We are using Impala to do various processings in our
>>> >>>> >>>>>>>>> systems.
>>> >>>> >>>>>>>>> We
>>> >>>> >>>>>>>>> have a requirement recently, wherein we have to handle the
>>> >>>> >>>>>>>>> updates on the
>>> >>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the
>>> >>>> >>>>>>>>> partial
>>> >>>> >>>>>>>>> updates
>>> >>>> >>>>>>>>> received for various events. The fields that are not
>>> >>>> >>>>>>>>> updated
>>> >>>> >>>>>>>>> are being
>>> >>>> >>>>>>>>> stored as NULL values.
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> Ex:
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C
>>> >>>> >>>>>>>>> (String)
>>> >>>> >>>>>>>>> 1 0 1 NULL NULL
>>> >>>> >>>>>>>>> 1 1 2 Hi NULL
>>> >>>> >>>>>>>>> 1 3 4 Hello Hi
>>> >>>> >>>>>>>>> 1 2 5 NULL NULL
>>> >>>> >>>>>>>>> 1 4 NULL NULL Zero
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type
>>> >>>> >>>>>>>>> values.
>>> >>>> >>>>>>>>> For
>>> >>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id
>>> >>>> >>>>>>>>> and as
>>> >>>> >>>>>>>>> we
>>> >>>> >>>>>>>>> get an update to a particular event, we are storing the
>>> >>>> >>>>>>>>> date_time at which
>>> >>>> >>>>>>>>> update has happened and also storing the partial updated
>>> >>>> >>>>>>>>> values. Apart from
>>> >>>> >>>>>>>>> the updated values, the rest are stored as NULL values.
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> We are planning to mimic inplace updates on the table, so
>>> >>>> >>>>>>>>> that
>>> >>>> >>>>>>>>> it
>>> >>>> >>>>>>>>> would retrieve the resulting table as follows using the
>>> >>>> >>>>>>>>> query
>>> >>>> >>>>>>>>> below: We
>>> >>>> >>>>>>>>> don't delete the data.
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>>> >>>> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time)
>>> >>>> >>>>>>>>> > as C
>>> >>>> >>>>>>>>> > from e_update
>>> >>>> >>>>>>>>> > GROUP BY ID;
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> where, current_val is a custom impala UDA we are planning
>>> >>>> >>>>>>>>> to
>>> >>>> >>>>>>>>> implement. i.e. get latest non null value for the column.
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String)
>>> >>>> >>>>>>>>> 1 4 Hello Zero
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> Implemented current_val UDA:
>>> >>>> >>>>>>>>> The below code is only for int type inputs:
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> uda-currentval.h
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> //This is a sample for retrieving the current value of
>>> >>>> >>>>>>>>> e_update
>>> >>>> >>>>>>>>> table
>>> >>>> >>>>>>>>> //
>>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
>>> >>>> >>>>>>>>> val);
>>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
>>> >>>> >>>>>>>>> IntVal&
>>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val);
>>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
>>> >>>> >>>>>>>>> IntVal&
>>> >>>> >>>>>>>>> src,
>>> >>>> >>>>>>>>> IntVal* dst);
>>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
>>> >>>> >>>>>>>>> const
>>> >>>> >>>>>>>>> IntVal&
>>> >>>> >>>>>>>>> val);
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> uda-currentval.cc
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> //
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> -----------------------------------------------------------------------------------------------
>>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of
>>> >>>> >>>>>>>>> e_update
>>> >>>> >>>>>>>>> table
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> //-----------------------------------------------------------------------------------------------
>>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
>>> >>>> >>>>>>>>> val) {
>>> >>>> >>>>>>>>>       val->is_null = false;
>>> >>>> >>>>>>>>>       val->val = 0;
>>> >>>> >>>>>>>>> }
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
>>> >>>> >>>>>>>>> IntVal&
>>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
>>> >>>> >>>>>>>>>       static TimestampVal* tsTemp;
>>> >>>> >>>>>>>>>       tsTemp->date = 0;
>>> >>>> >>>>>>>>>       tsTemp->time_of_day = 0;
>>> >>>> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
>>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>> >>>> >>>>>>>>>         val->val = input.val;
>>> >>>> >>>>>>>>>         return;
>>> >>>> >>>>>>>>>       }
>>> >>>> >>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
>>> >>>> >>>>>>>>> tsTemp->time_of_day){
>>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
>>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>> >>>> >>>>>>>>>         val->val = input.val;
>>> >>>> >>>>>>>>>         return;
>>> >>>> >>>>>>>>>       }
>>> >>>> >>>>>>>>> }
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
>>> >>>> >>>>>>>>> IntVal&
>>> >>>> >>>>>>>>> src,
>>> >>>> >>>>>>>>> IntVal* dst) {
>>> >>>> >>>>>>>>>      dst->val += src.val;
>>> >>>> >>>>>>>>> }
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
>>> >>>> >>>>>>>>> const
>>> >>>> >>>>>>>>> IntVal&
>>> >>>> >>>>>>>>> val) {
>>> >>>> >>>>>>>>>      return val;
>>> >>>> >>>>>>>>> }
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> We are able to build and create an aggregate function in
>>> >>>> >>>>>>>>> Impala, but when trying to run a select query similar to
>>> >>>> >>>>>>>>> the one above, it brings down a couple of Impala daemons,
>>> >>>> >>>>>>>>> throws the error below, and gets terminated.
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> We have impalad running on 14 instances.
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>>
>>> >>>> >>>>>>>>> Can someone help us resolve this problem and suggest a
>>> >>>> >>>>>>>>> better way to achieve a solution for the scenario explained?
>>> >>>> >>>>>>>>
>>> >>>> >>>>>>>>
>>> >>>> >>>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>>>
>>> >>>> >>>>
>>> >>>> >>
>>> >>>> >
>>
>>
>

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Hi All,

I wrote the code below to achieve this functionality:

// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "uda-sample.h"
#include <assert.h>
#include <string.h>  // memcpy, memset
#include <sstream>

using namespace impala_udf;
using namespace std;


StringVal ToStringVal(FunctionContext* context, const IntVal& val) {
  stringstream ss;
  ss << val.val;
  string str = ss.str();
  StringVal string_val(context, str.size());
  memcpy(string_val.ptr, str.c_str(), str.size());
  return string_val;
}

// ---------------------------------------------------------------------------
// This is an aggregate function for retrieving the latest non-null value
// for e_update table
// ---------------------------------------------------------------------------

struct CurrentValStruct {
  IntVal value;
  TimestampVal tsTemp;
};

// Initialize the StringVal intermediate to a zero'd CurrentValStruct
void CurrentValInit(FunctionContext* context, StringVal* val) {
  val->is_null = false;
  val->len = sizeof(CurrentValStruct);
  val->ptr = context->Allocate(val->len);
  memset(val->ptr, 0, val->len);
  CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
  cur->value.is_null = false;
  cur->tsTemp.is_null = false;
}

void CurrentValUpdate(FunctionContext* context, const IntVal& input,
    const TimestampVal& ts, StringVal* val) {
  assert(!val->is_null);
  assert(val->len == sizeof(CurrentValStruct));
  CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val->ptr);
  // Skip NULL inputs so that only non-null values are considered
  if (!input.is_null) {
    // Compare the date first, and time_of_day only when the dates are equal
    if (ts.date > cur->tsTemp.date ||
        (ts.date == cur->tsTemp.date &&
         ts.time_of_day > cur->tsTemp.time_of_day)) {
      cur->value = input.val;
      cur->tsTemp.date = ts.date;
      cur->tsTemp.time_of_day = ts.time_of_day;
    }
  }
}

void CurrentValMerge(FunctionContext* context, const StringVal& src,
    StringVal* dst) {
  if (src.is_null) return;
  const CurrentValStruct* src_cur =
      reinterpret_cast<const CurrentValStruct*>(src.ptr);
  CurrentValStruct* dst_cur = reinterpret_cast<CurrentValStruct*>(dst->ptr);
  if (dst_cur->tsTemp.is_null) {
    dst_cur->value = src_cur->value;
    dst_cur->tsTemp.date = src_cur->tsTemp.date;
    dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
    dst_cur->tsTemp.is_null = false;
    dst_cur->value.is_null = false;
  } else {
    // Compare the date first, and time_of_day only when the dates are equal
    if (src_cur->tsTemp.date > dst_cur->tsTemp.date ||
        (src_cur->tsTemp.date == dst_cur->tsTemp.date &&
         src_cur->tsTemp.time_of_day > dst_cur->tsTemp.time_of_day)) {
      dst_cur->value = src_cur->value;
      dst_cur->tsTemp.date = src_cur->tsTemp.date;
      dst_cur->tsTemp.time_of_day = src_cur->tsTemp.time_of_day;
    }
  }
}

StringVal CurrentValSerialize(FunctionContext* context,
    const StringVal& val) {
  assert(!val.is_null);
  StringVal result(context, val.len);
  memcpy(result.ptr, val.ptr, val.len);
  context->Free(val.ptr);
  return result;
}

StringVal CurrentValFinalize(FunctionContext* context,
    const StringVal& val) {
  //IntVal intResult;
  assert(!val.is_null);
  assert(val.len == sizeof(CurrentValStruct));
  CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
  StringVal result;
  if (cur->value == 0) {
    result = StringVal::null();
    //intResult = 0;
  } else {
    // intResult = cur->value.val;
    // Copies the result to memory owned by Impala
    result = ToStringVal(context, cur->value.val);
//  intResult = atoi(result.c_str());
  //  std::istringstream(result) >> intResult;
  }
  context->Free(val.ptr);
  return result;
}
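
A note on the timestamp ordering used in Update and Merge: deciding which
record is newer means ordering the (date, time_of_day) pair, date first.
Requiring both fields to be larger would skip an update that arrives on a
later date but at an earlier time of day. Below is a minimal sketch of that
ordering. TsSketch and IsLater are hypothetical names used only for
illustration; TsSketch merely mirrors the date and time_of_day fields so
the snippet compiles without the Impala UDF headers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in mirroring the two TimestampVal fields used here.
struct TsSketch {
  int32_t date;         // day number
  int64_t time_of_day;  // offset within the day
};

// Returns true when `a` is strictly later than `b`: the dates are compared
// first, and time_of_day only breaks the tie when the dates are equal.
bool IsLater(const TsSketch& a, const TsSketch& b) {
  if (a.date != b.date) return a.date > b.date;
  return a.time_of_day > b.time_of_day;
}
```

With this ordering, a record dated one day later is treated as newer even
when its time_of_day is smaller, and two identical timestamps are not
treated as newer than each other.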

Queries:

create aggregate function current_val(int,timestamp) returns string
location '/impala/udf/libudasample.so' init_fn='CurrentValInit'
update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';

select id, current_val(a,date_time) as a from udf_test GROUP BY id;

The above works fine and meets my requirement. However, is it possible to
return an IntVal rather than a StringVal? If so, where should I make the
changes?

I tried changing the below:

IntVal CurrentValSerialize(FunctionContext* context, const StringVal& val) {
  assert(!val.is_null);
  StringVal result(context, val.len);
  memcpy(result.ptr, val.ptr, val.len);
  context->Free(val.ptr);
  IntVal intResult;
  CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
  intResult = cur->value.val;
  return intResult;
}

IntVal CurrentValFinalize(FunctionContext* context, const StringVal& val) {
  IntVal intResult;
  assert(!val.is_null);
  assert(val.len == sizeof(CurrentValStruct));
  CurrentValStruct* cur = reinterpret_cast<CurrentValStruct*>(val.ptr);
  //StringVal result;
  if (cur->value == 0) {
    //result = StringVal::null();
    intResult = 0;
  } else {
    intResult = cur->value.val;
    // Copies the result to memory owned by Impala
    //result = ToStringVal(context, cur->value.val);
    //  intResult = atoi(result.c_str());
    //  std::istringstream(result) >> intResult;
  }
  context->Free(val.ptr);
  return intResult;
}

But when I try to create the aggregate function, I get the following error:

create aggregate function current_val(int,timestamp) returns int location
'/impala/udf/libudasample.so' init_fn='CurrentValInit'
update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize';


Query: create aggregate function current_val(int,timestamp) returns int
location '/dwh/impala/udf/libudasample.so' init_fn='CurrentValInit'
update_fn='CurrentValUpdate' merge_fn='CurrentValMerge'
serialize_fn='CurrentValSerialize' finalize_fn='CurrentValFinalize'


ERROR: AnalysisException: Could not find function CurrentValUpdate(INT,
TIMESTAMP) returns INT in:
hdfs://***************:8020/impala/udf/libudasample.so

Check that function name, arguments, and return type are correct.

I also changed the function declarations in the header file accordingly. Can
someone suggest what I am missing here?
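
One possible explanation, stated as an assumption and not confirmed in this
thread: when the CREATE AGGREGATE FUNCTION statement names no intermediate
type, Impala may assume it is the same as the return type, so "returns int"
would make it look for an Update whose last argument is IntVal*. Impala's
CREATE FUNCTION documentation describes an INTERMEDIATE type_spec clause
for declaring a different intermediate type; whether it is available in a
given release needs to be checked. Under that arrangement, Serialize would
keep returning the intermediate type and only Finalize would produce the
INT result. The sketch below illustrates that split. IntValSketch,
CurrentValState, SerializeSketch and FinalizeSketch are hypothetical
stand-ins, not part of the Impala UDF API, and the has_value flag is an
added assumption so that a stored 0 is not mistaken for "no value seen".

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for an integer result that can also be NULL.
struct IntValSketch {
  bool is_null;
  int32_t val;
};

// Hypothetical intermediate state, carried between steps as raw bytes.
struct CurrentValState {
  bool has_value;       // false until a non-null input has been seen
  int32_t value;        // latest non-null value
  int32_t date;         // date part of the timestamp of that value
  int64_t time_of_day;  // time-of-day part of that timestamp
};

// Serialize keeps the intermediate representation: state in, bytes out.
std::vector<uint8_t> SerializeSketch(const CurrentValState& state) {
  std::vector<uint8_t> bytes(sizeof(CurrentValState));
  std::memcpy(bytes.data(), &state, sizeof(CurrentValState));
  return bytes;
}

// Finalize is the only step that turns the intermediate bytes into the
// declared return type.
IntValSketch FinalizeSketch(const std::vector<uint8_t>& bytes) {
  assert(bytes.size() == sizeof(CurrentValState));
  CurrentValState state;
  std::memcpy(&state, bytes.data(), sizeof(CurrentValState));
  if (!state.has_value) return IntValSketch{true, 0};  // NULL result
  return IntValSketch{false, state.value};
}
```

In this sketch a group whose latest non-null value is 0 still yields 0,
while a group with no non-null input yields NULL.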

Thanks,
Ravi

On 21 June 2017 at 13:42, Ravi Kanth <ra...@gmail.com> wrote:

> Thanks for the suggestion Matthew. Let me look into the patch. I am
> currently working on building a custom UDA. Hopefully the information you
> provided and the discussion we had might be useful to me.
>
> On 21 June 2017 at 13:39, Matthew Jacobs <mj...@cloudera.com> wrote:
>
>> I'd strongly recommend the latter (upgrading). We don't really expose
>> the analytic function interface, so you'd end up writing an Impala
>> patch, and analytic functions are particularly tricky.
>>
>> Here's Thomas' patch to add 'ignore nulls' in first/last value:
>> https://gerrit.cloudera.org/#/c/3328/
>>
>> On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ra...@gmail.com>
>> wrote:
>> > Thanks All. I will think of a possible solution either by implementing a
>> > Custom UDA or would update the version.
>> >
>> > On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
>> > <tm...@cloudera.com> wrote:
>> >>
>> >> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ra...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
>> >>>
>> >>> Unfortunately, as mentioned, I believe the version of Impala we are
>> >>> using doesn't support ignore nulls.
>> >>>
>> >>> But, my question is would last_value function retrieve a latest not
>> null
>> >>> value irrespective of using ignore nulls?
>> >>
>> >>
>> >> Not sure I follow - if you use last_value without ignore nulls, you'll
>> get
>> >> the latest value taking all values into consideration, which may or
>> may not
>> >> be null.
>> >>
>> >>>
>> >>>
>> >>> Ravi
>> >>>
>> >>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj...@cloudera.com> wrote:
>> >>>>
>> >>>> Ah I think Thomas is right. I read the expected results and the query
>> >>>> too quickly, so my comment about the asc/desc is probably wrong.
>> >>>> Clearly my point about analytic functions being tricky holds true :)
>> >>>>
>> >>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
>> >>>> <tm...@cloudera.com> wrote:
>> >>>> >
>> >>>> >
>> >>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <
>> ravikanth.4b0@gmail.com>
>> >>>> > wrote:
>> >>>> >>
>> >>>> >> Thomas,
>> >>>> >>
>> >>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and I
>> >>>> >> see ignore nulls has been added in Impala 2.7.0. And, would adding
>> >>>> >> ignore nulls make a big difference in the expected result?
>> >>>> >
>> >>>> >
>> >>>> > That's too bad. I think that 'ignore nulls' would give you what you
>> >>>> > want -
>> >>>> > the problem with the query that you posted is that it eliminates
>> rows
>> >>>> > that
>> >>>> > don't match the where clause, so for example the row with "Zero"
>> in it
>> >>>> > is
>> >>>> > eliminated because it is filtered out by the "where a is not null",
>> >>>> > whereas
>> >>>> > "ignore nulls" only affects the values that could be returned by
>> the
>> >>>> > specific analytic function that the ignore is applied to.
>> >>>> >
>> >>>> >>
>> >>>> >>
>> >>>> >> Ravi
>> >>>> >>
>> >>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
>> >>>> >> <tm...@cloudera.com>
>> >>>> >> wrote:
>> >>>> >>>
>> >>>> >>> Ravi,
>> >>>> >>>
>> >>>> >>> Instead of using the "where ... is not null", have you tried
>> >>>> >>> 'last_value(... ignore nulls)'
>> >>>> >>>
>> >>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <
>> ravikanth.4b0@gmail.com>
>> >>>> >>> wrote:
>> >>>> >>>>
>> >>>> >>>> Antoni,
>> >>>> >>>>
>> >>>> >>>> The problem in using the last_value() function, as far as I
>> >>>> >>>> observed, is that if I use it on multiple columns in a single
>> >>>> >>>> query, it's not retrieving results as expected.
>> >>>> >>>>
>> >>>> >>>>  Input:
>> >>>> >>>>
>> >>>> >>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>> >>>> >>>> 1 0 1 NULL NULL
>> >>>> >>>> 1 1 2 Hi NULL
>> >>>> >>>> 1 3 4 Hello Hi
>> >>>> >>>> 1 2 5 NULL NULL
>> >>>> >>>> 1 4 NULL NULL Zero
>> >>>> >>>>
>> >>>> >>>> Expected Output:
>> >>>> >>>>
>> >>>> >>>>
>> >>>> >>>>
>> >>>> >>>> ID (Int) A (Int) B (String) C (String)
>> >>>> >>>> 1 4 Hello Zero
>> >>>> >>>>
>> >>>> >>>>
>> >>>> >>>> Query executed:
>> >>>> >>>>
>> >>>> >>>> select id, last_value(a) over(partition by id order by date_time
>> >>>> >>>> desc)
>> >>>> >>>> as a, last_value(b) over(partition by id order by date_time
>> desc)
>> >>>> >>>> as b,
>> >>>> >>>> last_value(c) over(partition by id order by date_time desc) as c
>> >>>> >>>> from
>> >>>> >>>> udf_test where a is not null and b is not null and c is not
>> null;
>> >>>> >>>>
>> >>>> >>>>
>> >>>> >>>>
>> >>>> >>>> Output I am getting:
>> >>>> >>>>
>> >>>> >>>> +----+---+-------+----+
>> >>>> >>>> | id | a | b     | c  |
>> >>>> >>>> +----+---+-------+----+
>> >>>> >>>> | 1  | 4 | Hello | Hi |
>> >>>> >>>> +----+---+-------+----+
>> >>>> >>>>
>> >>>> >>>>
>> >>>> >>>> Hopefully, I am clear with the problem above.
>> >>>> >>>>
>> >>>> >>>> Thanks,
>> >>>> >>>> Ravi
>> >>>> >>>>
>> >>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com>
>> >>>> >>>> wrote:
>> >>>> >>>>>
>> >>>> >>>>> Antoni,
>> >>>> >>>>>
>> >>>> >>>>> Thanks for the suggestion. Let me have a look at it and
>> hopefully
>> >>>> >>>>> we
>> >>>> >>>>> can use it in our use case.
>> >>>> >>>>>
>> >>>> >>>>> Thanks,
>> >>>> >>>>> Ravi
>> >>>> >>>>>
>> >>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <
>> aivanov@vmware.com>
>> >>>> >>>>> wrote:
>> >>>> >>>>>>
>> >>>> >>>>>> Hi Ravi,
>> >>>> >>>>>>
>> >>>> >>>>>> I am curious why you are not using already existing last_value
>> >>>> >>>>>> function in Impala to get "latest non null value for the
>> column”
>> >>>> >>>>>>
>> >>>> >>>>>> e.g
>> >>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID  order
>> by
>> >>>> >>>>>> Date_Time)
>> >>>> >>>>>>
>> >>>> >>>>>> Thanks,
>> >>>> >>>>>> Antoni
>> >>>> >>>>>>
>> >>>> >>>>>>
>> >>>> >>>>>>
>> >>>> >>>>>>
>> >>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
>> >>>> >>>>>> <ta...@cloudera.com>
>> >>>> >>>>>> wrote:
>> >>>> >>>>>>
>> >>>> >>>>>> This was double-posted to
>> >>>> >>>>>>
>> >>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>> >>>> >>>>>> also. I'll continue the discussion here.
>> >>>> >>>>>>
>> >>>> >>>>>> > Can we have the flexibility of declaring the variable
>> globally
>> >>>> >>>>>> > in
>> >>>> >>>>>> > UDF? Globally, I mean outside the function?
>> >>>> >>>>>>
>> >>>> >>>>>> > And, the reason I am declaring a static variable is to
>> restore
>> >>>> >>>>>> > the
>> >>>> >>>>>> > value of timestamp for every record so that I can perform a
>> >>>> >>>>>> > comparison of
>> >>>> >>>>>> > the timestamps. Is there an alternative approach for this?
>> >>>> >>>>>>
>> >>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed
>> not
>> >>>> >>>>>> to do
>> >>>> >>>>>> what you expect - the function can be invoked concurrently by
>> >>>> >>>>>> multiple
>> >>>> >>>>>> threads.
>> >>>> >>>>>>
>> >>>> >>>>>> It seems like you probably want to store some additional
>> state in
>> >>>> >>>>>> the
>> >>>> >>>>>> intermediate value. There are some sample UDAs here (see
>> Avg())
>> >>>> >>>>>> where
>> >>>> >>>>>> additional intermediate state is stored in a StringVal:
>> >>>> >>>>>>
>> >>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>> >>>> >>>>>>
>> >>>> >>>>>>
>> >>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
>> >>>> >>>>>> <ra...@gmail.com>
>> >>>> >>>>>> wrote:
>> >>>> >>>>>>>
>> >>>> >>>>>>> Thanks Bharath. Can you check if the logic I am implementing
>> is
>> >>>> >>>>>>> correct or needed any modification in it as well? I am very
>> new
>> >>>> >>>>>>> to Impala
>> >>>> >>>>>>> UDF & C++ and having some hard time figuring out the
>> problems.
>> >>>> >>>>>>>
>> >>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>> >>>> >>>>>>> <bh...@cloudera.com> wrote:
>> >>>> >>>>>>>>
>> >>>> >>>>>>>> You need to allocate memory for tsTemp, else it can
>> segfault.
>> >>>> >>>>>>>> That
>> >>>> >>>>>>>> could be the issue here.
>> >>>> >>>>>>>>
>> >>>> >>>>>>>>  static TimestampVal* tsTemp;
>> >>>> >>>>>>>>       tsTemp->date = 0;
>> >>>> >>>>>>>>       tsTemp->time_of_day = 0;
>> >>>> >>>>>>>>
>> >>>> >>>>>>>>
>> >>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
>> >>>> >>>>>>>> <ra...@gmail.com> wrote:
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> Hi All,
>> >>>> >>>>>>>>> We are using Impala to do various processings in our
>> systems.
>> >>>> >>>>>>>>> We
>> >>>> >>>>>>>>> have a requirement recently, wherein we have to handle the
>> >>>> >>>>>>>>> updates on the
>> >>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the
>> partial
>> >>>> >>>>>>>>> updates
>> >>>> >>>>>>>>> received for various events. The fields that are not
>> updated
>> >>>> >>>>>>>>> are being
>> >>>> >>>>>>>>> stored as NULL values.
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> Ex:
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C
>> (String)
>> >>>> >>>>>>>>> 1 0 1 NULL NULL
>> >>>> >>>>>>>>> 1 1 2 Hi NULL
>> >>>> >>>>>>>>> 1 3 4 Hello Hi
>> >>>> >>>>>>>>> 1 2 5 NULL NULL
>> >>>> >>>>>>>>> 1 4 NULL NULL Zero
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type
>> values.
>> >>>> >>>>>>>>> For
>> >>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> As seen in the above table, the events have a unique id
>> and as
>> >>>> >>>>>>>>> we
>> >>>> >>>>>>>>> get an update to a particular event, we are storing the
>> >>>> >>>>>>>>> date_time at which
>> >>>> >>>>>>>>> update has happened and also storing the partial updated
>> >>>> >>>>>>>>> values. Apart from
>> >>>> >>>>>>>>> the updated values, the rest are stored as NULL values.
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> We are planning to mimic inplace updates on the table, so
>> that
>> >>>> >>>>>>>>> it
>> >>>> >>>>>>>>> would retrieve the resulting table as follows using the
>> query
>> >>>> >>>>>>>>> below: We
>> >>>> >>>>>>>>> don't delete the data.
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>> >>>> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time)
>> as C
>> >>>> >>>>>>>>> > from e_update
>> >>>> >>>>>>>>> > GROUP BY ID;
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> where, current_val is a custom impala UDA we are planning
>> to
>> >>>> >>>>>>>>> implement. i.e. get latest non null value for the column.
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String)
>> >>>> >>>>>>>>> 1 4 Hello Zero
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> Implemented current_val UDA:
>> >>>> >>>>>>>>> The below code is only for int type inputs:
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> uda-currentval.h
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> //This is a sample for retrieving the current value of
>> >>>> >>>>>>>>> e_update
>> >>>> >>>>>>>>> table
>> >>>> >>>>>>>>> //
>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
>> val);
>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
>> >>>> >>>>>>>>> IntVal&
>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val);
>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
>> IntVal&
>> >>>> >>>>>>>>> src,
>> >>>> >>>>>>>>> IntVal* dst);
>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
>> const
>> >>>> >>>>>>>>> IntVal&
>> >>>> >>>>>>>>> val);
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> uda-currentval.cc
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> //
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> ------------------------------
>> -----------------------------------------------------------------
>> >>>> >>>>>>>>> // This is a sample for retrieving the current value of
>> >>>> >>>>>>>>> e_update
>> >>>> >>>>>>>>> table
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> //----------------------------
>> -------------------------------------------------------------------
>> >>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal*
>> val) {
>> >>>> >>>>>>>>>       val->is_null = false;
>> >>>> >>>>>>>>>       val->val = 0;
>> >>>> >>>>>>>>> }
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
>> >>>> >>>>>>>>> IntVal&
>> >>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
>> >>>> >>>>>>>>>       static TimestampVal* tsTemp;
>> >>>> >>>>>>>>>       tsTemp->date = 0;
>> >>>> >>>>>>>>>       tsTemp->time_of_day = 0;
>> >>>> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>> >>>> >>>>>>>>>         val->val = input.val;
>> >>>> >>>>>>>>>         return;
>> >>>> >>>>>>>>>       }
>> >>>> >>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
>> >>>> >>>>>>>>> tsTemp->time_of_day){
>> >>>> >>>>>>>>>         tsTemp->date = ts.date;
>> >>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>> >>>> >>>>>>>>>         val->val = input.val;
>> >>>> >>>>>>>>>         return;
>> >>>> >>>>>>>>>       }
>> >>>> >>>>>>>>> }
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const
>> IntVal&
>> >>>> >>>>>>>>> src,
>> >>>> >>>>>>>>> IntVal* dst) {
>> >>>> >>>>>>>>>      dst->val += src.val;
>> >>>> >>>>>>>>> }
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context,
>> const
>> >>>> >>>>>>>>> IntVal&
>> >>>> >>>>>>>>> val) {
>> >>>> >>>>>>>>>      return val;
>> >>>> >>>>>>>>> }
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> We are able to build and create an aggregate function in
>> >>>> >>>>>>>>> impala,
>> >>>> >>>>>>>>> but when trying to run the select query similar to the one
>> >>>> >>>>>>>>> above, it is
>> >>>> >>>>>>>>> bringing down couple of impala deamons and throwing the
>> error
>> >>>> >>>>>>>>> below and
>> >>>> >>>>>>>>> getting terminated.
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>> >>>> >>>>>>>>> hadoop102.**.**.**.com:22000
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> We have impalad running on 14 instances.
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>>
>> >>>> >>>>>>>>> Can someone help resolve us this problem and a better way
>> to
>> >>>> >>>>>>>>> achieve a solution for the scenario explained.
>> >>>> >>>>>>>>
>> >>>> >>>>>>>>
>> >>>> >>>>>>>
>> >>>> >>>>>>
>> >>>> >>>>>>
>> >>>> >>>>
>> >>>> >>
>> >>>> >
>>
>
>

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Thanks for the suggestion, Matthew. I'll look into the patch. I am currently
working on building a custom UDA, and hopefully the information you provided
and our discussion will be useful for that.
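
For reference, here is a minimal sketch of the idea raised earlier in the
thread: keep the timestamp in the per-group intermediate value instead of in a
static variable. It is only an illustration under stated assumptions, not the
Impala API. FunctionContext, IntVal and TimestampVal below are simplified
stand-ins so the file compiles on its own, and CurrentValState and IsLater are
hypothetical names. In a real UDA the state struct would presumably be carried
in a StringVal buffer, as the Avg() sample does. This has not been run against
Impala.

```cpp
#include <cassert>
#include <cstdint>

// Stand-ins for the Impala UDF types so this sketch compiles on its own;
// a real UDA would include the impala_udf headers instead.
struct FunctionContext {};
struct IntVal { bool is_null; int32_t val; };
struct TimestampVal { bool is_null; int32_t date; int64_t time_of_day; };

// Hypothetical intermediate state: the newest non-NULL value seen so far and
// the timestamp it arrived with. One instance per group, never a static.
struct CurrentValState {
  bool has_value;
  int32_t date;
  int64_t time_of_day;
  int32_t value;
};

// Compares (date, time_of_day) pairs: the date decides, the time breaks ties.
static bool IsLater(int32_t date_a, int64_t tod_a, int32_t date_b, int64_t tod_b) {
  if (date_a != date_b) return date_a > date_b;
  return tod_a > tod_b;
}

void CurrentValueInit(FunctionContext*, CurrentValState* state) {
  *state = CurrentValState{false, 0, 0, 0};
}

void CurrentValueUpdate(FunctionContext*, const IntVal& input,
                        const TimestampVal& ts, CurrentValState* state) {
  if (input.is_null || ts.is_null) return;  // NULL inputs never win
  if (!state->has_value ||
      IsLater(ts.date, ts.time_of_day, state->date, state->time_of_day)) {
    *state = CurrentValState{true, ts.date, ts.time_of_day, input.val};
  }
}

// Merge keeps whichever partial state holds the later timestamp; summing the
// values, as in the original attempt, would mix unrelated updates.
void CurrentValueMerge(FunctionContext*, const CurrentValState& src,
                       CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value ||
      IsLater(src.date, src.time_of_day, dst->date, dst->time_of_day)) {
    *dst = src;
  }
}

IntVal CurrentValueFinalize(FunctionContext*, const CurrentValState& state) {
  // A group with no non-NULL input yields NULL.
  return IntVal{!state.has_value, state.value};
}
```

Feeding column A from the example table (values 1, 2, 4, 5, NULL at
timestamps 0, 1, 3, 2, 4) through Update, in any order and split across any
number of partial states that are then merged, leaves 4 as the result.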

On 21 June 2017 at 13:39, Matthew Jacobs <mj...@cloudera.com> wrote:

> I'd strongly recommend the latter (upgrading). We don't really expose
> the analytic function interface, so you'd end up writing an Impala
> patch, and analytic functions are particularly tricky.
>
> Here's Thomas' patch to add 'ignore nulls' in first/last value:
> https://gerrit.cloudera.org/#/c/3328/

Re: Creating Impala UDA

Posted by Matthew Jacobs <mj...@cloudera.com>.
I'd strongly recommend the latter (upgrading). We don't really expose
the analytic function interface, so you'd end up writing an Impala
patch, and analytic functions are particularly tricky.

Here's Thomas' patch to add 'ignore nulls' in first/last value:
https://gerrit.cloudera.org/#/c/3328/

On Wed, Jun 21, 2017 at 1:08 PM, Ravi Kanth <ra...@gmail.com> wrote:
> Thanks All. I will work on a solution, either by implementing a custom UDA
> or by upgrading the version.
>
> On Wed, Jun 21, 2017 at 13:04 Thomas Tauber-Marshall
> <tm...@cloudera.com> wrote:
>>
>> On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ra...@gmail.com>
>> wrote:
>>>
>>> Ya. I agree with you Thomas. Probably that's what I'm doing wrong.
>>>
>>> Unfortunately, as mentioned, I believe the version of Impala we are using
>>> doesn't support ignore nulls.
>>>
>>> But my question is: would the last_value function retrieve the latest
>>> non-null value irrespective of using ignore nulls?
>>
>>
>> Not sure I follow - if you use last_value without ignore nulls, you'll get
>> the latest value taking all values into consideration, which may or may not
>> be null.
>>
>>>
>>>
>>> Ravi
>>>
>>> On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj...@cloudera.com> wrote:
>>>>
>>>> Ah I think Thomas is right. I read the expected results and the query
>>>> too quickly, so my comment about the asc/desc is probably wrong.
>>>> Clearly my point about analytic functions being tricky holds true :)
>>>>
>>>> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
>>>> <tm...@cloudera.com> wrote:
>>>> >
>>>> >
>>>> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ra...@gmail.com>
>>>> > wrote:
>>>> >>
>>>> >> Thomas,
>>>> >>
>>>> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I see
>>>> >> that ignore nulls was added in Impala 2.7.0. Would adding ignore nulls
>>>> >> make a big difference in the expected result?
>>>> >
>>>> >
>>>> > That's too bad. I think that 'ignore nulls' would give you what you
>>>> > want -
>>>> > the problem with the query that you posted is that it eliminates rows
>>>> > that
>>>> > don't match the where clause, so for example the row with "Zero" in it
>>>> > is
>>>> > eliminated because it is filtered out by the "where a is not null",
>>>> > whereas
>>>> > "ignore nulls" only affects the values that could be returned by the
>>>> > specific analytic function that the ignore is applied to.
>>>> >
>>>> >>
>>>> >>
>>>> >> Ravi
>>>> >>
>>>> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall
>>>> >> <tm...@cloudera.com>
>>>> >> wrote:
>>>> >>>
>>>> >>> Ravi,
>>>> >>>
>>>> >>> Instead of using the "where ... is not null", have you tried
>>>> >>> 'last_value(... ignore nulls)'
>>>> >>>
>>>> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ra...@gmail.com>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> Antoni,
>>>> >>>>
>>>> >>>> The problem with using the last_value() function, as far as I have
>>>> >>>> observed, is that if I use it on multiple columns in a single query,
>>>> >>>> it does not return the expected results.
>>>> >>>>
>>>> >>>>  Input:
>>>> >>>>
>>>> >>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>>> >>>> 1 0 1 NULL NULL
>>>> >>>> 1 1 2 Hi NULL
>>>> >>>> 1 3 4 Hello Hi
>>>> >>>> 1 2 5 NULL NULL
>>>> >>>> 1 4 NULL NULL Zero
>>>> >>>>
>>>> >>>> Expected Output:
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> ID (Int) A (Int) B (String) C (String)
>>>> >>>> 1 4 Hello Zero
>>>> >>>>
>>>> >>>>
>>>> >>>> Query executed:
>>>> >>>>
>>>> >>>> select id, last_value(a) over(partition by id order by date_time
>>>> >>>> desc)
>>>> >>>> as a, last_value(b) over(partition by id order by date_time desc)
>>>> >>>> as b,
>>>> >>>> last_value(c) over(partition by id order by date_time desc) as c
>>>> >>>> from
>>>> >>>> udf_test where a is not null and b is not null and c is not null;
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> Output I am getting:
>>>> >>>>
>>>> >>>> +----+---+-------+----+
>>>> >>>> | id | a | b     | c  |
>>>> >>>> +----+---+-------+----+
>>>> >>>> | 1  | 4 | Hello | Hi |
>>>> >>>> +----+---+-------+----+
>>>> >>>>
>>>> >>>>
>>>> >>>> Hopefully, I am clear with the problem above.
>>>> >>>>
>>>> >>>> Thanks,
>>>> >>>> Ravi
>>>> >>>>
>>>> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com>
>>>> >>>> wrote:
>>>> >>>>>
>>>> >>>>> Antoni,
>>>> >>>>>
>>>> >>>>> Thanks for the suggestion. Let me have a look at it and hopefully
>>>> >>>>> we
>>>> >>>>> can use it in our use case.
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Ravi
>>>> >>>>>
>>>> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <ai...@vmware.com>
>>>> >>>>> wrote:
>>>> >>>>>>
>>>> >>>>>> Hi Ravi,
>>>> >>>>>>
>>>> >>>>>> I am curious why you are not using already existing last_value
>>>> >>>>>> function in Impala to get "latest non null value for the column"
>>>> >>>>>>
>>>> >>>>>> e.g
>>>> >>>>>> last_value(column_a ignore nulls) over(partition by ID  order by
>>>> >>>>>> Date_Time)
>>>> >>>>>>
>>>> >>>>>> Thanks,
>>>> >>>>>> Antoni
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong
>>>> >>>>>> <ta...@cloudera.com>
>>>> >>>>>> wrote:
>>>> >>>>>>
>>>> >>>>>> This was double-posted to
>>>> >>>>>>
>>>> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>> >>>>>> also. I'll continue the discussion here.
>>>> >>>>>>
>>>> >>>>>> > Can we have the flexibility of declaring the variable globally
>>>> >>>>>> > in
>>>> >>>>>> > UDF? Globally, I mean outside the function?
>>>> >>>>>>
>>>> >>>>>> > And, the reason I am declaring a static variable is to restore
>>>> >>>>>> > the
>>>> >>>>>> > value of timestamp for every record so that I can perform a
>>>> >>>>>> > comparison of
>>>> >>>>>> > the timestamps. Is there an alternative approach for this?
>>>> >>>>>>
>>>> >>>>>> Updating a global or static variable in a UDAF is guaranteed not
>>>> >>>>>> to do
>>>> >>>>>> what you expect - the function can be invoked concurrently by
>>>> >>>>>> multiple
>>>> >>>>>> threads.
>>>> >>>>>>
>>>> >>>>>> It seems like you probably want to store some additional state in
>>>> >>>>>> the
>>>> >>>>>> intermediate value. There are some sample UDAs here (see Avg())
>>>> >>>>>> where
>>>> >>>>>> additional intermediate state is stored in a StringVal:
>>>> >>>>>>
>>>> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth
>>>> >>>>>> <ra...@gmail.com>
>>>> >>>>>> wrote:
>>>> >>>>>>>
>>>> >>>>>>> Thanks Bharath. Can you check whether the logic I am implementing is
>>>> >>>>>>> correct or needs any modification as well? I am very new to Impala
>>>> >>>>>>> UDFs and C++ and am having a hard time figuring out the problems.
>>>> >>>>>>>
>>>> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>>>> >>>>>>> <bh...@cloudera.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> You need to allocate memory for tsTemp, else it can segfault.
>>>> >>>>>>>> That
>>>> >>>>>>>> could be the issue here.
>>>> >>>>>>>>
>>>> >>>>>>>>  static TimestampVal* tsTemp;
>>>> >>>>>>>>       tsTemp->date = 0;
>>>> >>>>>>>>       tsTemp->time_of_day = 0;
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
>>>> >>>>>>>> <ra...@gmail.com> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> Hi All,
>>>> >>>>>>>>> We are using Impala to do various processings in our systems.
>>>> >>>>>>>>> We
>>>> >>>>>>>>> have a requirement recently, wherein we have to handle the
>>>> >>>>>>>>> updates on the
>>>> >>>>>>>>> events i.e, we have an 'e_update' table which has the partial
>>>> >>>>>>>>> updates
>>>> >>>>>>>>> received for various events. The fields that are not updated
>>>> >>>>>>>>> are being
>>>> >>>>>>>>> stored as NULL values.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Ex:
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>>> >>>>>>>>> 1 0 1 NULL NULL
>>>> >>>>>>>>> 1 1 2 Hi NULL
>>>> >>>>>>>>> 1 3 4 Hello Hi
>>>> >>>>>>>>> 1 2 5 NULL NULL
>>>> >>>>>>>>> 1 4 NULL NULL Zero
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type values.
>>>> >>>>>>>>> For
>>>> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> As seen in the above table, the events have a unique id and as
>>>> >>>>>>>>> we
>>>> >>>>>>>>> get an update to a particular event, we are storing the
>>>> >>>>>>>>> date_time at which
>>>> >>>>>>>>> update has happened and also storing the partial updated
>>>> >>>>>>>>> values. Apart from
>>>> >>>>>>>>> the updated values, the rest are stored as NULL values.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> We are planning to mimic inplace updates on the table, so that
>>>> >>>>>>>>> it
>>>> >>>>>>>>> would retrieve the resulting table as follows using the query
>>>> >>>>>>>>> below: We
>>>> >>>>>>>>> don't delete the data.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>>>> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time) as C
>>>> >>>>>>>>> > from e_update
>>>> >>>>>>>>> > GROUP BY ID;
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> where, current_val is a custom impala UDA we are planning to
>>>> >>>>>>>>> implement. i.e. get latest non null value for the column.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> ID (Int) A (Int) B (String) C (String)
>>>> >>>>>>>>> 1 4 Hello Zero
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Implemented current_val UDA:
>>>> >>>>>>>>> The below code is only for int type inputs:
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> uda-currentval.h
>>>> >>>>>>>>>
>>>> >>>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>> >>>>>>>>> //
>>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal& input,
>>>> >>>>>>>>>     const TimestampVal& ts, IntVal* val);
>>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>> >>>>>>>>>     IntVal* dst);
>>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal& val);
>>>> >>>>>>>>>
>>>> >>>>>>>>> uda-currentval.cc
>>>> >>>>>>>>>
>>>> >>>>>>>>> // -----------------------------------------------------------------------------------------------
>>>> >>>>>>>>> // This is a sample for retrieving the current value of e_update table
>>>> >>>>>>>>> // -----------------------------------------------------------------------------------------------
>>>> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>> >>>>>>>>>       val->is_null = false;
>>>> >>>>>>>>>       val->val = 0;
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const
>>>> >>>>>>>>> IntVal&
>>>> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
>>>> >>>>>>>>>       static TimestampVal* tsTemp;
>>>> >>>>>>>>>       tsTemp->date = 0;
>>>> >>>>>>>>>       tsTemp->time_of_day = 0;
>>>> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>> >>>>>>>>>         tsTemp->date = ts.date;
>>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>> >>>>>>>>>         val->val = input.val;
>>>> >>>>>>>>>         return;
>>>> >>>>>>>>>       }
>>>> >>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
>>>> >>>>>>>>> tsTemp->time_of_day){
>>>> >>>>>>>>>         tsTemp->date = ts.date;
>>>> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>> >>>>>>>>>         val->val = input.val;
>>>> >>>>>>>>>         return;
>>>> >>>>>>>>>       }
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal&
>>>> >>>>>>>>> src,
>>>> >>>>>>>>> IntVal* dst) {
>>>> >>>>>>>>>      dst->val += src.val;
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const
>>>> >>>>>>>>> IntVal&
>>>> >>>>>>>>> val) {
>>>> >>>>>>>>>      return val;
>>>> >>>>>>>>> }
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> We are able to build and create an aggregate function in Impala,
>>>> >>>>>>>>> but when we run a select query similar to the one above, it
>>>> >>>>>>>>> brings down a couple of Impala daemons, throws the error below,
>>>> >>>>>>>>> and gets terminated.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>> >>>>>>>>> hadoop102.**.**.**.com:22000
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> We have impalad running on 14 instances.
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> Can someone help us resolve this problem, or suggest a better way
>>>> >>>>>>>>> to achieve a solution for the scenario explained?
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>
>>>> >>
>>>> >

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Thanks, all. I will work out a solution, either by implementing a custom UDA
or by upgrading our Impala version.
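
For the custom-UDA route, here is a minimal sketch of the state handling
suggested earlier in the thread: keep the timestamp inside the intermediate
value instead of in a static variable. It is only an illustration under
stated assumptions. Ts, NullableInt and CurrentValState are hypothetical
stand-in structs, not the real Impala UDF types, so the snippet compiles on
its own; in an actual UDA the state would have to be carried in an
intermediate type such as a StringVal, as in the Avg() sample linked in this
thread. Ordering timestamps by date first and time of day second is also an
assumption about how TimestampVal values should be compared.

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical stand-in for a timestamp: a date part and a time-of-day part.
struct Ts {
  int32_t date;
  int64_t time_of_day;
};

// Hypothetical stand-in for a nullable int input or result.
struct NullableInt {
  bool is_null;
  int32_t val;
};

// Intermediate state: the latest non-NULL value seen so far and its timestamp.
struct CurrentValState {
  bool has_value;
  Ts ts;
  int32_t val;
};

// True when a is strictly later than b: compare the date, then the time of day.
bool Later(const Ts& a, const Ts& b) {
  return std::tie(a.date, a.time_of_day) > std::tie(b.date, b.time_of_day);
}

void Init(CurrentValState* st) {
  st->has_value = false;
  st->ts = Ts{0, 0};
  st->val = 0;
}

// Called once per input row; NULL inputs never replace the stored value.
void Update(const NullableInt& input, const Ts& ts, CurrentValState* st) {
  if (input.is_null) return;
  if (!st->has_value || Later(ts, st->ts)) {
    st->has_value = true;
    st->ts = ts;
    st->val = input.val;
  }
}

// Combines two partial states by keeping the one with the later timestamp.
void Merge(const CurrentValState& src, CurrentValState* dst) {
  if (!src.has_value) return;
  if (!dst->has_value || Later(src.ts, dst->ts)) *dst = src;
}

// Returns NULL when no non-NULL input was ever seen.
NullableInt Finalize(const CurrentValState& st) {
  if (!st.has_value) return NullableInt{true, 0};
  return NullableInt{false, st.val};
}
```

Because each partial state carries its own timestamp, Merge can pick a winner
no matter how the rows were split across threads or nodes. Splitting column A
of the sample table across two partial states and merging them yields 4, the
value at date_time 3.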


Re: Creating Impala UDA

Posted by Thomas Tauber-Marshall <tm...@cloudera.com>.
On Wed, Jun 21, 2017 at 2:33 PM Ravi Kanth <ra...@gmail.com> wrote:

> Yes, I agree with you, Thomas. That's probably what I'm doing wrong.
>
> Unfortunately, as mentioned, I believe the version of Impala we are using
> doesn't support ignore nulls.
>
> But my question is: would the last_value function retrieve the latest
> non-null value irrespective of using ignore nulls?
>

Not sure I follow - if you use last_value without ignore nulls, you'll get
the latest value taking all values into consideration, which may or may not
be null.
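
To make the difference concrete, here is a small sketch that models the two
behaviours on one column of a partition. It is not Impala code: Row and
LastValue are hypothetical names, and the function only models the value that
last_value would produce on the final row of the partition when ordered by
date_time ascending.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// One column value of one row, tagged with its ordering key.
struct Row {
  int64_t date_time;
  bool is_null;
  std::string value;
};

// Returns the last row in date_time order. With ignore_nulls set, NULL rows
// are skipped, so the result is the latest non-NULL value (if any exists).
Row LastValue(std::vector<Row> rows, bool ignore_nulls) {
  std::stable_sort(rows.begin(), rows.end(),
                   [](const Row& a, const Row& b) {
                     return a.date_time < b.date_time;
                   });
  Row result{0, true, ""};  // NULL result for an empty or all-NULL input
  for (const Row& r : rows) {
    if (ignore_nulls && r.is_null) continue;
    result = r;
  }
  return result;
}
```

For column B of the sample table (NULL, Hi, Hello, NULL, NULL at date_time
0, 1, 3, 2, 4), the plain variant returns NULL because the row at date_time 4
is NULL, while the ignore-nulls variant returns "Hello".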



Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Yes, I agree with you, Thomas. That's probably what I'm doing wrong.

Unfortunately, as mentioned, I believe the version of Impala we are using
doesn't support ignore nulls.

But my question is: would the last_value function retrieve the latest
non-null value irrespective of using ignore nulls?

Ravi
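
As an illustration of why the WHERE clause was the problem, the sketch below
applies the same filter (a, b and c all non-NULL) to the sample rows. The
Event struct and the helper names are hypothetical and exist only for this
example; it models the filtering step, not the Impala query itself.

```cpp
#include <string>
#include <vector>

// One row of the sample table; each *_null flag marks a NULL column value.
struct Event {
  int id;
  int date_time;
  bool a_null;
  int a;
  bool b_null;
  std::string b;
  bool c_null;
  std::string c;
};

// The five sample rows from the original post.
std::vector<Event> SampleRows() {
  return {
      {1, 0, false, 1, true, "", true, ""},
      {1, 1, false, 2, false, "Hi", true, ""},
      {1, 3, false, 4, false, "Hello", false, "Hi"},
      {1, 2, false, 5, true, "", true, ""},
      {1, 4, true, 0, true, "", false, "Zero"},
  };
}

// Models: WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL
std::vector<Event> FilterAllNonNull(const std::vector<Event>& rows) {
  std::vector<Event> kept;
  for (const Event& e : rows) {
    if (!e.a_null && !e.b_null && !e.c_null) kept.push_back(e);
  }
  return kept;
}
```

Only the row at date_time 3 survives the filter, so every last_value call
sees that single row and returns 4, Hello, Hi. The row holding "Zero" is
dropped before the analytic functions run.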

On Wed, Jun 21, 2017 at 12:23 Matthew Jacobs <mj...@cloudera.com> wrote:

> Ah I think Thomas is right. I read the expected results and the query
> too quickly, so my comment about the asc/desc is probably wrong.
> Clearly my point about analytic functions being tricky holds true :)
>
> On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
> <tm...@cloudera.com> wrote:
> >
> >
> > On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ra...@gmail.com> wrote:
> >>
> >> Thomas,
> >>
> >> The version of Impala we are using is 2.5.0 with CDH 5.7.0, and I see that
> >> ignore nulls was added in Impala 2.7.0. Would adding ignore nulls make a
> >> big difference in the expected result?
> >
> >
> > That's too bad. I think that 'ignore nulls' would give you what you want -
> > the problem with the query that you posted is that it eliminates rows that
> > don't match the where clause, so for example the row with "Zero" in it is
> > eliminated because it is filtered out by the "where a is not null", whereas
> > "ignore nulls" only affects the values that could be returned by the
> > specific analytic function that the ignore is applied to.
> >
> >>
> >>
> >> Ravi
> >>
> >> On 21 June 2017 at 11:20, Thomas Tauber-Marshall <tmarshall@cloudera.com>
> >> wrote:
> >>>
> >>> Ravi,
> >>>
> >>> Instead of using the "where ... is not null", have you tried
> >>> 'last_value(... ignore nulls)'
> >>>
> >>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ra...@gmail.com>
> >>> wrote:
> >>>>
> >>>> Antoni,
> >>>>
> >>>> The problem with using the last_value() function, as far as I have
> >>>> observed, is that if I use it on multiple columns in a single query,
> >>>> it's not retrieving results as expected.
> >>>>
> >>>>  Input:
> >>>>
> >>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
> >>>> 1 0 1 NULL NULL
> >>>> 1 1 2 Hi NULL
> >>>> 1 3 4 Hello Hi
> >>>> 1 2 5 NULL NULL
> >>>> 1 4 NULL NULL Zero
> >>>>
> >>>> Expected Output:
> >>>>
> >>>>
> >>>>
> >>>> ID (Int) A (Int) B (String) C (String)
> >>>> 1 4 Hello Zero
> >>>>
> >>>>
> >>>> Query executed:
> >>>>
> >>>> select id, last_value(a) over(partition by id order by date_time desc)
> >>>> as a, last_value(b) over(partition by id order by date_time desc) as b,
> >>>> last_value(c) over(partition by id order by date_time desc) as c from
> >>>> udf_test where a is not null and b is not null and c is not null;
> >>>>
> >>>>
> >>>>
> >>>> Output I am getting:
> >>>>
> >>>> +----+---+-------+----+
> >>>> | id | a | b     | c  |
> >>>> +----+---+-------+----+
> >>>> | 1  | 4 | Hello | Hi |
> >>>> +----+---+-------+----+
> >>>>
> >>>>
> >>>> Hopefully, I am clear with the problem above.
> >>>>
> >>>> Thanks,
> >>>> Ravi
> >>>>
> >>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com> wrote:
> >>>>>
> >>>>> Antoni,
> >>>>>
> >>>>> Thanks for the suggestion. Let me have a look at it and hopefully we
> >>>>> can use it in our use case.
> >>>>>
> >>>>> Thanks,
> >>>>> Ravi
> >>>>>
> >>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <ai...@vmware.com> wrote:
> >>>>>>
> >>>>>> Hi Ravi,
> >>>>>>
> >>>>>> I am curious why you are not using the already existing last_value
> >>>>>> function in Impala to get the "latest non null value for the column"?
> >>>>>>
> >>>>>> e.g
> >>>>>> last_value(column_a ignore nulls) over(partition by ID  order by
> >>>>>> Date_Time)
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Antoni
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <tarmstrong@cloudera.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>> This was double-posted to
> >>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
> >>>>>> also. I'll continue the discussion here.
> >>>>>>
> >>>>>> > Can we have the flexibility of declaring the variable globally in
> >>>>>> > UDF? Globally, I mean outside the function?
> >>>>>>
> >>>>>> > And, the reason I am declaring a static variable is to restore the
> >>>>>> > value of timestamp for every record so that I can perform a
> >>>>>> > comparison of the timestamps. Is there an alternative approach for
> >>>>>> > this?
> >>>>>>
> >>>>>> Updating a global or static variable in a UDAF is guaranteed not to do
> >>>>>> what you expect - the function can be invoked concurrently by multiple
> >>>>>> threads.
> >>>>>>
> >>>>>> It seems like you probably want to store some additional state in the
> >>>>>> intermediate value. There are some sample UDAs here (see Avg()) where
> >>>>>> additional intermediate state is stored in a StringVal:
> >>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <ravikanth.4b0@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Thanks Bharath. Can you check whether the logic I am implementing is
> >>>>>>> correct or needs any modification as well? I am very new to Impala
> >>>>>>> UDFs and C++ and am having a hard time figuring out the problems.
> >>>>>>>
> >>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
> >>>>>>> <bh...@cloudera.com> wrote:
> >>>>>>>>
> >>>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
> >>>>>>>> could be the issue here.
> >>>>>>>>
> >>>>>>>>  static TimestampVal* tsTemp;
> >>>>>>>>       tsTemp->date = 0;
> >>>>>>>>       tsTemp->time_of_day = 0;
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
> >>>>>>>> <ra...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>> We are using Impala to do various processings in our systems. We
> >>>>>>>>> have a requirement recently, wherein we have to handle the
> updates on the
> >>>>>>>>> events i.e, we have an 'e_update' table which has the partial
> updates
> >>>>>>>>> received for various events. The fields that are not updated are
> being
> >>>>>>>>> stored as NULL values.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Ex:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
> >>>>>>>>> 1 0 1 NULL NULL
> >>>>>>>>> 1 1 2 Hi NULL
> >>>>>>>>> 1 3 4 Hello Hi
> >>>>>>>>> 1 2 5 NULL NULL
> >>>>>>>>> 1 4 NULL NULL Zero
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> P.S: Please consider Date_time as valid timestamp type values.
> For
> >>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> As seen in the above table, the events have a unique id and as we
> >>>>>>>>> get an update to a particular event, we are storing the
> date_time at which
> >>>>>>>>> update has happened and also storing the partial updated values.
> Apart from
> >>>>>>>>> the updated values, the rest are stored as NULL values.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> We are planning to mimic inplace updates on the table, so that it
> >>>>>>>>> would retrieve the resulting table as follows using the query
> below: We
> >>>>>>>>> don't delete the data.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> > SELECT id, current_val(A,date_time) as A,
> >>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time) as C
> from e_update
> >>>>>>>>> > GROUP BY ID;
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> where, current_val is a custom impala UDA we are planning to
> >>>>>>>>> implement. i.e. get latest non null value for the column.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> ID (Int) A (Int) B (String) C (String)
> >>>>>>>>> 1 4 Hello Zero
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Implemented current_val UDA:
> >>>>>>>>> The below code is only for int type inputs:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> uda-currentval.h
> >>>>>>>>>
> >>>>>>>>> //This is a sample for retrieving the current value of e_update
> >>>>>>>>> table
> >>>>>>>>> //
> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal&
> >>>>>>>>> input, const TimestampVal& ts, IntVal* val);
> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal&
> src,
> >>>>>>>>> IntVal* dst);
> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const
> IntVal&
> >>>>>>>>> val);
> >>>>>>>>>
> >>>>>>>>> uda-currentval.cc
> >>>>>>>>>
> >>>>>>>>> //
> >>>>>>>>>
> -----------------------------------------------------------------------------------------------
> >>>>>>>>> // This is a sample for retrieving the current value of e_update
> >>>>>>>>> table
> >>>>>>>>>
> >>>>>>>>>
> //-----------------------------------------------------------------------------------------------
> >>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
> >>>>>>>>>       val->is_null = false;
> >>>>>>>>>       val->val = 0;
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal&
> >>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
> >>>>>>>>>       static TimestampVal* tsTemp;
> >>>>>>>>>       tsTemp->date = 0;
> >>>>>>>>>       tsTemp->time_of_day = 0;
> >>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
> >>>>>>>>>         tsTemp->date = ts.date;
> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
> >>>>>>>>>         val->val = input.val;
> >>>>>>>>>         return;
> >>>>>>>>>       }
> >>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
> >>>>>>>>> tsTemp->time_of_day){
> >>>>>>>>>         tsTemp->date = ts.date;
> >>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
> >>>>>>>>>         val->val = input.val;
> >>>>>>>>>         return;
> >>>>>>>>>       }
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal&
> src,
> >>>>>>>>> IntVal* dst) {
> >>>>>>>>>      dst->val += src.val;
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const
> IntVal&
> >>>>>>>>> val) {
> >>>>>>>>>      return val;
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> We are able to build and create an aggregate function in impala,
> >>>>>>>>> but when trying to run the select query similar to the one
> above, it is
> >>>>>>>>> bringing down couple of impala deamons and throwing the error
> below and
> >>>>>>>>> getting terminated.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
> >>>>>>>>> hadoop102.**.**.**.com:22000
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> We have impalad running on 14 instances.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Can someone help resolve us this problem and a better way to
> >>>>>>>>> achieve a solution for the scenario explained.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>
> >
>

Re: Creating Impala UDA

Posted by Matthew Jacobs <mj...@cloudera.com>.
Ah I think Thomas is right. I read the expected results and the query
too quickly, so my comment about the asc/desc is probably wrong.
Clearly my point about analytic functions being tricky holds true :)

On Wed, Jun 21, 2017 at 12:12 PM, Thomas Tauber-Marshall
<tm...@cloudera.com> wrote:
>
>
> On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ra...@gmail.com> wrote:
>>
>> Thomas,
>>
>> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and I see
>> ignore nulls has been added in Impala 2.7.0. And, does adding ignore nulls
>> would make a big difference in the expected result?
>
>
> That's too bad. I think that 'ignore nulls' would give you what you want -
> the problem with the query that you posted is that it eliminates rows that
> don't match the where clause, so for example the row with "Zero" in it is
> eliminated because it is filtered out by the "where a is not null", whereas
> "ignore nulls" only affects the values that could be returned by the
> specific analytic function that the ignore is applied to.
>
>>
>>
>> Ravi
>>
>> On 21 June 2017 at 11:20, Thomas Tauber-Marshall <tm...@cloudera.com>
>> wrote:
>>>
>>> Ravi,
>>>
>>> Instead of using the "where ... is not null", have you tried
>>> 'last_value(... ignore nulls)'
>>>
>>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ra...@gmail.com>
>>> wrote:
>>>>
>>>> Antoni,
>>>>
>>>> The problem in using last_value function() as far as I observed is, if I
>>>> use it on multiple columns in a single query, its not retrieving results as
>>>> expected.
>>>>
>>>>  Input:
>>>>
>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>>> 1 0 1 NULL NULL
>>>> 1 1 2 Hi NULL
>>>> 1 3 4 Hello Hi
>>>> 1 2 5 NULL NULL
>>>> 1 4 NULL NULL Zero
>>>>
>>>> Expected Output:
>>>>
>>>>
>>>>
>>>> ID (Int) A (Int) B (String) C (String)
>>>> 1 4 Hello Zero
>>>>
>>>>
>>>> Query executed:
>>>>
>>>> select id, last_value(a) over(partition by id order by date_time desc)
>>>> as a, last_value(b) over(partition by id order by date_time desc) as b,
>>>> last_value(c) over(partition by id order by date_time desc) as c from
>>>> udf_test where a is not null and b is not null and c is not null;
>>>>
>>>>
>>>>
>>>> Output I am getting:
>>>>
>>>> +----+---+-------+----+
>>>> | id | a | b     | c  |
>>>> +----+---+-------+----+
>>>> | 1  | 4 | Hello | Hi |
>>>> +----+---+-------+----+
>>>>
>>>>
>>>> Hopefully, I am clear with the problem above.
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com> wrote:
>>>>>
>>>>> Antoni,
>>>>>
>>>>> Thanks for the suggestion. Let me have a look at it and hopefully we
>>>>> can use it in our use case.
>>>>>
>>>>> Thanks,
>>>>> Ravi
>>>>>
>>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <ai...@vmware.com> wrote:
>>>>>>
>>>>>> Hi Ravi,
>>>>>>
>>>>>> I am curious why you are not using already existing last_value
>>>>>> function in Impala to get "latest non null value for the column”
>>>>>>
>>>>>> e.g
>>>>>> last_value(column_a ignore nulls) over(partition by ID  order by
>>>>>> Date_Time)
>>>>>>
>>>>>> Thanks,
>>>>>> Antoni
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <ta...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>> This was double-posted to
>>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>>>> also. I'll continue the discussion here.
>>>>>>
>>>>>> > Can we have the flexibility of declaring the variable globally in
>>>>>> > UDF? Globally, I mean outside the function?
>>>>>>
>>>>>> > And, the reason I am declaring a static variable is to restore the
>>>>>> > value of timestamp for every record so that I can perform a comparison of
>>>>>> > the timestamps. Is there an alternative approach for this?
>>>>>>
>>>>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>>>>> what you expect - the function can be invoked concurrently by multiple
>>>>>> threads.
>>>>>>
>>>>>> It seems like you probably want to store some additional state in the
>>>>>> intermediate value. There are some sample UDAs here (see Avg()) where
>>>>>> additional intermediate state is stored in a StringVal:
>>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <ra...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks Bharath. Can you check if the logic I am implementing is
>>>>>>> correct or needed any modification in it as well? I am very new to Impala
>>>>>>> UDF & C++ and having some hard time figuring out the problems.
>>>>>>>
>>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada
>>>>>>> <bh...@cloudera.com> wrote:
>>>>>>>>
>>>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>>>>>> could be the issue here.
>>>>>>>>
>>>>>>>>  static TimestampVal* tsTemp;
>>>>>>>>       tsTemp->date = 0;
>>>>>>>>       tsTemp->time_of_day = 0;
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth
>>>>>>>> <ra...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>> We are using Impala to do various processings in our systems. We
>>>>>>>>> have a requirement recently, wherein we have to handle the updates on the
>>>>>>>>> events i.e, we have an 'e_update' table which has the partial updates
>>>>>>>>> received for various events. The fields that are not updated are being
>>>>>>>>> stored as NULL values.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Ex:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>>>>>>>> 1 0 1 NULL NULL
>>>>>>>>> 1 1 2 Hi NULL
>>>>>>>>> 1 3 4 Hello Hi
>>>>>>>>> 1 2 5 NULL NULL
>>>>>>>>> 1 4 NULL NULL Zero
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> P.S: Please consider Date_time as valid timestamp type values. For
>>>>>>>>> easy understanding, mentioned them as 0,1,2,3,4,5
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> As seen in the above table, the events have a unique id and as we
>>>>>>>>> get an update to a particular event, we are storing the date_time at which
>>>>>>>>> update has happened and also storing the partial updated values. Apart from
>>>>>>>>> the updated values, the rest are stored as NULL values.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We are planning to mimic inplace updates on the table, so that it
>>>>>>>>> would retrieve the resulting table as follows using the query below: We
>>>>>>>>> don't delete the data.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> > SELECT id, current_val(A,date_time) as A,
>>>>>>>>> > current_val(B,date_time) as B, current_val(C,date_time) as C from e_update
>>>>>>>>> > GROUP BY ID;
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> where, current_val is a custom impala UDA we are planning to
>>>>>>>>> implement. i.e. get latest non null value for the column.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ID (Int) A (Int) B (String) C (String)
>>>>>>>>> 1 4 Hello Zero
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Implemented current_val UDA:
>>>>>>>>> The below code is only for int type inputs:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> uda-currentval.h
>>>>>>>>>
>>>>>>>>> //This is a sample for retrieving the current value of e_update
>>>>>>>>> table
>>>>>>>>> //
>>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val);
>>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal&
>>>>>>>>> input, const TimestampVal& ts, IntVal* val);
>>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>>>>> IntVal* dst);
>>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal&
>>>>>>>>> val);
>>>>>>>>>
>>>>>>>>> uda-currentval.cc
>>>>>>>>>
>>>>>>>>> //
>>>>>>>>> -----------------------------------------------------------------------------------------------
>>>>>>>>> // This is a sample for retrieving the current value of e_update
>>>>>>>>> table
>>>>>>>>>
>>>>>>>>> //-----------------------------------------------------------------------------------------------
>>>>>>>>> void CurrentValueInit(FunctionContext* context, IntVal* val) {
>>>>>>>>>       val->is_null = false;
>>>>>>>>>       val->val = 0;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> void CurrentValueUpdate(FunctionContext* context, const IntVal&
>>>>>>>>> input, const TimestampVal& ts, IntVal* val) {
>>>>>>>>>       static TimestampVal* tsTemp;
>>>>>>>>>       tsTemp->date = 0;
>>>>>>>>>       tsTemp->time_of_day = 0;
>>>>>>>>>       if(tsTemp->date==0 && tsTemp->time_of_day==0){
>>>>>>>>>         tsTemp->date = ts.date;
>>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>>>>         val->val = input.val;
>>>>>>>>>         return;
>>>>>>>>>       }
>>>>>>>>>       if(ts.date > tsTemp->date && ts.time_of_day >
>>>>>>>>> tsTemp->time_of_day){
>>>>>>>>>         tsTemp->date = ts.date;
>>>>>>>>>         tsTemp->time_of_day = ts.time_of_day;
>>>>>>>>>         val->val = input.val;
>>>>>>>>>         return;
>>>>>>>>>       }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> void CurrentValueMerge(FunctionContext* context, const IntVal& src,
>>>>>>>>> IntVal* dst) {
>>>>>>>>>      dst->val += src.val;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> IntVal CurrentValueFinalize(FunctionContext* context, const IntVal&
>>>>>>>>> val) {
>>>>>>>>>      return val;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We are able to build and create an aggregate function in impala,
>>>>>>>>> but when trying to run the select query similar to the one above, it is
>>>>>>>>> bringing down couple of impala deamons and throwing the error below and
>>>>>>>>> getting terminated.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> WARNINGS: Cancelled due to unreachable impalad(s):
>>>>>>>>> hadoop102.**.**.**.com:22000
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We have impalad running on 14 instances.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Can someone help resolve us this problem and a better way to
>>>>>>>>> achieve a solution for the scenario explained.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
>

Re: Creating Impala UDA

Posted by Thomas Tauber-Marshall <tm...@cloudera.com>.
On Wed, Jun 21, 2017 at 1:52 PM Ravi Kanth <ra...@gmail.com> wrote:

> Thomas,
>
> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and I see
> ignore nulls has been added in Impala 2.7.0. And, does adding ignore nulls
> would make a big difference in the expected result?
>

That's too bad. I think that 'ignore nulls' would give you what you want.
The problem with the query that you posted is that the where clause
eliminates entire rows before the analytic functions run: for example, the
row with "Zero" in it is filtered out by "where a is not null", so
last_value(c) never sees it. 'ignore nulls', by contrast, only affects the
values that can be returned by the specific analytic function it is
applied to.
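
Since 'ignore nulls' is not available on 2.5.0, the per-column behaviour described above is roughly what the current_val UDA from the start of the thread would have to reproduce. Below is a minimal sketch of that logic in plain C++, assuming the timestamp is kept in the aggregate's own intermediate state rather than in a static variable. It does not use the Impala UDF headers: LatestState, the Latest* functions, Row and LatestOverTwoPartitions are hypothetical names made up for illustration, and a single int64 stands in for TimestampVal's date/time_of_day pair. In a real UDA the state would have to be packed into the intermediate value (for example a StringVal, as in the Avg() sample mentioned earlier in the thread).

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical intermediate state: the newest non-NULL value seen so far,
// together with the timestamp it arrived with.
struct LatestState {
    bool has_value = false;
    std::int64_t ts = 0;
    int value = 0;
};

// Init: start with "nothing seen yet" instead of a real value such as 0.
void LatestInit(LatestState* s) { *s = LatestState{}; }

// Update: a NULL input is skipped for this column only; the row itself is
// not dropped, so other columns aggregated over the same row still see it.
void LatestUpdate(const std::optional<int>& input, std::int64_t ts,
                  LatestState* s) {
    if (!input.has_value()) return;
    if (!s->has_value || ts > s->ts) {
        s->has_value = true;
        s->ts = ts;
        s->value = *input;
    }
}

// Merge: keep whichever partial state carries the later timestamp.
// (Adding the two values together would not give "latest value".)
void LatestMerge(const LatestState& src, LatestState* dst) {
    if (!src.has_value) return;
    if (!dst->has_value || src.ts > dst->ts) *dst = src;
}

// Finalize: NULL if every input was NULL, otherwise the latest value.
std::optional<int> LatestFinalize(const LatestState& s) {
    if (!s.has_value) return std::nullopt;
    return s.value;
}

// One column of the example table: (date_time, A).
struct Row {
    std::int64_t ts;
    std::optional<int> a;
};

// Splits the rows across two partial states and merges them, to mimic the
// aggregation being evaluated in more than one place.
std::optional<int> LatestOverTwoPartitions(const std::vector<Row>& rows) {
    LatestState left, right;
    LatestInit(&left);
    LatestInit(&right);
    for (std::size_t i = 0; i < rows.size(); ++i) {
        LatestUpdate(rows[i].a, rows[i].ts, (i % 2 == 0) ? &left : &right);
    }
    LatestMerge(right, &left);
    return LatestFinalize(left);
}
```

Feeding column A of the example table, i.e. the (date_time, A) pairs (0, 1), (1, 2), (3, 4), (2, 5), (4, NULL), through LatestOverTwoPartitions yields 4: the NULL at date_time 4 is skipped for A only, so the same row can still supply "Zero" for C. If date and time_of_day are kept as two fields, they need to be compared as one ordered pair (date first, then time_of_day), not with both fields required to be greater.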


>
> Ravi
>

Re: Creating Impala UDA

Posted by Matthew Jacobs <mj...@cloudera.com>.
I think you may need to look at the 'order by' clause. Are you sure
you want 'order by date_time desc'? It looks to me like 'order by
date_time asc' would give you the 'Zero' you want.

Analytic functions are very powerful but can be tricky to reason
about. Make sure you spend some time looking at the docs and examples:
https://www.cloudera.com/documentation/enterprise/latest/topics/impala_analytic_functions.html#last_value

-Matt

On Wed, Jun 21, 2017 at 11:52 AM, Ravi Kanth <ra...@gmail.com> wrote:
> Thomas,
>
> The version of Impala we are using is 2.5.0 with CDH 5.7.0 and I see ignore
> nulls has been added in Impala 2.7.0. And, does adding ignore nulls would
> make a big difference in the expected result?
>
> Ravi
>
> On 21 June 2017 at 11:20, Thomas Tauber-Marshall <tm...@cloudera.com>
> wrote:
>>
>> Ravi,
>>
>> Instead of using the "where ... is not null", have you tried
>> 'last_value(... ignore nulls)'
>>
>> On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ra...@gmail.com>
>> wrote:
>>>
>>> Antoni,
>>>
>>> The problem in using last_value function() as far as I observed is, if I
>>> use it on multiple columns in a single query, its not retrieving results as
>>> expected.
>>>
>>>  Input:
>>>
>>> ID (Int) Date_Time (timestamp) A (Int) B (String) C (String)
>>> 1 0 1 NULL NULL
>>> 1 1 2 Hi NULL
>>> 1 3 4 Hello Hi
>>> 1 2 5 NULL NULL
>>> 1 4 NULL NULL Zero
>>>
>>> Expected Output:
>>>
>>>
>>>
>>> ID (Int) A (Int) B (String) C (String)
>>> 1 4 Hello Zero
>>>
>>>
>>> Query executed:
>>>
>>> select id, last_value(a) over(partition by id order by date_time desc) as
>>> a, last_value(b) over(partition by id order by date_time desc) as b,
>>> last_value(c) over(partition by id order by date_time desc) as c from
>>> udf_test where a is not null and b is not null and c is not null;
>>>
>>>
>>>
>>> Output I am getting:
>>>
>>> +----+---+-------+----+
>>> | id | a | b     | c  |
>>> +----+---+-------+----+
>>> | 1  | 4 | Hello | Hi |
>>> +----+---+-------+----+
>>>
>>>
>>> Hopefully, I am clear with the problem above.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com> wrote:
>>>>
>>>> Antoni,
>>>>
>>>> Thanks for the suggestion. Let me have a look at it and hopefully we can
>>>> use it in our use case.
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <ai...@vmware.com> wrote:
>>>>>
>>>>> Hi Ravi,
>>>>>
>>>>> I am curious why you are not using already existing last_value function
>>>>> in Impala to get "latest non null value for the column”
>>>>>
>>>>> e.g
>>>>> last_value(column_a ignore nulls) over(partition by ID  order by
>>>>> Date_Time)
>>>>>
>>>>> Thanks,
>>>>> Antoni
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <ta...@cloudera.com>
>>>>> wrote:
>>>>>
>>>>> This was double-posted to
>>>>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>>>> also. I'll continue the discussion here.
>>>>>
>>>>> > Can we have the flexibility of declaring the variable globally in
>>>>> > UDF? Globally, I mean outside the function?
>>>>>
>>>>> > And, the reason I am declaring a static variable is to restore the
>>>>> > value of timestamp for every record so that I can perform a comparison of
>>>>> > the timestamps. Is there an alternative approach for this?
>>>>>
>>>>> Updating a global or static variable in a UDAF is guaranteed not to do
>>>>> what you expect - the function can be invoked concurrently by multiple
>>>>> threads.
>>>>>
>>>>> It seems like you probably want to store some additional state in the
>>>>> intermediate value. There are some sample UDAs here (see Avg()) where
>>>>> additional intermediate state is stored in a StringVal:
>>>>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>>>>>
>>>>>
>>>>> On Tue, Jun 20, 2017 at 2:40 PM, Ravi Kanth <ra...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> Thanks Bharath. Can you check if the logic I am implementing is
>>>>>> correct or needed any modification in it as well? I am very new to Impala
>>>>>> UDF & C++ and having some hard time figuring out the problems.
>>>>>>
>>>>>> On 20 June 2017 at 14:27, Bharath Vissapragada <bh...@cloudera.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> You need to allocate memory for tsTemp, else it can segfault. That
>>>>>>> could be the issue here.
>>>>>>>
>>>>>>>  static TimestampVal* tsTemp;
>>>>>>>       tsTemp->date = 0;
>>>>>>>       tsTemp->time_of_day = 0;
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 20, 2017 at 2:15 PM, Ravi Kanth <ra...@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi All,

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Thomas,

We are using Impala 2.5.0 with CDH 5.7.0, and I see that ignore nulls was
only added in Impala 2.7.0. Also, would adding ignore nulls make a big
difference to the expected result?

Ravi
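
A minimal sketch of the UDA route discussed in this thread, for deployments
such as the 2.5.0 one above where ignore nulls is not available. It keeps the
winning value together with its timestamp in the intermediate state instead of
in a static variable, skips NULL inputs, compares (date, time_of_day)
lexicographically, and makes Merge pick the later side rather than add the
values. IntVal and TimestampVal here are simplified stand-ins for the Impala
UDF types, CurrentValState and IsLater are hypothetical names, and the
FunctionContext* argument and the serialization of the state into a real
intermediate type are left out.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-ins (assumption) for the Impala UDF value types.
struct IntVal {
  bool is_null;
  int32_t val;
  explicit IntVal(int32_t v = 0) : is_null(false), val(v) {}
  static IntVal null() { IntVal v; v.is_null = true; return v; }
};

struct TimestampVal {
  bool is_null;
  int32_t date;
  int64_t time_of_day;
  TimestampVal(int32_t d = 0, int64_t t = 0)
      : is_null(false), date(d), time_of_day(t) {}
};

// Hypothetical intermediate state: the value travels with its timestamp,
// so no global or static variable is needed.
struct CurrentValState {
  bool has_value;
  int32_t date;
  int64_t time_of_day;
  int32_t val;
};

// True when (date, tod) is strictly later than the timestamp held in s.
// The date decides first; time_of_day only breaks ties within a day.
static bool IsLater(int32_t date, int64_t tod, const CurrentValState& s) {
  if (date != s.date) return date > s.date;
  return tod > s.time_of_day;
}

void CurrentValueInit(CurrentValState* s) {
  s->has_value = false;
  s->date = 0;
  s->time_of_day = 0;
  s->val = 0;
}

// Called once per input row: NULL inputs never replace the stored value.
void CurrentValueUpdate(const IntVal& input, const TimestampVal& ts,
                        CurrentValState* s) {
  if (input.is_null || ts.is_null) return;
  if (!s->has_value || IsLater(ts.date, ts.time_of_day, *s)) {
    s->has_value = true;
    s->date = ts.date;
    s->time_of_day = ts.time_of_day;
    s->val = input.val;
  }
}

// Combines two partial results: keep whichever side saw the later timestamp.
void CurrentValueMerge(const CurrentValState& src, CurrentValState* dst) {
  assert(dst != nullptr);
  if (!src.has_value) return;
  if (!dst->has_value || IsLater(src.date, src.time_of_day, *dst)) *dst = src;
}

// A group with no non-NULL input finalizes to NULL.
IntVal CurrentValueFinalize(const CurrentValState& s) {
  return s.has_value ? IntVal(s.val) : IntVal::null();
}
```

In an actual UDA the state would have to live in whatever intermediate type
the function is declared with (the samples referenced in this thread use a
StringVal for that), so treat the struct above only as an illustration of the
logic.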

On 21 June 2017 at 11:20, Thomas Tauber-Marshall <tm...@cloudera.com>
wrote:

> Ravi,
>
> Instead of using the "where ... is not null", have you tried
> 'last_value(... ignore nulls)'

Re: Creating Impala UDA

Posted by Thomas Tauber-Marshall <tm...@cloudera.com>.
Ravi,

Instead of using the "where ... is not null", have you tried
'last_value(... ignore nulls)'?

On Wed, Jun 21, 2017 at 1:08 PM Ravi Kanth <ra...@gmail.com> wrote:

> Antoni,
>
> The problem in using last_value function() as far as I observed is, if I
> use it on multiple columns in a single query, its not retrieving results as
> expected.
>
> Input:
>
> ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
> 1         0                      1        NULL        NULL
> 1         1                      2        Hi          NULL
> 1         3                      4        Hello       Hi
> 1         2                      5        NULL        NULL
> 1         4                      NULL     NULL        Zero
>
> Expected Output:
>
> ID (Int)  A (Int)  B (String)  C (String)
> 1         4        Hello       Zero
>
>
> Query executed:
>
> select id, last_value(a) over(partition by id order by date_time desc) as
> a, last_value(b) over(partition by id order by date_time desc) as b,
> last_value(c) over(partition by id order by date_time desc) as c from
> udf_test where a is not null and b is not null and c is not null;
>
>
> Output I am getting:
>
> +----+---+-------+----+
> | id | a | b     | c  |
> +----+---+-------+----+
> | 1  | 4 | Hello | Hi |
> +----+---+-------+----+
>
> Hopefully, I am clear with the problem above.
>
> Thanks,
> Ravi

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Antoni,

The problem with using the last_value() function, as far as I have observed,
is that if I use it on multiple columns in a single query, it does not return
the expected results.

Input:

ID (Int)  Date_Time (timestamp)  A (Int)  B (String)  C (String)
1         0                      1        NULL        NULL
1         1                      2        Hi          NULL
1         3                      4        Hello       Hi
1         2                      5        NULL        NULL
1         4                      NULL     NULL        Zero

Expected Output:

ID (Int)  A (Int)  B (String)  C (String)
1         4        Hello       Zero


Query executed:

select id, last_value(a) over(partition by id order by date_time desc) as
a, last_value(b) over(partition by id order by date_time desc) as b,
last_value(c) over(partition by id order by date_time desc) as c from
udf_test where a is not null and b is not null and c is not null;


Output I am getting:

+----+---+-------+----+
| id | a | b     | c  |
+----+---+-------+----+
| 1  | 4 | Hello | Hi |
+----+---+-------+----+

Hopefully the problem above is clear.

Thanks,
Ravi
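
The output above is what the WHERE clause would be expected to produce:
"a is not null and b is not null and c is not null" keeps only rows in which
all three columns are set, and in the sample data that is just the row at
Date_Time 3. A rough C++ simulation of the two behaviours, using the sample
rows from this message. Row, SampleRows, LatestNonNull, FilterAllNonNull and
CheckAgainstThread are hypothetical names, the single id is omitted, every
value is kept as text for brevity, and nullptr stands for SQL NULL.

```cpp
#include <cassert>
#include <string>
#include <vector>

// One row of the sample table for id = 1; nullptr models SQL NULL.
struct Row {
  int date_time;
  const char* a;
  const char* b;
  const char* c;
};

std::vector<Row> SampleRows() {
  return {
      {0, "1", nullptr, nullptr},
      {1, "2", "Hi", nullptr},
      {3, "4", "Hello", "Hi"},
      {2, "5", nullptr, nullptr},
      {4, nullptr, nullptr, "Zero"},
  };
}

// Latest non-NULL value of one column, chosen independently of the others.
// This is the "current value" semantics asked for in the thread.
const char* LatestNonNull(const std::vector<Row>& rows,
                          const char* Row::*col) {
  const Row* best = nullptr;
  for (const Row& r : rows) {
    if (r.*col == nullptr) continue;
    if (best == nullptr || r.date_time > best->date_time) best = &r;
  }
  return best ? best->*col : nullptr;
}

// Rows that survive "a is not null and b is not null and c is not null".
std::vector<Row> FilterAllNonNull(const std::vector<Row>& rows) {
  std::vector<Row> kept;
  for (const Row& r : rows) {
    if (r.a != nullptr && r.b != nullptr && r.c != nullptr) kept.push_back(r);
  }
  return kept;
}

// Usage: per-column selection gives "Zero" for c, while the row filter
// leaves a single row whose c is "Hi".
void CheckAgainstThread() {
  const std::vector<Row> rows = SampleRows();
  assert(std::string(LatestNonNull(rows, &Row::c)) == "Zero");
  const std::vector<Row> kept = FilterAllNonNull(rows);
  assert(kept.size() == 1);
  assert(std::string(kept[0].c) == "Hi");
}
```

Filtering whole rows and picking the latest value per column are therefore
different operations on this data, which is why the filter cannot yield the
expected 4 / Hello / Zero.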

On 20 June 2017 at 22:05, Ravi Kanth <ra...@gmail.com> wrote:

> Antoni,
>
> Thanks for the suggestion. Let me have a look at it and hopefully we can
> use it in our use case.
>
> Thanks,
> Ravi
>
> On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <ai...@vmware.com> wrote:
>
>> Hi Ravi,
>>
>> I am curious why you are not using the already existing last_value function
>> in Impala to get the "latest non null value for the column"?
>>
>> e.g
>> last_value(column_a ignore nulls) over(partition by ID  order by
>> Date_Time)
>>
>> Thanks,
>> Antoni
>>
>>
>>
>>
>> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <ta...@cloudera.com>
>> wrote:
>>
>> This was double-posted to
>> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
>>
>> > Can we have the flexibility of declaring the variable globally in UDF?
>> > Globally, I mean outside the function?
>>
>> > And, the reason I am declaring a static variable is to restore the
>> > value of timestamp for every record so that I can perform a comparison of
>> > the timestamps. Is there an alternative approach for this?
>>
>> Updating a global or static variable in a UDAF is guaranteed not to do
>> what you expect - the function can be invoked concurrently by multiple
>> threads.
>>
>> It seems like you probably want to store some additional state in the
>> intermediate value. There are some sample UDAs here (see Avg()) where
>> additional intermediate state is stored in a StringVal:
>> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61

Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Antoni,

Thanks for the suggestion. Let me have a look at it and hopefully we can
use it in our use case.

Thanks,
Ravi

On Tue, Jun 20, 2017 at 21:53 Antoni Ivanov <ai...@vmware.com> wrote:

> Hi Ravi,
>
> I am curious why you are not using the already existing last_value function in
> Impala to get the "latest non null value for the column"?
>
> e.g
> last_value(column_a ignore nulls) over(partition by ID  order by Date_Time)
>
> Thanks,
> Antoni
>
>
>
>
> On Jun 21, 2017, at 1:17 AM, Tim Armstrong <ta...@cloudera.com>
> wrote:
>
> This was double-posted to
> http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
> also. I'll continue the discussion here.
>
> > Can we have the flexibility of declaring the variable globally in UDF?
> > Globally, I mean outside the function?
>
> > And, the reason I am declaring a static variable is to restore the value
> > of timestamp for every record so that I can perform a comparison of the
> > timestamps. Is there an alternative approach for this?
>
> Updating a global or static variable in a UDAF is guaranteed not to do
> what you expect - the function can be invoked concurrently by multiple
> threads.
>
> It seems like you probably want to store some additional state in the
> intermediate value. There are some sample UDAs here (see Avg()) where
> additional intermediate state is stored in a StringVal:
> https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
>

Re: Creating Impala UDA

Posted by Antoni Ivanov <ai...@vmware.com>.
Hi Ravi,

I am curious why you are not using the existing last_value function in Impala to get the "latest non null value for the column".

e.g.
last_value(column_a ignore nulls) over (partition by ID order by Date_Time)

Thanks,
Antoni





Re: Creating Impala UDA

Posted by Tim Armstrong <ta...@cloudera.com>.
This was double-posted to
http://community.cloudera.com/t5/Interactive-Short-cycle-SQL/Creating-Impala-UDA/m-p/56201/highlight/false#M3073
also. I'll continue the discussion here.

> Can we have the flexibility of declaring the variable globally in UDF?
> Globally, I mean outside the function?

> And, the reason I am declaring a static variable is to restore the value
> of timestamp for every record so that I can perform a comparison of the
> timestamps. Is there an alternative approach for this?

Updating a global or static variable in a UDAF is guaranteed not to do what
you expect - the function can be invoked concurrently by multiple threads.

It seems like you probably want to store some additional state in the
intermediate value. There are some sample UDAs here (see Avg()) where
additional intermediate state is stored in a StringVal:
https://github.com/cloudera/impala-udf-samples/blob/master/uda-sample.cc#L61
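
A minimal sketch of the idea above, not the sample's actual code: the newest value and the timestamp it arrived with travel together in the per-group intermediate state, so no global or static is needed. To keep it compilable outside Impala, IntVal and TimestampVal here are simplified local stand-ins for the impala_udf types, and CurrentValState, NewerThan and CurrentValueOfSampleGroup are hypothetical names. In a real UDA the state struct would presumably live in the intermediate value (for example a StringVal buffer, as in the Avg() sample) and the functions would also take the FunctionContext.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-ins for the impala_udf value types (assumption: only the
// fields used in this thread are modelled).
struct IntVal {
  bool is_null = true;
  int32_t val = 0;
};
struct TimestampVal {
  bool is_null = true;
  int32_t date = 0;
  int64_t time_of_day = 0;
};

// Hypothetical intermediate state: the newest non-NULL value seen so far in
// this group, together with the timestamp it arrived with.
struct CurrentValState {
  bool has_value = false;
  int32_t val = 0;
  int32_t date = 0;
  int64_t time_of_day = 0;
};

// True if (date, time_of_day) is strictly later than what the state holds.
// The date decides first; time_of_day only breaks ties on the same date.
static bool NewerThan(const CurrentValState& s, int32_t date, int64_t time_of_day) {
  if (!s.has_value) return true;
  if (date != s.date) return date > s.date;
  return time_of_day > s.time_of_day;
}

void CurrentValueInit(CurrentValState* state) {
  assert(state != nullptr);
  *state = CurrentValState();
}

void CurrentValueUpdate(const IntVal& input, const TimestampVal& ts,
                        CurrentValState* state) {
  assert(state != nullptr);
  if (input.is_null || ts.is_null) return;  // fields not updated are NULL: skip
  if (!NewerThan(*state, ts.date, ts.time_of_day)) return;
  state->has_value = true;
  state->val = input.val;
  state->date = ts.date;
  state->time_of_day = ts.time_of_day;
}

// Merge keeps whichever partial result carries the later timestamp.
void CurrentValueMerge(const CurrentValState& src, CurrentValState* dst) {
  assert(dst != nullptr);
  if (src.has_value && NewerThan(*dst, src.date, src.time_of_day)) *dst = src;
}

IntVal CurrentValueFinalize(const CurrentValState& state) {
  IntVal result;
  result.is_null = !state.has_value;
  result.val = state.val;
  return result;
}

// Feeds column A of the sample table through two partial aggregations and
// merges them, roughly as separate threads or nodes might.
IntVal CurrentValueOfSampleGroup() {
  const int64_t times[5] = {0, 1, 3, 2, 4};
  const int32_t values[5] = {1, 2, 4, 5, 0};
  const bool nulls[5] = {false, false, false, false, true};
  CurrentValState parts[2];
  CurrentValueInit(&parts[0]);
  CurrentValueInit(&parts[1]);
  for (int i = 0; i < 5; ++i) {
    IntVal v;
    v.is_null = nulls[i];
    v.val = values[i];
    TimestampVal ts;
    ts.is_null = false;
    ts.time_of_day = times[i];
    CurrentValueUpdate(v, ts, &parts[i < 2 ? 0 : 1]);
  }
  CurrentValueMerge(parts[1], &parts[0]);
  return CurrentValueFinalize(parts[0]);
}
```

For column A of the example table, CurrentValueOfSampleGroup() yields 4, matching the expected result row in the original post. Note that Merge compares timestamps rather than adding values: each partial state already holds a candidate "latest" value, so summing them would not produce the latest one.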


Re: Creating Impala UDA

Posted by Ravi Kanth <ra...@gmail.com>.
Thanks Bharath. Could you also check whether the logic I am implementing is
correct or needs any modification? I am very new to Impala UDFs and C++ and
am having a hard time figuring out the problems.


Re: Creating Impala UDA

Posted by Bharath Vissapragada <bh...@cloudera.com>.
You need to allocate memory for tsTemp, else it can segfault. That could be
the issue here.

      static TimestampVal* tsTemp;
      tsTemp->date = 0;
      tsTemp->time_of_day = 0;
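
To illustrate the point above with a standalone sketch (the TimestampVal here is a simplified stand-in for the Impala type, and the helper names are made up for this example): a function-local static pointer with no initializer starts out null, so writing through it before allocating is undefined behavior and in practice a likely crash. Holding the struct by value sidesteps the allocation question, although a static would still be shared across threads and groups, which Tim's reply in this thread warns against. The sketch also shows an ordering check that compares the date first and only falls back to time_of_day on the same date, rather than requiring both fields to be larger as the posted Update function does.

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for the Impala timestamp value (assumption: only the
// two fields used in the thread are modelled).
struct TimestampVal {
  int32_t date = 0;
  int64_t time_of_day = 0;
};

// A static pointer with no initializer is zero-initialized, i.e. null.
// Dereferencing it, as the quoted lines do, is undefined behavior.
bool StaticPointerStartsNull() {
  static TimestampVal* tsTemp;
  return tsTemp == nullptr;
}

// Holding the struct by value needs no allocation at all.
TimestampVal MakeTimestamp(int32_t date, int64_t time_of_day) {
  TimestampVal ts;
  ts.date = date;
  ts.time_of_day = time_of_day;
  return ts;
}

// Ordering used in this sketch: date first, time_of_day as tie-breaker.
bool IsLater(const TimestampVal& a, const TimestampVal& b) {
  if (a.date != b.date) return a.date > b.date;
  return a.time_of_day > b.time_of_day;
}

// The comparison from the posted code: both fields must be larger.
bool BothFieldsGreater(const TimestampVal& a, const TimestampVal& b) {
  return a.date > b.date && a.time_of_day > b.time_of_day;
}

// Small self-check: a later day with an earlier time of day is still later,
// but the "both fields greater" test misses it.
void CheckExamples() {
  const TimestampVal older = MakeTimestamp(1, 500);
  const TimestampVal newer = MakeTimestamp(2, 10);
  assert(StaticPointerStartsNull());
  assert(IsLater(newer, older));
  assert(!BothFieldsGreater(newer, older));
}
```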

