Posted to issues@spark.apache.org by "Marcello Leida (Jira)" <ji...@apache.org> on 2020/06/22 09:20:00 UTC

[jira] [Updated] (SPARK-32048) PySpark: error in serializing ML pipelines with training strategy and pipeline as estimator

     [ https://issues.apache.org/jira/browse/SPARK-32048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marcello Leida updated SPARK-32048:
-----------------------------------
    Description: 
Hi all,

I get the following error in PySpark when serializing a CrossValidator and/or TrainValidationSplit whose estimator is a Pipeline:
{code:java}
AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'
{code}
In Scala the serialization works without problems, so I assume the issue is in PySpark.

When LogisticRegression is used as the estimator (as in the first case of the code below), serialization works properly.
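
One plausible reading of the traceback, offered as an assumption rather than a confirmed diagnosis: the tuning writer calls _transfer_param_map_to_java on whatever estimator it holds, a method that JVM-backed estimators such as LogisticRegression provide and the pure-Python Pipeline does not. The sketch below reproduces that failure mode without Spark. Every class and function name in it is a hypothetical stand-in, not PySpark's API.

```python
class JavaBackedEstimator:
    """Hypothetical stand-in for an estimator that wraps a JVM object."""

    def _transfer_param_map_to_java(self, param_map):
        # A real wrapper would build a JVM-side param map; here we just copy.
        return dict(param_map)


class CompositeEstimator:
    """Hypothetical stand-in for a pure-Python composite such as a pipeline."""

    def __init__(self, stages):
        self.stages = stages


def convert_param_maps(estimator, param_maps):
    # Assumes the estimator offers the conversion method, as a tuning writer might.
    return [estimator._transfer_param_map_to_java(pm) for pm in param_maps]


def try_convert(estimator, param_maps):
    """Return "ok" on success, or the AttributeError message on failure."""
    try:
        convert_param_maps(estimator, param_maps)
        return "ok"
    except AttributeError as exc:
        return str(exc)


grid = [{"regParam": 0.1}, {"regParam": 0.01}]
java_result = try_convert(JavaBackedEstimator(), grid)
composite_result = try_convert(CompositeEstimator(stages=[]), grid)
print(java_result)
print(composite_result)
```

If that reading holds, the problem would sit in how the Python tuning classes convert the parameter grid rather than in Pipeline persistence itself.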

I see that the tests for CrossValidator and TrainValidationSplit do not include a case with a Pipeline as the estimator.

I do not know whether there is a workaround for this, or another way to serialize the pipeline.
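
As a possible stopgap, and only a partial one, the pieces that do serialize could be persisted separately: the fitted bestModel through its own writer (as the reproduction code in this report does with model2) and the parameter grid as JSON. The sketch below covers only the grid part. It assumes each grid key exposes parent (the uid of the owning stage) and name, as pyspark.ml.param.Param does. FakeParam and param_maps_to_json are hypothetical names, used so the example runs without Spark.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for pyspark.ml.param.Param; only these two
# attributes are read by the helper below.
FakeParam = namedtuple("FakeParam", ["parent", "name"])


def param_maps_to_json(param_maps):
    """Encode a list of {param: value} dicts as JSON keyed by 'uid.name'."""
    rows = [
        {f"{param.parent}.{param.name}": value for param, value in pm.items()}
        for pm in param_maps
    ]
    return json.dumps(rows, sort_keys=True)


reg_param = FakeParam(parent="LogisticRegression_abc", name="regParam")
grid = [{reg_param: 0.1}, {reg_param: 0.01}]
encoded = param_maps_to_json(grid)
print(encoded)
```

This does not restore a CrossValidator on load. It only keeps a record of the grid next to the saved model, so the tuner can be rebuilt in code.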

Code to reproduce the issue:
{code:python}
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession

# Reuse the active session (e.g. the pyspark shell's `spark`) or create one.
spark = SparkSession.builder.getOrCreate()

# Prepare training documents from a list of (id, text, label) tuples.
df = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 3.0)
], ["id", "text", "label"])

# Configure the stages: tokenizer, hashingTF, and lr.
lr = LogisticRegression(maxIter=10, regParam=0.001)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features", numFeatures=1000)
#treeClassifier = DecisionTreeClassifier()

# sub_pipeline includes the classifier; sub_pipeline2 only builds the features.
sub_pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
sub_pipeline2 = Pipeline(stages=[tokenizer, hashingTF])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# Case 1: LogisticRegression as the estimator.
pipeline_cv = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)
cvPath = "/tmp/cv"
pipeline_cv.write().overwrite().save(cvPath)
model = pipeline_cv.fit(sub_pipeline2.fit(df).transform(df))
model.write().overwrite().save(cvPath)

# Case 2: Pipeline as the estimator.
pipeline_cv2 = CrossValidator(estimator=sub_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)
cvPath = "/tmp/cv2"
model2 = pipeline_cv2.fit(df).bestModel
model2.write().overwrite().save(cvPath)
# Serializing the CrossValidator itself is the step this report is about.
pipeline_cv2.write().overwrite().save(cvPath)
{code}


> PySpark: error in serializing ML pipelines with training strategy and pipeline as estimator
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32048
>                 URL: https://issues.apache.org/jira/browse/SPARK-32048
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, PySpark
>    Affects Versions: 2.4.5
>            Reporter: Marcello Leida
>            Priority: Major


