Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/02/23 20:42:40 UTC

Structured Streaming + UDF - logic based on checking if a column is present in the Dataframe

Hello All,

I'm using Structured Streaming, and am trying to use a UDF to parse each row.
Here is the requirement:

   - we can get alerts for a particular KPI with type 'major' OR 'critical'
   - for a KPI, if we get an alert of type 'major' (e.g. <kpi>__major) and we
   also have a critical alert (<kpi>__critical), we need to ignore the
   __major alert and consider only the __critical alert

There are ~25 alerts, whose names are stored in the array AlarmKeys.alarms_all
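The suppression rule above can be sketched in plain Python, independent of Spark. This is only an illustrative helper (the function name `active_alarms` and the dict input are assumptions; only the `<kpi>__major` / `<kpi>__critical` naming convention comes from the post):

```python
# Hypothetical sketch of the suppression rule: given per-alarm counts,
# drop a "<kpi>__major" alarm whenever the matching "<kpi>__critical"
# alarm is also firing.
def active_alarms(alarm_counts):
    active = []
    for name, count in alarm_counts.items():
        if count <= 0:
            continue  # alarm not firing at all
        kpi, _, severity = name.partition("__")
        if severity == "major" and alarm_counts.get(kpi + "__critical", 0) > 0:
            continue  # a critical alert for the same KPI wins
        active.append(name)
    return active
```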

UDF Code (draft):

@udf(returnType=StringType())
def convertStructToStr(APP_CAUSE, tenantName, window, <one>, <two>__major,
                       <three>__major, <four>__critical, five__major,
                       <six>__critical):
    res = "{window: " + str(window) + ", type: 10m, applianceName: " + str(APP_CAUSE) + ","
    mystring = ""
    first = True
    for curr_alarm in AlarmKeys.alarms_all:
        alsplit = curr_alarm.split('__')
        if len(alsplit) == 2:
            # Only account for the critical row if both major & critical are there
            if alsplit[1] == 'major':
                critical_alarm = alsplit[0] + "__critical"
                if int(col(critical_alarm)) > 0:
                    continue
        if int(col(curr_alarm)) > 0:
            if first:
                mystring = "{} {}({})".format(mystring,
                    AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
                first = False
            else:
                mystring = "{}, {}({})".format(mystring,
                    AlarmKeys.alarm_exp[curr_alarm], str(col(curr_alarm)))
    res += "insight: " + mystring + "}"
    return res
# structured streaming using udf, printing data to the console for now;
# eventually I'll put the data into Kafka instead
df.select(convertStructToStr(*df.columns)) \
    .writeStream \
    .format("console") \
    .option("numRows", 100) \
    .option("checkpointLocation", "/Users/karanalang/PycharmProjects/Kafka/checkpoint") \
    .outputMode("complete") \
    .start()

Additional details on Stack Overflow:
https://stackoverflow.com/questions/71243726/structured-streaming-udf-logic-based-on-checking-if-a-column-is-present-in-t


Question is -

Can this be done using a UDF? Since I'm passing column values to the UDF, I
have no way to check whether a particular KPI of type 'critical' is present in
the DataFrame.

Any suggestions on the best way to solve this problem?
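For illustration, one angle (a sketch, not a confirmed answer): the column names are known on the driver via `df.columns` before the UDF ever runs, so the "is a __critical column present" check could happen outside the UDF. The helper below is hypothetical; only `df.columns` (standard PySpark) and the `__critical` naming convention come from the post:

```python
# Hypothetical driver-side filter: given the DataFrame's column names and
# the full alarm list, keep a "<kpi>__major" column only when no matching
# "<kpi>__critical" column exists in the DataFrame.
def columns_to_report(df_columns, all_alarms):
    present = set(df_columns)
    keep = []
    for alarm in all_alarms:
        kpi, _, severity = alarm.partition("__")
        if severity == "major" and kpi + "__critical" in present:
            continue  # the critical column exists; skip the major one
        if alarm in present:
            keep.append(alarm)
    return keep
```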
tia!

Re: Structured Streaming + UDF - logic based on checking if a column is present in the Dataframe

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

can you please let us know the following:
1. the spark version
2. a few samples of input data
3. a few samples of the expected output


Regards,
Gourav Sengupta

On Wed, Feb 23, 2022 at 8:43 PM karan alang <ka...@gmail.com> wrote:
