Posted to issues@spark.apache.org by "Reynold Xin (JIRA)" <ji...@apache.org> on 2017/03/24 23:05:41 UTC

[jira] [Resolved] (SPARK-19846) Add a flag to disable constraint propagation

     [ https://issues.apache.org/jira/browse/SPARK-19846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin resolved SPARK-19846.
---------------------------------
       Resolution: Fixed
         Assignee: Liang-Chi Hsieh
    Fix Version/s: 2.2.0

> Add a flag to disable constraint propagation
> --------------------------------------------
>
>                 Key: SPARK-19846
>                 URL: https://issues.apache.org/jira/browse/SPARK-19846
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Liang-Chi Hsieh
>            Assignee: Liang-Chi Hsieh
>             Fix For: 2.2.0
>
>
> Constraint propagation can be computationally expensive and can block driver execution for a long time. For example, the benchmark below takes 30 minutes.
> Compared with other attempts to modify how constraint propagation works, this is a much simpler option: add a flag to disable constraint propagation.
> {code}
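>     import org.apache.spark.ml.{Pipeline, PipelineStage}
>     import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
>     import org.apache.spark.sql.internal.SQLConf
>     // Needed for toDF and $"..." outside spark-shell, which imports these automatically.
>     import spark.implicits._
>     // Turn constraint propagation off using the new flag.
>     spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)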
>     val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0"))((df, i) => df.withColumn(s"x$i", $"x0"))
>     val indexers = df.columns.tail.map(c => new StringIndexer()
>       .setInputCol(c)
>       .setOutputCol(s"${c}_indexed")
>       .setHandleInvalid("skip"))
>     val encoders = indexers.map(indexer => new OneHotEncoder()
>       .setInputCol(indexer.getOutputCol)
>       .setOutputCol(s"${indexer.getOutputCol}_encoded")
>       .setDropLast(true))
>     val stages: Array[PipelineStage] = indexers ++ encoders
>     val pipeline = new Pipeline().setStages(stages)
>     val startTime = System.nanoTime
>     pipeline.fit(df).transform(df).show
>     val runningTime = System.nanoTime - startTime
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)