Posted to issues@spark.apache.org by "Boris Clémençon (JIRA)" <ji...@apache.org> on 2017/02/21 18:00:47 UTC
[jira] [Created] (SPARK-19681) save and load pipeline and then use it yields java.lang.RuntimeException
Boris Clémençon created SPARK-19681:
----------------------------------------
Summary: save and load pipeline and then use it yields java.lang.RuntimeException
Key: SPARK-19681
URL: https://issues.apache.org/jira/browse/SPARK-19681
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.1.0
Reporter: Boris Clémençon
Here is the unit test that fails:
import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{SQLTransformer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import scala.util.Random
/**
* Created by borisclemencon on 21/02/2017.
*/
class PipelineTest extends FlatSpec with Matchers with BeforeAndAfter {

  val featuresCol = "features"
  val responseCol = "response"
  val weightCol = "weight"
  // packageNameCol is referenced in makeDataset below but was missing from
  // the snippet as posted; any column name works here.
  val packageNameCol = "pn"
  val features = Array("X1", "X2")
  val lambdas = Array(0.01)
  val alpha = 0.2
  val maxIter = 50
  val nfolds = 5

  var spark: SparkSession = _
  before {
    val sparkConf: SparkConf = new SparkConf().
      set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      set("spark.ui.enabled", "false"). // faster and avoids 'spark test java.net.BindException: Address already in use' warnings!
      set("spark.driver.host", "127.0.0.1")
    spark = SparkSession.
      builder().
      config(sparkConf).
      appName("BlendWeightTransformerTest").
      master("local[*]").
      getOrCreate()
  }
  def makeDataset(n: Int = 100): DataFrame = {
    val sc = spark
    import sc.implicits._
    // Random two-feature binary-response data, with a string column added
    // so the frame has a mix of types.
    val data =
      for (i <- 1 to n) yield {
        val pn = if (Random.nextDouble() < 0.1) "a" else "b"
        val x1: Double = Random.nextGaussian() * 5
        val x2: Double = Random.nextGaussian() * 2
        val response: Int = if (Random.nextBoolean()) 1 else 0
        (pn, x1, x2, response)
      }
    data.toDF(packageNameCol, "X1", "X2", responseCol)
  }
"load()" should "produce the same pipeline and result before and after save()" in {
val lr = new LogisticRegression().
setFitIntercept(true).
setMaxIter(maxIter).
setElasticNetParam(alpha).
setStandardization(true).
setFamily("binomial").
setFeaturesCol(featuresCol).
setLabelCol(responseCol)
val assembler = new VectorAssembler().setInputCols(features).setOutputCol(featuresCol)
val pipeline = new Pipeline().setStages(Array(assembler, lr))
val evaluator = new BinaryClassificationEvaluator().
setLabelCol(responseCol).
setMetricName("areaUnderROC")
val paramGrid = new ParamGridBuilder().
addGrid(lr.regParam, lambdas).
build()
// Train with simple grid cross validation
val cv = new CrossValidator().
setEstimator(pipeline).
setEvaluator(evaluator).
setEstimatorParamMaps(paramGrid).
setNumFolds(nfolds) // Use 3+ in practice
val df = makeDataset(100).cache
val cvModel = cv.fit(df)
val answer = cvModel.transform(df)
answer.show(truncate = false)
val path = "./PipelineTestcvModel"
cvModel.write.overwrite().save(path)
val cvModelLoaded = CrossValidatorModel.load(path)
val output = cvModelLoaded.transform(df)
output.show(truncate = false)
Compare.assertDataFrameEquals(answer, output)
}
}
Running the test yields this exception:
should produce the same pipeline and result before and after save() *** FAILED ***
[info] java.lang.RuntimeException: no default for type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
[info] at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
[info] at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:121)
[info] at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:114)
[info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
[info] at scala.collection.immutable.List.flatMap(List.scala:344)
[info] at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$.unapply(patterns.scala:114)
[info] at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:158)
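The trace shows the failure happens while Catalyst plans an equi-join: ExtractEquiJoinKeys asks Literal.default for a default value for each join-key type, and Literal.default (literals.scala) has no entry for VectorUDT, hence the RuntimeException. So saving, loading and transforming all succeed; it is joining the two result DataFrames on the vector-valued features column, presumably inside Compare.assertDataFrameEquals, that blows up. A comparison that never plans a join sidesteps the code path. Below is a minimal sketch, assuming both frames are small enough to collect on the driver; assertSmallDataFrameEquals is a hypothetical stand-in for the helper used above:

import org.apache.spark.sql.DataFrame

object Compare {
  // Hypothetical join-free replacement for assertDataFrameEquals:
  // collect both sides and compare locally, so Catalyst never has to
  // plan an equi-join over the VectorUDT 'features' column.
  def assertSmallDataFrameEquals(expected: DataFrame, actual: DataFrame): Unit = {
    assert(expected.schema == actual.schema, "schemas differ")
    // Normalize row order before comparing; fine for small test data.
    val e = expected.collect().sortBy(_.toString)
    val a = actual.collect().sortBy(_.toString)
    assert(e.sameElements(a), "contents differ")
  }
}

Alternatively, dropping the vector column before comparing (e.g. answer.drop(featuresCol)) should keep a join-based helper usable, since the remaining string and numeric columns all have default literals.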
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org