Posted to user@spark.apache.org by Chester Gan <c....@gmail.com> on 2022/01/07 05:04:20 UTC

Proposed additional function to create fold_column for better integration of Spark data frames with H2O

Idea: a PySpark function to create fold indices (numbers 0, ..., N-1, where
N is the number of folds needed for k-fold CV during AutoML training) for the
train dataset, plus a dummy fold column on the test dataset

```
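# train & test are PySpark DataFrames of the train & test datasets, respectively;
# list_of_fold_group_columns, fold_column and nfolds are assumed to be defined already
from functools import reduce  # reduce is no longer a builtin in Python 3.x

import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def union_all(*dfs):
    """Union DataFrames that share the same schema (columns resolved by position)."""
    return reduce(DataFrame.union, dfs)


def create_fold_column(input_df, list_of_fold_group_columns, fold_column, nfolds, seed=42):
    """Add `fold_column` holding a fold index in 0..nfolds-1, assigned per group.

    All rows that share the same values in `list_of_fold_group_columns` get the
    same fold index, so no group is split across folds.
    """
    if not fold_column:
        return input_df

    # Split the distinct group ids into nfolds roughly equal random subsets;
    # we get a list of Spark DataFrames of group ids
    group_ids = input_df.select(*list_of_fold_group_columns).dropDuplicates()
    fold_group_ids_list_of_dataframes = group_ids.randomSplit(nfolds * [1.0 / nfolds], seed=seed)

    # Tag each subset with its fold index
    fold_group_ids_list_of_dataframes = [
        df.withColumn(fold_column, F.lit(index))
        for index, df in enumerate(fold_group_ids_list_of_dataframes)
    ]

    # Attach the fold index to every row. This is an inner join, so rows with a
    # null in any group column are dropped (null never equals null in a join).
    fold_group_ids_dataframes_union = union_all(*fold_group_ids_list_of_dataframes)
    return input_df.join(fold_group_ids_dataframes_union, on=list_of_fold_group_columns)


train = train.transform(
    lambda df: create_fold_column(df, list_of_fold_group_columns, fold_column, nfolds)
)
# Dummy fold_column holding a single value, nfolds (typically 5), to prevent
# H2O from erroring out
if fold_column:
    test = test.withColumn(fold_column, F.lit(nfolds))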
```
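To try the group-to-fold idea on a small sample without a Spark cluster, the logic of create_fold_column can be mirrored in plain Python. This is a hypothetical helper (assign_group_folds is not part of PySpark or H2O), and it swaps randomSplit for a seeded shuffle of the distinct group keys dealt round-robin into folds, so each fold receives the same number of groups, give or take one (randomSplit only gives approximately equal splits).

```python
import random
from typing import Dict, Hashable, List, Sequence, Tuple


def assign_group_folds(
    rows: Sequence[dict], group_cols: Sequence[str], nfolds: int, seed: int = 42
) -> List[dict]:
    """Return copies of `rows` with a "fold" key; all rows of a group share one fold."""
    # Distinct group keys in first-seen order (like dropDuplicates on the group columns)
    keys: List[Tuple[Hashable, ...]] = list(
        dict.fromkeys(tuple(row[c] for c in group_cols) for row in rows)
    )
    # Seeded shuffle, then deal the keys round-robin into folds 0..nfolds-1
    random.Random(seed).shuffle(keys)
    fold_of: Dict[Tuple[Hashable, ...], int] = {
        key: i % nfolds for i, key in enumerate(keys)
    }
    # "Join" the fold index back onto every row via its group key
    return [{**row, "fold": fold_of[tuple(row[c] for c in group_cols)]} for row in rows]


# Hypothetical sample: 9 rows belonging to 5 users (a, b, c, d, e)
rows = [{"user": u, "x": i} for i, u in enumerate("aabbbcdde")]
for row in assign_group_folds(rows, ["user"], nfolds=3):
    print(row)
```

With 5 groups and 3 folds, two folds get two groups each and one fold gets one; which users land where depends on the seed.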

Re: Proposed additional function to create fold_column for better integration of Spark data frames with H2O

Posted by Chester Gan <c....@gmail.com>.
Another possible workaround, when creating an ML pipeline with PySpark and
Python's H2O API, would be to first convert the PySpark dataframes to H2O
dataframes, then do the following:


   1. Create a new dataframe (call this group_df) from the feature dataframe
   by selecting only the columns that form the group and applying
   drop_duplicates, with the columns argument set to that list of columns:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.drop_duplicates
   2. Create a new, single-column dataframe (call this kfold_df) from
   group_df using kfold_column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.kfold_column
   3. Column-bind `kfold_df` back onto `group_df` with cbind:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.cbind
   4. Merge group_df back into the feature dataframe on the common columns
   (i.e. the group columns from step 1) to create the feature dataset with
   the fold column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-munging/merging-data.html
   Observations with the same values in the group columns will then share
   the same fold index in the fold column.
   5. Configure the parameters of the H2O AutoML job as described in
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/automl.html, making sure
   that the fold_column argument is set to the name of the column created
   by kfold_column (see
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/fold_column.html),
   that class imbalance is addressed, and so on.
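The five steps above might look roughly like the following with the h2o Python package. Treat it as a sketch under assumptions, not a tested recipe: a running H2O cluster (h2o.init()), an H2OFrame already converted from the Spark DataFrame, group columns that are numeric or categorical, and illustrative names (add_group_fold_column, train_automl, the "fold" column, max_models=10) that are not part of H2O. It calls drop_duplicates on the group columns only, so that the merge in step 4 can match on nothing but those columns.

```python
def add_group_fold_column(features, group_cols, nfolds=5, seed=42, fold_name="fold"):
    """Steps 1-4 (hypothetical helper): add a fold index shared within each group.

    `features` is an H2OFrame; `group_cols` lists the columns that define a group.
    """
    # Step 1: one row per group; select only the group columns so they are the
    # only columns the merge in step 4 can match on
    group_df = features[group_cols].drop_duplicates(columns=group_cols)
    # Step 2: single-column frame of random fold indices 0..nfolds-1, one per group row
    kfold_df = group_df.kfold_column(n_folds=nfolds, seed=seed)
    kfold_df.set_names([fold_name])  # renames the column in place
    # Step 3: column-bind the fold indices onto the group keys
    group_df = group_df.cbind(kfold_df)
    # Step 4: merge on the common (group) columns; rows of a group share a fold
    return features.merge(group_df)


def train_automl(training_frame, x, y, fold_name="fold", max_models=10, seed=42):
    """Step 5 (hypothetical helper): run H2O AutoML on the custom fold column.

    `x` (the predictor names) must not include the fold column.
    """
    # Imported here so that defining these helpers does not require h2o
    from h2o.automl import H2OAutoML

    aml = H2OAutoML(
        max_models=max_models,
        seed=seed,
        balance_classes=True,  # one option for class imbalance (classification only)
    )
    aml.train(x=x, y=y, training_frame=training_frame, fold_column=fold_name)
    return aml
```

For example, `train_h2o = add_group_fold_column(train_h2o, ["user_id"])` followed by `train_automl(train_h2o, x=feature_names, y="label")`, where `train_h2o`, `user_id`, `feature_names` and `label` are placeholders. Passing fold_column overrides AutoML's default randomized cross-validation folds.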

Any suggestions? Are there already similar wrapper functions in PySpark or
H2O that assign fold indices based on groups of observations, rather than
individual observations?
Should I propose this on the dev@spark.apache.org mailing list?

On Wed, Jan 12, 2022 at 2:34 PM Chester Gan <c....@gmail.com> wrote:

Re: Proposed additional function to create fold_column for better integration of Spark data frames with H2O

Posted by Chester Gan <c....@gmail.com>.
Another workaround, when creating an ML pipeline with PySpark and Python's
H2O API, would be to first convert the PySpark dataframes to H2O
dataframes, then do the following:


   1. Create a new dataframe (call this group_df) from the feature dataframe
   by selecting only the columns that form the group and applying
   drop_duplicates, with the columns argument set to that list of columns:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.drop_duplicates
   2. Create a new, single-column dataframe (call this kfold_df) from
   group_df using kfold_column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.kfold_column
   3. Column-bind `kfold_df` back onto `group_df` with cbind:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.cbind
   4. Merge group_df back into the feature dataframe on the common columns
   (i.e. the group columns from step 1) to create the feature dataset with
   the fold column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-munging/merging-data.html
   Observations with the same values in the group columns will then share
   the same fold index in the fold column.
   5. Configure the parameters of the H2O AutoML job as described in
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/automl.html, making sure
   that the fold_column argument is set to the name of the column created
   by kfold_column (see
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/fold_column.html),
   that class imbalance is addressed, and so on.


On Wed, Jan 12, 2022 at 2:06 PM Chester Gan <c....@gmail.com> wrote:

Re: Proposed additional function to create fold_column for better integration of Spark data frames with H2O

Posted by Chester Gan <c....@gmail.com>.
Has anyone else used PySpark dataframes in conjunction with H2O for ML
pipelines and had to use custom folds to keep rows/observations of the same
group (e.g. user account, vehicle, city or whatever) in the same validation
fold, so as to prevent data leakage during cross-validation?
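To confirm that a fold assignment keeps each group inside a single validation fold, it is enough to check that every group key maps to exactly one fold index. Below is a plain-Python sketch with a hypothetical helper, find_leaking_groups; on a Spark DataFrame the same check could be a groupBy on the group columns with F.countDistinct on the fold column, keeping the groups whose count exceeds 1.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Sequence, Set, Tuple


def find_leaking_groups(
    rows: Iterable[dict], group_cols: Sequence[str], fold_col: str = "fold"
) -> Dict[Tuple[Hashable, ...], Set[int]]:
    """Return the group keys whose rows span more than one fold (empty = no leakage)."""
    folds_per_group: Dict[Tuple[Hashable, ...], Set[int]] = defaultdict(set)
    for row in rows:
        key = tuple(row[c] for c in group_cols)
        folds_per_group[key].add(row[fold_col])
    return {key: folds for key, folds in folds_per_group.items() if len(folds) > 1}


ok = [{"user": "a", "fold": 0}, {"user": "a", "fold": 0}, {"user": "b", "fold": 1}]
bad = ok + [{"user": "b", "fold": 2}]
print(find_leaking_groups(ok, ["user"]))   # {}
print(find_leaking_groups(bad, ["user"]))  # {('b',): {1, 2}}
```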
