Posted to issues@spark.apache.org by "Jean-Marc Montanier (Jira)" <ji...@apache.org> on 2019/12/31 09:00:00 UTC

[jira] [Created] (SPARK-30397) [pyspark] Writer applied to custom model changes type of dict keys from int to str

Jean-Marc Montanier created SPARK-30397:
-------------------------------------------

             Summary: [pyspark] Writer applied to custom model changes type of dict keys from int to str
                 Key: SPARK-30397
                 URL: https://issues.apache.org/jira/browse/SPARK-30397
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.4
            Reporter: Jean-Marc Montanier


Hello,

I have a custom model that I'm trying to persist. This custom model holds a Python dict mapping int to int. When the model is saved with write().save('path') and loaded back, the keys of that dict are changed from int to str.
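
This looks consistent with how DefaultParamsWritable persists params: the param map is serialized to a JSON metadata file, and JSON object keys are always strings, so int keys come back as str after a load. The effect can be reproduced with the json module alone (a minimal illustration, independent of Spark):
{code:python}
import json

categories = {10: 5, 11: 4, 12: 3}
serialized = json.dumps(categories)   # JSON object keys must be strings
print(serialized)                     # {"10": 5, "11": 4, "12": 3}

restored = json.loads(serialized)
print(list(restored.keys()))          # ['10', '11', '12'] -- keys are now str
{code}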

 

You can find below code that reproduces the issue:
{code:python}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@author: Jean-Marc Montanier
@date: 2019/12/31
"""

from pyspark.sql import SparkSession

from pyspark import keyword_only
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml import Estimator, Model
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.param import Param, Params
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf


spark = SparkSession \
    .builder \
    .appName("ImputeNormal") \
    .getOrCreate()


class CustomFit(Estimator,
                HasInputCol,
                HasOutputCol,
                DefaultParamsReadable,
                DefaultParamsWritable,
                ):
    @keyword_only
    def __init__(self, inputCol="inputCol", outputCol="outputCol"):
        super(CustomFit, self).__init__()

        self._setDefault(inputCol="inputCol", outputCol="outputCol")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol="inputCol", outputCol="outputCol"):
        """
        setParams(self, inputCol="inputCol", outputCol="outputCol")
        """
        kwargs = self._input_kwargs
        self._set(**kwargs)
        return self

    def _fit(self, data):
        inputCol = self.getInputCol()
        outputCol = self.getOutputCol()

        # Keep the two most frequent non-null values of the input column
        categories = data.where(data[inputCol].isNotNull()) \
            .groupby(inputCol) \
            .count() \
            .orderBy("count", ascending=False) \
            .limit(2)
        categories = dict(categories.toPandas().set_index(inputCol)["count"])
        # Cast the numpy.int64 counts from pandas to plain Python int
        for cat in categories:
            categories[cat] = int(categories[cat])

        return CustomModel(categories=categories,
                           input_col=inputCol,
                           output_col=outputCol)


class CustomModel(Model,
                  DefaultParamsReadable,
                  DefaultParamsWritable):

    input_col = Param(Params._dummy(), "input_col", "Name of the input column")
    output_col = Param(Params._dummy(), "output_col", "Name of the output column")
    categories = Param(Params._dummy(), "categories", "Top categories")

    def __init__(self, categories: dict = None, input_col="input_col", output_col="output_col"):
        super(CustomModel, self).__init__()

        self._set(categories=categories, input_col=input_col, output_col=output_col)

    def get_output_col(self) -> str:
        """
        output_col getter
        :return:
        """
        return self.getOrDefault(self.output_col)

    def get_input_col(self) -> str:
        """
        input_col getter
        :return:
        """
        return self.getOrDefault(self.input_col)

    def get_categories(self):
        """
        categories getter
        :return:
        """
        return self.getOrDefault(self.categories)

    def _transform(self, data):
        input_col = self.get_input_col()
        output_col = self.get_output_col()
        categories = self.get_categories()

        def get_cat(val):
            # Map nulls and values unseen during fit to -1
            if val is None:
                return -1
            if val not in categories:
                return -1
            return int(categories[val])

        get_cat_udf = udf(get_cat, IntegerType())

        df = data.withColumn(output_col,
                             get_cat_udf(input_col))

        return df


def test_without_write():
    fit_df = spark.createDataFrame([[10]] * 5 + [[11]] * 4 + [[12]] * 3 + [[None]] * 2, ['input'])
    custom_fit = CustomFit(inputCol='input', outputCol='output')
    pipeline = Pipeline(stages=[custom_fit])
    pipeline_model = pipeline.fit(fit_df)

    print("Categories: {}".format(pipeline_model.stages[0].get_categories()))

    transform_df = spark.createDataFrame([[10]] * 2 + [[11]] * 2 + [[12]] * 2 + [[None]] * 2, ['input'])
    test = pipeline_model.transform(transform_df)
    test.show()  # This output is the expected output


def test_with_write():
    fit_df = spark.createDataFrame([[10]] * 5 + [[11]] * 4 + [[12]] * 3 + [[None]] * 2, ['input'])
    custom_fit = CustomFit(inputCol='input', outputCol='output')
    pipeline = Pipeline(stages=[custom_fit])
    pipeline_model = pipeline.fit(fit_df)

    print("Categories: {}".format(pipeline_model.stages[0].get_categories()))

    pipeline_model.write().save('tmp')
    loaded_model = PipelineModel.load('tmp')
    # We can see that the type of the keys is now str instead of int
    print("Categories: {}".format(loaded_model.stages[0].get_categories()))

    transform_df = spark.createDataFrame([[10]] * 2 + [[11]] * 2 + [[12]] * 2 + [[None]] * 2, ['input'])
    test = loaded_model.transform(transform_df)
    test.show()  # We can see that the output does not match the expected output


if __name__ == "__main__":
    test_without_write()
    test_with_write()

{code}
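
A possible workaround until this is fixed (a sketch only, assuming categories is always set and its keys are int-like) is to coerce the keys back to int in the getter of CustomModel above, so the model behaves the same before and after a save/load round trip:
{code:python}
    def get_categories(self):
        """
        categories getter
        Coerces keys back to int because the JSON-based writer
        turns int dict keys into str on save/load.
        :return:
        """
        raw = self.getOrDefault(self.categories)
        return {int(k): v for k, v in raw.items()}
{code}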
 


