You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Boris Clémençon (JIRA)" <ji...@apache.org> on 2017/02/21 23:47:44 UTC
[jira] [Commented] (SPARK-19681) save and load pipeline and then
use it yield java.lang.RuntimeException
[ https://issues.apache.org/jira/browse/SPARK-19681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877033#comment-15877033 ]
Boris Clémençon commented on SPARK-19681:
------------------------------------------
EDIT:
The problem is in Compare.assertDataFrameEquals, not in Spark. I am closing the ticket.
> save and load pipeline and then use it yield java.lang.RuntimeException
> -----------------------------------------------------------------------
>
> Key: SPARK-19681
> URL: https://issues.apache.org/jira/browse/SPARK-19681
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Boris Clémençon
> Labels: spark-ml
>
> Here is the unit test that fails:
> import org.apache.spark.SparkConf
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
> import org.apache.spark.ml.feature.{SQLTransformer, VectorAssembler}
> import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
> import scala.util.Random
> /**
> * Created by borisclemencon on 21/02/2017.
> */
> class PipelineTest extends FlatSpec with Matchers with BeforeAndAfter {
> val featuresCol = "features"
> val responseCol = "response"
> val weightCol = "weight"
> val features = Array("X1", "X2")
> val lambdas = Array(0.01)
> val alpha = 0.2
> val maxIter = 50
> val nfolds = 5
> var spark: SparkSession = _
> before {
> val sparkConf: SparkConf = new SparkConf().
> set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
> set("spark.ui.enabled", "false"). // faster and remove 'spark test java.net.BindException: Address already in use' warnings!
> set("spark.driver.host", "127.0.0.1")
> spark = SparkSession.
> builder().
> config(sparkConf).
> appName("BlendWeightTransformerTest").
> master("local[*]").
> getOrCreate()
> }
> def makeDataset(n: Int = 100): DataFrame = {
> val sc = spark
> import sc.implicits._
> val n = 1000
> val data =
> for (i <- 1 to n) yield {
> val pn = if (Random.nextDouble() < 0.1) "a" else "b"
> val x1: Double = Random.nextGaussian() * 5
> val x2: Double = Random.nextGaussian() * 2
> val response: Int = if (Random.nextBoolean()) 1 else 0
> (pn, x1, x2, response)
> }
> data.toDF(packageNameCol, "X1", "X2", responseCol)
> }
> "load()" should "produce the same pipeline and result before and after save()" in {
> val lr = new LogisticRegression().
> setFitIntercept(true).
> setMaxIter(maxIter).
> setElasticNetParam(alpha).
> setStandardization(true).
> setFamily("binomial").
> setFeaturesCol(featuresCol).
> setLabelCol(responseCol)
> val assembler = new VectorAssembler().setInputCols(features).setOutputCol(featuresCol)
> val pipeline = new Pipeline().setStages(Array(assembler, lr))
> val evaluator = new BinaryClassificationEvaluator().
> setLabelCol(responseCol).
> setMetricName("areaUnderROC")
> val paramGrid = new ParamGridBuilder().
> addGrid(lr.regParam, lambdas).
> build()
> // Train with simple grid cross validation
> val cv = new CrossValidator().
> setEstimator(pipeline).
> setEvaluator(evaluator).
> setEstimatorParamMaps(paramGrid).
> setNumFolds(nfolds) // Use 3+ in practice
> val df = makeDataset(100).cache
> val cvModel = cv.fit(df)
> val answer = cvModel.transform(df)
> answer.show(truncate = false)
> val path = "./PipelineTestcvModel"
> cvModel.write.overwrite().save(path)
> val cvModelLoaded = CrossValidatorModel.load(path)
> val output = cvModelLoaded.transform(df)
> output.show(truncate = false)
> Compare.assertDataFrameEquals(answer, output)
> }
> }
> which yields the following exception:
> should produce the same blent pipeline and result before and after save() *** FAILED ***
> [info] java.lang.RuntimeException: no default for type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
> [info] at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
> [info] at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:121)
> [info] at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:114)
> [info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [info] at scala.collection.immutable.List.foreach(List.scala:381)
> [info] at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> [info] at scala.collection.immutable.List.flatMap(List.scala:344)
> [info] at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$.unapply(patterns.scala:114)
> [info] at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:158)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org