Posted to user@spark.apache.org by Aakash Basu <aa...@gmail.com> on 2018/05/31 09:50:45 UTC

[Suggestions needed] Weight of Evidence PySpark

Hi guys,

I'm trying to calculate Weight of Evidence (WoE) for each categorical column
with respect to the target column, but the code takes a long time even on
very few data points (rows).

How can I optimize it to make it performant enough?

Here's the code (categorical_col is a Python list of column names) -

for item in categorical_col:
    new_df = spark.sql('Select `' + item + '`, `' + target_col + '`, ' +
                       'count(*) as Counts from a group by `' +
                       item + '`, `' + target_col + '` order by `' + item + '`')
    # new_df.show()
    new_df.registerTempTable('b')
    # exit(0)
    new_df2 = spark.sql('Select `' + item + '`, ' +
                        'case when `' + target_col + '` == 0 then Counts else 0 end as Count_0, ' +
                        'case when `' + target_col + '` == 1 then Counts else 0 end as Count_1 ' +
                        'from b')

    spark.catalog.dropTempView('b')
    # new_df2.show()
    new_df2.registerTempTable('c')
    # exit(0)

    new_df3 = spark.sql('SELECT `' + item + '`, SUM(Count_0) AS Count_0, ' +
                        'SUM(Count_1) AS Count_1 FROM c GROUP BY `' + item + '`')

    spark.catalog.dropTempView('c')
    # new_df3.show()
    # exit(0)

    new_df3.registerTempTable('d')

    # SQL DF Experiment
    new_df4 = spark.sql('Select `' + item + '` as bucketed_col_of_source, ' +
                        'Count_0/(select sum(d.Count_0) as sum from d) as Prop_0, ' +
                        'Count_1/(select sum(d.Count_1) as sum from d) as Prop_1 from d')

    spark.catalog.dropTempView('d')
    # new_df4.show()
    # exit(0)
    new_df4.registerTempTable('e')

    new_df5 = spark.sql('Select *, case when log(e.Prop_0/e.Prop_1) IS NULL then 0 ' +
                        'else log(e.Prop_0/e.Prop_1) end as WoE from e')

    spark.catalog.dropTempView('e')

    # print('Problem starts here: ')
    # new_df5.show()

    new_df5.registerTempTable('WoE_table')

    joined_Train_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
                          '_WoE` from a bucketed inner join WoE_table on bucketed.`' + item +
                          '` = WoE_table.bucketed_col_of_source')

    # joined_Train_DF.show()
    joined_Test_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
                          '_WoE` from test_data bucketed inner join WoE_table on bucketed.`' + item +
                          '` = WoE_table.bucketed_col_of_source')

    if validation:
        joined_Validation_DF = spark.sql('Select bucketed.*, WoE_table.WoE as `' + item +
                                   '_WoE` from validation_data bucketed inner join WoE_table ' +
                                   'on bucketed.`' + item +
                                   '` = WoE_table.bucketed_col_of_source')
        WoE_Validation_DF = joined_Validation_DF

    spark.catalog.dropTempView('WoE_table')

    WoE_Train_DF = joined_Train_DF
    WoE_Test_DF = joined_Test_DF
    col_len = len(categorical_col)
    if col_len > 1:
        WoE_Train_DF.registerTempTable('a')
        WoE_Test_DF.registerTempTable('test_data')
        if validation:
            # print('got inside...')
            WoE_Validation_DF.registerTempTable('validation_data')
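
To make the target of all this explicit: for each category the loop computes
WoE = ln(Prop_0 / Prop_1), where Prop_0 and Prop_1 are that category's share
of all target = 0 and target = 1 rows, and it falls back to 0 when the log is
NULL. Below is a minimal plain-Python sketch of that same arithmetic on a toy
sample, meant only as a reference for checking results. It assumes a binary
0/1 target; the function name woe_by_category and the sample rows are made up
for illustration and are not part of the Spark job.

```python
import math
from collections import Counter


def woe_by_category(rows):
    """Return {category: WoE} for an iterable of (category, target) pairs.

    Hypothetical helper: assumes target is 0 or 1 and mirrors the SQL steps
    above (counts per group, proportions per class, log ratio, 0 fallback).
    """
    counts = Counter(rows)  # (category, target) -> row count, like table b

    # Class totals, like the two "select sum(...) from d" subqueries.
    total_0 = sum(n for (_, target), n in counts.items() if target == 0)
    total_1 = sum(n for (_, target), n in counts.items() if target == 1)

    woe = {}
    for category in {cat for cat, _ in counts}:
        count_0 = counts.get((category, 0), 0)
        count_1 = counts.get((category, 1), 0)
        if count_0 == 0 or count_1 == 0:
            # The SQL version gets NULL here (log of 0 or division by 0)
            # and replaces it with 0.
            woe[category] = 0.0
        else:
            prop_0 = count_0 / total_0
            prop_1 = count_1 / total_1
            woe[category] = math.log(prop_0 / prop_1)
    return woe


# Toy sample: 4 rows with target 0 and 3 rows with target 1.
sample_rows = [
    ("A", 0), ("A", 0), ("A", 1),
    ("B", 0), ("B", 1), ("B", 1),
    ("C", 0),
]
woe = woe_by_category(sample_rows)
# A: ln((2/4) / (1/3)) = ln(1.5); B: ln((1/4) / (2/3)) = ln(0.375); C: 0 fallback
```

Comparing a few categories from new_df5 against a check like this on a small
extract is one way to confirm that any optimized version still produces the
same WoE values.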

Any help?

Thanks,
Aakash.