Posted to commits@spark.apache.org by me...@apache.org on 2015/06/29 07:43:58 UTC

spark git commit: [SPARK-8575] [SQL] Deprecate callUDF in favor of udf

Repository: spark
Updated Branches:
  refs/heads/master dfde31da5 -> 0b10662fe


[SPARK-8575] [SQL] Deprecate callUDF in favor of udf

Follow-up to [SPARK-8356](https://issues.apache.org/jira/browse/SPARK-8356) and #6902.
Removes the unit test for the now deprecated ```callUdf```.
The unit test in SQLQuerySuite now uses ```udf``` instead of ```callUDF```.
Replaced ```callUDF``` with ```udf``` where possible in mllib.
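
For readers following along, here is a minimal sketch of the two styles, not taken from this patch, assuming a DataFrame `df` with a numeric column `x`: the deprecated `callUDF` requires the return `DataType` to be passed explicitly, while `udf` derives it from the closure's type tags.

```scala
import org.apache.spark.sql.functions.{callUDF, col, udf}
import org.apache.spark.sql.types.DoubleType

// Deprecated style: the return DataType must be spelled out by hand.
val withSquareOld = df.withColumn("sq", callUDF((x: Double) => x * x, DoubleType, col("x")))

// Preferred style: udf infers the return type from the closure itself.
val square = udf { (x: Double) => x * x }
val withSquareNew = df.withColumn("sq", square(col("x")))
```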

Author: BenFradet <be...@gmail.com>

Closes #6993 from BenFradet/SPARK-8575 and squashes the following commits:

26f5a7a [BenFradet] 2 spaces instead of 1
1ddb452 [BenFradet] renamed initUDF in order to be consistent in OneVsRest
48ca15e [BenFradet] used vector type tag for udf call in VectorIndexer
0ebd0da [BenFradet] replace the now deprecated callUDF by udf in VectorIndexer
8013409 [BenFradet] replaced the now deprecated callUDF by udf in Predictor
94345b5 [BenFradet] uniformized udf calls in ProbabilisticClassifier
1305492 [BenFradet] uniformized udf calls in Classifier
a672228 [BenFradet] uniformized udf calls in OneVsRest
49e4904 [BenFradet] Revert "removal of the unit test for the now deprecated callUdf"
bbdeaf3 [BenFradet] fixed syntax for init udf in OneVsRest
fe2a10b [BenFradet] callUDF => udf in ProbabilisticClassifier
0ea30b3 [BenFradet] callUDF => udf in Classifier where possible
197ec82 [BenFradet] callUDF => udf in OneVsRest
84d6780 [BenFradet] modified unit test in SQLQuerySuite to use udf instead of callUDF
477709f [BenFradet] removal of the unit test for the now deprecated callUdf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b10662f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b10662f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b10662f

Branch: refs/heads/master
Commit: 0b10662fef11a56f82144b4953d457738e6961ae
Parents: dfde31d
Author: BenFradet <be...@gmail.com>
Authored: Sun Jun 28 22:43:47 2015 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Sun Jun 28 22:43:47 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/Predictor.scala   |  9 ++++---
 .../spark/ml/classification/Classifier.scala    | 13 +++++++---
 .../spark/ml/classification/OneVsRest.scala     | 27 +++++++++-----------
 .../ProbabilisticClassifier.scala               | 22 +++++++++++-----
 .../apache/spark/ml/feature/VectorIndexer.scala |  5 ++--
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  5 ++--
 6 files changed, 46 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b10662f/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index edaa2af..333b427 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -122,9 +122,7 @@ abstract class Predictor[
    */
   protected def extractLabeledPoints(dataset: DataFrame): RDD[LabeledPoint] = {
     dataset.select($(labelCol), $(featuresCol))
-      .map { case Row(label: Double, features: Vector) =>
-      LabeledPoint(label, features)
-    }
+      .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
   }
 }
 
@@ -171,7 +169,10 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     if ($(predictionCol).nonEmpty) {
-      dataset.withColumn($(predictionCol), callUDF(predict _, DoubleType, col($(featuresCol))))
+      val predictUDF = udf { (features: Any) =>
+        predict(features.asInstanceOf[FeaturesType])
+      }
+      dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
     } else {
       this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
         " since no output columns were set.")

http://git-wip-us.apache.org/repos/asf/spark/blob/0b10662f/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 14c285d..85c097b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -102,15 +102,20 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
     var outputData = dataset
     var numColsOutput = 0
     if (getRawPredictionCol != "") {
-      outputData = outputData.withColumn(getRawPredictionCol,
-        callUDF(predictRaw _, new VectorUDT, col(getFeaturesCol)))
+      val predictRawUDF = udf { (features: Any) =>
+        predictRaw(features.asInstanceOf[FeaturesType])
+      }
+      outputData = outputData.withColumn(getRawPredictionCol, predictRawUDF(col(getFeaturesCol)))
       numColsOutput += 1
     }
     if (getPredictionCol != "") {
       val predUDF = if (getRawPredictionCol != "") {
-        callUDF(raw2prediction _, DoubleType, col(getRawPredictionCol))
+        udf(raw2prediction _).apply(col(getRawPredictionCol))
       } else {
-        callUDF(predict _, DoubleType, col(getFeaturesCol))
+        val predictUDF = udf { (features: Any) =>
+          predict(features.asInstanceOf[FeaturesType])
+        }
+        predictUDF(col(getFeaturesCol))
       }
       outputData = outputData.withColumn(getPredictionCol, predUDF)
       numColsOutput += 1
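
One detail in the hunk above: `udf(raw2prediction _).apply(col(...))` writes out `.apply` instead of `udf(raw2prediction _)(col(...))`, presumably because `udf` carries an implicit `TypeTag` parameter list and a bare second argument list would be taken for those implicits. A sketch of the distinction, using a hypothetical `Double => Double` function `raw2label`:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical stand-in for raw2prediction.
def raw2label(raw: Double): Double = if (raw > 0.0) 1.0 else 0.0

// udf(raw2label _)(col("raw")) would try to feed the Column into udf's
// implicit TypeTag parameter list; .apply keeps the two applications apart.
val predCol = udf(raw2label _).apply(col("raw"))
```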

http://git-wip-us.apache.org/repos/asf/spark/blob/0b10662f/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index b657882..ea757c5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -88,9 +88,9 @@ final class OneVsRestModel private[ml] (
 
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val init: () => Map[Int, Double] = () => {Map()}
+    val initUDF = udf { () => Map[Int, Double]() }
     val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false)
-    val newDataset = dataset.withColumn(accColName, callUDF(init, mapType))
+    val newDataset = dataset.withColumn(accColName, initUDF())
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
@@ -106,13 +106,12 @@ final class OneVsRestModel private[ml] (
 
         // add temporary column to store intermediate scores and update
         val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val update: (Map[Int, Double], Vector) => Map[Int, Double] =
-          (predictions: Map[Int, Double], prediction: Vector) => {
-            predictions + ((index, prediction(1)))
-          }
-        val updateUdf = callUDF(update, mapType, col(accColName), col(rawPredictionCol))
+        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
+          predictions + ((index, prediction(1)))
+        }
         val transformedDataset = model.transform(df).select(columns : _*)
-        val updatedDataset = transformedDataset.withColumn(tmpColName, updateUdf)
+        val updatedDataset = transformedDataset
+          .withColumn(tmpColName, updateUDF(col(accColName), col(rawPredictionCol)))
         val newColumns = origCols ++ List(col(tmpColName))
 
         // switch out the intermediate column with the accumulator column
@@ -124,13 +123,13 @@ final class OneVsRestModel private[ml] (
     }
 
     // output the index of the classifier with highest confidence as prediction
-    val label: Map[Int, Double] => Double = (predictions: Map[Int, Double]) => {
+    val labelUDF = udf { (predictions: Map[Int, Double]) =>
       predictions.maxBy(_._2)._1.toDouble
     }
 
     // output label and label metadata as prediction
-    val labelUdf = callUDF(label, DoubleType, col(accColName))
-    aggregatedDataset.withColumn($(predictionCol), labelUdf.as($(predictionCol), labelMetadata))
+    aggregatedDataset
+      .withColumn($(predictionCol), labelUDF(col(accColName)).as($(predictionCol), labelMetadata))
       .drop(accColName)
   }
 
@@ -185,17 +184,15 @@ final class OneVsRest(override val uid: String)
 
     // create k columns, one for each binary classifier.
     val models = Range(0, numClasses).par.map { index =>
-
-      val label: Double => Double = (label: Double) => {
+      val labelUDF = udf { (label: Double) =>
         if (label.toInt == index) 1.0 else 0.0
       }
 
       // generate new label metadata for the binary problem.
       // TODO: use when ... otherwise after SPARK-7321 is merged
-      val labelUDF = callUDF(label, DoubleType, col($(labelCol)))
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
-      val labelUDFWithNewMeta = labelUDF.as(labelColName, newLabelMeta)
+      val labelUDFWithNewMeta = labelUDF(col($(labelCol))).as(labelColName, newLabelMeta)
       val trainingDataset = multiclassLabeled.withColumn(labelColName, labelUDFWithNewMeta)
       val classifier = getClassifier
       classifier.fit(trainingDataset, classifier.labelCol -> labelColName)
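
The OneVsRest changes keep the same two-phase logic as before: each binary model appends its `(classIndex, confidence)` pair to the accumulator map, and the final label is the key with the highest confidence. A plain-Scala sketch of that reduction, with made-up scores:

```scala
// One entry per binary classifier: class index -> confidence for that class.
val predictions = Map(0 -> 0.1, 1 -> 0.7, 2 -> 0.2)

// Same argmax as labelUDF above: pick the class with the highest confidence.
val label = predictions.maxBy(_._2)._1.toDouble  // 1.0
```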

http://git-wip-us.apache.org/repos/asf/spark/blob/0b10662f/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index 330ae29..38e8323 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -98,26 +98,34 @@ private[spark] abstract class ProbabilisticClassificationModel[
     var outputData = dataset
     var numColsOutput = 0
     if ($(rawPredictionCol).nonEmpty) {
-      outputData = outputData.withColumn(getRawPredictionCol,
-        callUDF(predictRaw _, new VectorUDT, col(getFeaturesCol)))
+      val predictRawUDF = udf { (features: Any) =>
+        predictRaw(features.asInstanceOf[FeaturesType])
+      }
+      outputData = outputData.withColumn(getRawPredictionCol, predictRawUDF(col(getFeaturesCol)))
       numColsOutput += 1
     }
     if ($(probabilityCol).nonEmpty) {
       val probUDF = if ($(rawPredictionCol).nonEmpty) {
-        callUDF(raw2probability _, new VectorUDT, col($(rawPredictionCol)))
+        udf(raw2probability _).apply(col($(rawPredictionCol)))
       } else {
-        callUDF(predictProbability _, new VectorUDT, col($(featuresCol)))
+        val probabilityUDF = udf { (features: Any) =>
+          predictProbability(features.asInstanceOf[FeaturesType])
+        }
+        probabilityUDF(col($(featuresCol)))
       }
       outputData = outputData.withColumn($(probabilityCol), probUDF)
       numColsOutput += 1
     }
     if ($(predictionCol).nonEmpty) {
       val predUDF = if ($(rawPredictionCol).nonEmpty) {
-        callUDF(raw2prediction _, DoubleType, col($(rawPredictionCol)))
+        udf(raw2prediction _).apply(col($(rawPredictionCol)))
       } else if ($(probabilityCol).nonEmpty) {
-        callUDF(probability2prediction _, DoubleType, col($(probabilityCol)))
+        udf(probability2prediction _).apply(col($(probabilityCol)))
       } else {
-        callUDF(predict _, DoubleType, col($(featuresCol)))
+        val predictUDF = udf { (features: Any) =>
+          predict(features.asInstanceOf[FeaturesType])
+        }
+        predictUDF(col($(featuresCol)))
       }
       outputData = outputData.withColumn($(predictionCol), predUDF)
       numColsOutput += 1

http://git-wip-us.apache.org/repos/asf/spark/blob/0b10662f/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index f4854a5..c73bdcc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.functions.callUDF
+import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.collection.OpenHashSet
 
@@ -339,7 +339,8 @@ class VectorIndexerModel private[ml] (
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val newField = prepOutputField(dataset.schema)
-    val newCol = callUDF(transformFunc, new VectorUDT, dataset($(inputCol)))
+    val transformUDF = udf { (vector: Vector) => transformFunc(vector) }
+    val newCol = transformUDF(dataset($(inputCol)))
     dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata))
   }
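
Unlike the generic classifiers earlier in this patch, `Vector` is a concrete type with its own UDT (this is what the "used vector type tag" commit above refers to), so the closure can be fully typed and no `Any` cast is needed. A minimal sketch of the same idea:

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.udf

// Vector carries a SQL user-defined type, so a TypeTag-backed udf works directly.
val firstElement = udf { (v: Vector) => v(0) }
```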
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b10662f/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 73bc6c9..22c54e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -137,13 +137,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("SPARK-7158 collect and take return different results") {
     import java.util.UUID
-    import org.apache.spark.sql.types._
 
     val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
    // we expect the id to be materialized only once
-    def id: () => String = () => { UUID.randomUUID().toString() }
+    val idUdf = udf(() => UUID.randomUUID().toString)
 
-    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    val dfWithId = df.withColumn("id", idUdf())
     // Make a new DataFrame (actually the same reference to the old one)
     val cached = dfWithId.cache()
     // Trigger the cache
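
The rewritten test exercises a zero-argument, non-deterministic UDF; caching is what pins the generated ids down so that later actions agree. A sketch of the pattern, assuming a DataFrame `df`:

```scala
import java.util.UUID
import org.apache.spark.sql.functions.udf

// Zero-argument UDF producing a fresh UUID per row.
val idUdf = udf(() => UUID.randomUUID().toString)

val dfWithId = df.withColumn("id", idUdf())
dfWithId.cache().count()  // materialize the random ids exactly once

// After caching, collect() and take(n) should observe the same ids.
val collected = dfWithId.collect().map(_.getString(1))
val taken = dfWithId.take(3).map(_.getString(1))
```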

