You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Boris Clémençon (JIRA)" <ji...@apache.org> on 2017/02/21 18:00:47 UTC

[jira] [Created] (SPARK-19681) save and load pipeline and then use it yield java.lang.RuntimeException

Boris Clémençon  created SPARK-19681:
----------------------------------------

             Summary: save and load pipeline and then use it yield java.lang.RuntimeException
                 Key: SPARK-19681
                 URL: https://issues.apache.org/jira/browse/SPARK-19681
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: Boris Clémençon 


Here is the unit test that fails:


import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{SQLTransformer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.util.Random


/**
  * Created by borisclemencon on 21/02/2017.
  */
class PipelineTest extends FlatSpec with Matchers with BeforeAndAfter {

  val featuresCol = "features"
  val responseCol = "response"
  val weightCol = "weight"
  val features = Array("X1", "X2")
  val lambdas = Array(0.01)

  val alpha = 0.2
  val maxIter = 50
  val nfolds = 5

  var spark: SparkSession = _

  before {
    val sparkConf: SparkConf = new SparkConf().
      set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      set("spark.ui.enabled", "false"). // faster and remove 'spark test java.net.BindException: Address already in use' warnings!
      set("spark.driver.host", "127.0.0.1")

    spark = SparkSession.
      builder().
      config(sparkConf).
      appName("BlendWeightTransformerTest").
      master("local[*]").
      getOrCreate()
  }


  def makeDataset(n: Int = 100): DataFrame = {
    val sc = spark
    import sc.implicits._
    val n = 1000
    val data =
      for (i <- 1 to n) yield {
        val pn = if (Random.nextDouble() < 0.1) "a" else "b"
        val x1: Double = Random.nextGaussian() * 5
        val x2: Double = Random.nextGaussian() * 2
        val response: Int = if (Random.nextBoolean()) 1 else 0
        (pn, x1, x2, response)
      }
    data.toDF(packageNameCol, "X1", "X2", responseCol)
  }

  "load()" should "produce the same pipeline and result before and after save()" in {

    val lr = new LogisticRegression().
      setFitIntercept(true).
      setMaxIter(maxIter).
      setElasticNetParam(alpha).
      setStandardization(true).
      setFamily("binomial").
      setFeaturesCol(featuresCol).
      setLabelCol(responseCol)

    val assembler = new VectorAssembler().setInputCols(features).setOutputCol(featuresCol)
    val pipeline = new Pipeline().setStages(Array(assembler, lr))
    val evaluator = new BinaryClassificationEvaluator().
      setLabelCol(responseCol).
      setMetricName("areaUnderROC")
    val paramGrid = new ParamGridBuilder().
      addGrid(lr.regParam, lambdas).
      build()

    // Train with simple grid cross validation
    val cv = new CrossValidator().
      setEstimator(pipeline).
      setEvaluator(evaluator).
      setEstimatorParamMaps(paramGrid).
      setNumFolds(nfolds) // Use 3+ in practice

    val df = makeDataset(100).cache
    val cvModel = cv.fit(df)

    val answer = cvModel.transform(df)
    answer.show(truncate = false)

    val path = "./PipelineTestcvModel"
    cvModel.write.overwrite().save(path)

    val cvModelLoaded = CrossValidatorModel.load(path)
    val output = cvModelLoaded.transform(df)
    output.show(truncate = false)
    Compare.assertDataFrameEquals(answer, output)
  }
}

yield exception

should produce the same blent pipeline and result before and after save() *** FAILED ***
[info]   java.lang.RuntimeException: no default for type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
[info]   at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
[info]   at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:121)
[info]   at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:114)
[info]   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info]   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info]   at scala.collection.immutable.List.foreach(List.scala:381)
[info]   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
[info]   at scala.collection.immutable.List.flatMap(List.scala:344)
[info]   at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$.unapply(patterns.scala:114)
[info]   at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:158)




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org