You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2020/01/06 06:08:00 UTC
[jira] [Resolved] (SPARK-30397) [pyspark] Writer applied to custom
model changes type of keys' dict from int to str
[ https://issues.apache.org/jira/browse/SPARK-30397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-30397.
----------------------------------
Resolution: Not A Problem
> [pyspark] Writer applied to custom model changes type of keys' dict from int to str
> -----------------------------------------------------------------------------------
>
> Key: SPARK-30397
> URL: https://issues.apache.org/jira/browse/SPARK-30397
> Project: Spark
> Issue Type: Bug
> Components: ML, PySpark
> Affects Versions: 2.4.4
> Reporter: Jean-Marc Montanier
> Priority: Major
>
> Hello,
>
> I have a custom model that I'm trying to persist. Within this custom model there is a python dict mapping from int to int. When the model is saved (with write().save('path')), the keys of the dict are modified from int to str.
>
> You can find bellow a code to reproduce the issue:
> {code:python}
> #!/usr/bin/env python3
> # -*- coding: utf-8 -*-
> """
> @author: Jean-Marc Montanier
> @date: 2019/12/31
> """
> from pyspark.sql import SparkSession
> from pyspark import keyword_only
> from pyspark.ml import Pipeline, PipelineModel
> from pyspark.ml import Estimator, Model
> from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
> from pyspark.ml.param import Param, Params
> from pyspark.ml.param.shared import HasInputCol, HasOutputCol
> from pyspark.sql.types import IntegerType
> from pyspark.sql.functions import udf
> spark = SparkSession \
> .builder \
> .appName("ImputeNormal") \
> .getOrCreate()
> class CustomFit(Estimator,
> HasInputCol,
> HasOutputCol,
> DefaultParamsReadable,
> DefaultParamsWritable,
> ):
> @keyword_only
> def __init__(self, inputCol="inputCol", outputCol="outputCol"):
> super(CustomFit, self).__init__()
> self._setDefault(inputCol="inputCol", outputCol="outputCol")
> kwargs = self._input_kwargs
> self.setParams(**kwargs)
> @keyword_only
> def setParams(self, inputCol="inputCol", outputCol="outputCol"):
> """
> setParams(self, inputCol="inputCol", outputCol="outputCol")
> """
> kwargs = self._input_kwargs
> self._set(**kwargs)
> return self
> def _fit(self, data):
> inputCol = self.getInputCol()
> outputCol = self.getOutputCol()
> categories = data.where(data[inputCol].isNotNull()) \
> .groupby(inputCol) \
> .count() \
> .orderBy("count", ascending=False) \
> .limit(2)
> categories = dict(categories.toPandas().set_index(inputCol)["count"])
> for cat in categories:
> categories[cat] = int(categories[cat])
> return CustomModel(categories=categories,
> input_col=inputCol,
> output_col=outputCol)
> class CustomModel(Model,
> DefaultParamsReadable,
> DefaultParamsWritable):
> input_col = Param(Params._dummy(), "input_col", "Name of the input column")
> output_col = Param(Params._dummy(), "output_col", "Name of the output column")
> categories = Param(Params._dummy(), "categories", "Top categories")
> def __init__(self, categories: dict = None, input_col="input_col", output_col="output_col"):
> super(CustomModel, self).__init__()
> self._set(categories=categories, input_col=input_col, output_col=output_col)
> def get_output_col(self) -> str:
> """
> output_col getter
> :return:
> """
> return self.getOrDefault(self.output_col)
> def get_input_col(self) -> str:
> """
> input_col getter
> :return:
> """
> return self.getOrDefault(self.input_col)
> def get_categories(self):
> """
> categories getter
> :return:
> """
> return self.getOrDefault(self.categories)
> def _transform(self, data):
> input_col = self.get_input_col()
> output_col = self.get_output_col()
> categories = self.get_categories()
> def get_cat(val):
> if val is None:
> return -1
> if val not in categories:
> return -1
> return int(categories[val])
> get_cat_udf = udf(get_cat, IntegerType())
> df = data.withColumn(output_col,
> get_cat_udf(input_col))
> return df
> def test_without_write():
> fit_df = spark.createDataFrame([[10]] * 5 + [[11]] * 4 + [[12]] * 3 + [[None]] * 2, ['input'])
> custom_fit = CustomFit(inputCol='input', outputCol='output')
> pipeline = Pipeline(stages=[custom_fit])
> pipeline_model = pipeline.fit(fit_df)
> print("Categories: {}".format(pipeline_model.stages[0].get_categories()))
> transform_df = spark.createDataFrame([[10]] * 2 + [[11]] * 2 + [[12]] * 2 + [[None]] * 2, ['input'])
> test = pipeline_model.transform(transform_df)
> test.show() # This output is the expected output
> def test_with_write():
> fit_df = spark.createDataFrame([[10]] * 5 + [[11]] * 4 + [[12]] * 3 + [[None]] * 2, ['input'])
> custom_fit = CustomFit(inputCol='input', outputCol='output')
> pipeline = Pipeline(stages=[custom_fit])
> pipeline_model = pipeline.fit(fit_df)
> print("Categories: {}".format(pipeline_model.stages[0].get_categories()))
> pipeline_model.write().save('tmp')
> loaded_model = PipelineModel.load('tmp')
> # We can see that the type of the keys is know str instead of int
> print("Categories: {}".format(loaded_model.stages[0].get_categories()))
> transform_df = spark.createDataFrame([[10]] * 2 + [[11]] * 2 + [[12]] * 2 + [[None]] * 2, ['input'])
> test = loaded_model.transform(transform_df)
> test.show() # We can see that the output does not match the expected output
> if __name__ == "__main__":
> test_without_write()
> test_with_write()
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org