Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/04 10:19:21 UTC

[GitHub] [spark] zhengruifeng opened a new pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

zhengruifeng opened a new pull request #31472:
URL: https://github.com/apache/spark/pull/31472


   ### What changes were proposed in this pull request?
   1. Clear `predictionCol` and `probabilityCol` on the temporary sub-models and use a temporary raw prediction column, to avoid potential column conflicts;
   2. Use an array instead of a map for the accumulator column, to keep in line with the Python side;
   3. Simplify `transform`.
   
   ### Why are the changes needed?
   If the input dataset already contains a column whose name matches `predictionCol`, `probabilityCol`, or `rawPredictionCol`, `transform` fails.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added a test case to `OneVsRestSuite`.
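
The second change (accumulating per-model scores in an array rather than a map keyed by model index) can be illustrated without Spark. The following is a minimal plain-Scala sketch under stated assumptions, not the code in this PR: `OvrAccumulatorSketch`, `BinaryScorer`, and `predict` are hypothetical names, and each scorer stands in for one fitted binary model returning its positive-class raw score.

```scala
object OvrAccumulatorSketch {
  // Hypothetical stand-in for a fitted binary model:
  // maps a feature array to the raw score of the positive class.
  type BinaryScorer = Array[Double] => Double

  // Fold over the models in order, appending each score to an array accumulator.
  // The position in the array is the class index, so no explicit index is needed.
  def predict(scorers: Seq[BinaryScorer], features: Array[Double]): (Array[Double], Int) = {
    val scores = scorers.foldLeft(Array.emptyDoubleArray) { (acc, scorer) =>
      acc :+ scorer(features)
    }
    // Predicted label = index of the largest score (assumes at least one scorer).
    val label = scores.indices.maxBy(i => scores(i))
    (scores, label)
  }

  def main(args: Array[String]): Unit = {
    // Toy scorers, invented purely for illustration.
    val scorers: Seq[BinaryScorer] = Seq(
      f => f(0) - f(1),
      f => f(1) - f(0),
      f => -f(0) - f(1)
    )
    val (scores, label) = predict(scorers, Array(1.0, 3.0))
    println(s"scores=${scores.mkString(", ")} label=$label")
  }
}
```

Because the fold visits the models in order, the array position already encodes the class index, which is why the `zipWithIndex` and the `Map[Int, Double]` accumulator become unnecessary.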


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773986621


   **[Test build #134918 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134918/testReport)** for PR 31472 at commit [`02725b0`](https://github.com/apache/spark/commit/02725b038cb1cad57d4a23ec541849455b9f16f1).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA removed a comment on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773845767


   **[Test build #134918 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134918/testReport)** for PR 31472 at commit [`02725b0`](https://github.com/apache/spark/commit/02725b038cb1cad57d4a23ec541849455b9f16f1).




[GitHub] [spark] AmplabJenkins commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773287510








[GitHub] [spark] zhengruifeng commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570108063



##########
File path: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
##########
@@ -223,6 +223,13 @@ class OneVsRestSuite extends MLTest with DefaultReadWriteTest {
     assert(oldCols === newCols)
   }
 
+  test("SPARK-34356: OneVsRestModel.transform should avoid potential column conflict") {

Review comment:
       This test fails on master and (maybe) on all versions of OVR,
   but I think fixing it in master may be enough.
   
   ![image](https://user-images.githubusercontent.com/7322292/106879331-f78b2e80-6715-11eb-8ffe-1af4c2a08120.png)
   

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)

Review comment:
       OK, I will leave similar cases unchanged in the future.

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       No, `tmpModel` is just a temporary model.

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")
+        tmpModel match {
+          case m: ProbabilisticClassificationModel[_, _] => m.setProbabilityCol("")
+          case _ =>

Review comment:
       ok

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       `val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]`







[GitHub] [spark] srowen commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570224680



##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)

Review comment:
       It's no big deal but we don't need to change the if block; we generally always use the braces.

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")
+        tmpModel match {
+          case m: ProbabilisticClassificationModel[_, _] => m.setProbabilityCol("")
+          case _ =>

Review comment:
       Should this case be silently ignored? If it's always a ProbabilisticClassificationModel, then just cast?

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       Does this need to be reset to its original value somewhere? I probably don't know the code well enough






[GitHub] [spark] zhengruifeng commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-774581575


   thanks @srowen for reviewing!




[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773845767


   **[Test build #134918 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134918/testReport)** for PR 31472 at commit [`02725b0`](https://github.com/apache/spark/commit/02725b038cb1cad57d4a23ec541849455b9f16f1).





[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773877317


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39501/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773239575


   **[Test build #134869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134869/testReport)** for PR 31472 at commit [`8a47b6e`](https://github.com/apache/spark/commit/8a47b6ea2a4e9254f8f9a7e4e8039e95cbb6d578).




[GitHub] [spark] zhengruifeng commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773203630


   ```
   scala> val df = spark.read.format("libsvm").load("/d0/Dev/Opensource/spark/data/mllib/sample_multiclass_classification_data.txt").withColumn("probability", lit(0.0))
   21/02/04 18:06:36 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
   df: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 1 more field]
   
   scala> 
   
   scala> val classifier = new LogisticRegression().setMaxIter(1).setTol(1E-6).setFitIntercept(true)
   classifier: org.apache.spark.ml.classification.LogisticRegression = logreg_5900509aa825
   
   scala> val ovr = new OneVsRest().setClassifier(classifier)
   ovr: org.apache.spark.ml.classification.OneVsRest = oneVsRest_dd2b3e9da4e3
   
   scala> val ovrm = ovr.fit(df)
   ovrm: org.apache.spark.ml.classification.OneVsRestModel = OneVsRestModel: uid=oneVsRest_dd2b3e9da4e3, classifier=logreg_5900509aa825, numClasses=3, numFeatures=4
   
   scala> ovrm.transform(df)
   java.lang.IllegalArgumentException: requirement failed: Column probability already exists.
     at scala.Predef$.require(Predef.scala:281)
     at org.apache.spark.ml.util.SchemaUtils$.appendColumn(SchemaUtils.scala:106)
     at org.apache.spark.ml.util.SchemaUtils$.appendColumn(SchemaUtils.scala:96)
     at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema(ProbabilisticClassifier.scala:38)
     at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema$(ProbabilisticClassifier.scala:33)
     at org.apache.spark.ml.classification.LogisticRegressionModel.org$apache$spark$ml$classification$LogisticRegressionParams$$super$validateAndTransformSchema(LogisticRegression.scala:917)
     at org.apache.spark.ml.classification.LogisticRegressionParams.validateAndTransformSchema(LogisticRegression.scala:268)
     at org.apache.spark.ml.classification.LogisticRegressionParams.validateAndTransformSchema$(LogisticRegression.scala:255)
     at org.apache.spark.ml.classification.LogisticRegressionModel.validateAndTransformSchema(LogisticRegression.scala:917)
     at org.apache.spark.ml.PredictionModel.transformSchema(Predictor.scala:222)
     at org.apache.spark.ml.classification.ClassificationModel.transformSchema(Classifier.scala:182)
     at org.apache.spark.ml.classification.ProbabilisticClassificationModel.transformSchema(ProbabilisticClassifier.scala:88)
     at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
     at org.apache.spark.ml.classification.ProbabilisticClassificationModel.transform(ProbabilisticClassifier.scala:107)
     at org.apache.spark.ml.classification.OneVsRestModel.$anonfun$transform$4(OneVsRest.scala:215)
     at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
     at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
     at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:198)
     at org.apache.spark.ml.classification.OneVsRestModel.transform(OneVsRest.scala:203)
     ... 49 elided
   
   scala> 
   
   ```
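
   For context, the exception in the transcript above is raised by a schema check that refuses to append an output column whose name already exists in the input. The following is a minimal plain-Scala sketch of that kind of check, assuming a schema is just a list of column names. `ColumnConflictSketch`, `Schema`, and this `appendColumn` are simplified, hypothetical stand-ins and not Spark's `SchemaUtils`. The early return for an empty name is likewise an assumption about why clearing an output column name avoids the conflict.

   ```scala
   object ColumnConflictSketch {
     // Hypothetical stand-in: a schema modelled as an ordered list of column names.
     type Schema = Vector[String]

     // Appends an output column, refusing to overwrite an existing one.
     // An empty name is treated as "this output is disabled", so nothing is appended.
     def appendColumn(schema: Schema, colName: String): Schema = {
       if (colName.isEmpty) return schema
       require(!schema.contains(colName), s"Column $colName already exists.")
       schema :+ colName
     }

     def main(args: Array[String]): Unit = {
       val input: Schema = Vector("label", "features", "probability")
       // The default output name collides with the existing input column and throws.
       val failure = scala.util.Try(appendColumn(input, "probability"))
       println(failure.failed.get.getMessage)
       // A cleared output name is skipped, so the schema is returned unchanged.
       println(appendColumn(input, ""))
     }
   }
   ```

   In this sketch the first call fails the same way as `ovrm.transform(df)` does above, while the second call succeeds because no column is appended.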


[GitHub] [spark] zhengruifeng commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570108063



##########
File path: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
##########
@@ -223,6 +223,13 @@ class OneVsRestSuite extends MLTest with DefaultReadWriteTest {
     assert(oldCols === newCols)
   }
 
+  test("SPARK-SPARK-34356: OneVsRestModel.transform should avoid potential column conflict") {

Review comment:
       This test fails on master and (maybe) on all versions of OVR,
   but I think fixing it only in master may be enough.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773897290


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39501/
   


[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773239575


   **[Test build #134869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134869/testReport)** for PR 31472 at commit [`8a47b6e`](https://github.com/apache/spark/commit/8a47b6ea2a4e9254f8f9a7e4e8039e95cbb6d578).


[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773274086


   **[Test build #134869 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134869/testReport)** for PR 31472 at commit [`8a47b6e`](https://github.com/apache/spark/commit/8a47b6ea2a4e9254f8f9a7e4e8039e95cbb6d578).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] SparkQA removed a comment on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773239575


   **[Test build #134869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/134869/testReport)** for PR 31472 at commit [`8a47b6e`](https://github.com/apache/spark/commit/8a47b6ea2a4e9254f8f9a7e4e8039e95cbb6d578).


[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773272661


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39456/
   


[GitHub] [spark] zhengruifeng edited a comment on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng edited a comment on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773203630


   Reproduced in 3.0.1 and master:
   ```
   scala> val df = spark.read.format("libsvm").load("/d0/Dev/Opensource/spark/data/mllib/sample_multiclass_classification_data.txt").withColumn("probability", lit(0.0))
   21/02/04 18:06:36 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
   df: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 1 more field]
   
   scala> 
   
   scala> val classifier = new LogisticRegression().setMaxIter(1).setTol(1E-6).setFitIntercept(true)
   classifier: org.apache.spark.ml.classification.LogisticRegression = logreg_5900509aa825
   
   scala> val ovr = new OneVsRest().setClassifier(classifier)
   ovr: org.apache.spark.ml.classification.OneVsRest = oneVsRest_dd2b3e9da4e3
   
   scala> val ovrm = ovr.fit(df)
   ovrm: org.apache.spark.ml.classification.OneVsRestModel = OneVsRestModel: uid=oneVsRest_dd2b3e9da4e3, classifier=logreg_5900509aa825, numClasses=3, numFeatures=4
   
   scala> ovrm.transform(df)
   java.lang.IllegalArgumentException: requirement failed: Column probability already exists.
     at scala.Predef$.require(Predef.scala:281)
     at org.apache.spark.ml.util.SchemaUtils$.appendColumn(SchemaUtils.scala:106)
     at org.apache.spark.ml.util.SchemaUtils$.appendColumn(SchemaUtils.scala:96)
     at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema(ProbabilisticClassifier.scala:38)
     at org.apache.spark.ml.classification.ProbabilisticClassifierParams.validateAndTransformSchema$(ProbabilisticClassifier.scala:33)
     at org.apache.spark.ml.classification.LogisticRegressionModel.org$apache$spark$ml$classification$LogisticRegressionParams$$super$validateAndTransformSchema(LogisticRegression.scala:917)
     at org.apache.spark.ml.classification.LogisticRegressionParams.validateAndTransformSchema(LogisticRegression.scala:268)
     at org.apache.spark.ml.classification.LogisticRegressionParams.validateAndTransformSchema$(LogisticRegression.scala:255)
     at org.apache.spark.ml.classification.LogisticRegressionModel.validateAndTransformSchema(LogisticRegression.scala:917)
     at org.apache.spark.ml.PredictionModel.transformSchema(Predictor.scala:222)
     at org.apache.spark.ml.classification.ClassificationModel.transformSchema(Classifier.scala:182)
     at org.apache.spark.ml.classification.ProbabilisticClassificationModel.transformSchema(ProbabilisticClassifier.scala:88)
     at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:71)
     at org.apache.spark.ml.classification.ProbabilisticClassificationModel.transform(ProbabilisticClassifier.scala:107)
     at org.apache.spark.ml.classification.OneVsRestModel.$anonfun$transform$4(OneVsRest.scala:215)
     at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
     at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
     at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:198)
     at org.apache.spark.ml.classification.OneVsRestModel.transform(OneVsRest.scala:203)
     ... 49 elided
   
   scala> 
   
   ```


[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773863219


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39501/
   


[GitHub] [spark] zhengruifeng commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570108063



##########
File path: mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
##########
@@ -223,6 +223,13 @@ class OneVsRestSuite extends MLTest with DefaultReadWriteTest {
     assert(oldCols === newCols)
   }
 
+  test("SPARK-SPARK-34356: OneVsRestModel.transform should avoid potential column conflict") {

Review comment:
       This test fails on master and (maybe) on all versions of OVR,
   but I think fixing it only in master may be enough.
   
   ![image](https://user-images.githubusercontent.com/7322292/106879331-f78b2e80-6715-11eb-8ffe-1af4c2a08120.png)
   




[GitHub] [spark] zhengruifeng commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570744818



##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")
+        tmpModel match {
+          case m: ProbabilisticClassificationModel[_, _] => m.setProbabilityCol("")
+          case _ =>

Review comment:
       ok
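
   The array-based accumulation in the diff above can be sketched without Spark. This is only an illustration under stated assumptions: `OvrArrayAccumulatorSketch`, `BinaryModel`, `accumulate`, and `predict` are hypothetical names, and each binary model is modelled as a plain function returning raw scores for the negative and positive class.

   ```scala
   object OvrArrayAccumulatorSketch {
     // Hypothetical binary model: maps features to raw scores (negative, positive).
     type BinaryModel = Array[Double] => Array[Double]

     // Fold over the models, appending each positive-class score to the accumulator.
     // Position i in the resulting array holds the score of the i-th model.
     def accumulate(models: Seq[BinaryModel], features: Array[Double]): Array[Double] =
       models.foldLeft(Array.emptyDoubleArray) { (acc, model) => acc :+ model(features)(1) }

     // The predicted label is the index of the largest accumulated score.
     // Assumes at least one model; maxBy throws on an empty collection.
     def predict(models: Seq[BinaryModel], features: Array[Double]): Int = {
       val scores = accumulate(models, features)
       scores.indices.maxBy(i => scores(i))
     }

     def main(args: Array[String]): Unit = {
       val models: Seq[BinaryModel] = Seq(
         f => Array(-0.2 * f(0), 0.2 * f(0)),
         f => Array(-0.9 * f(0), 0.9 * f(0)),
         f => Array(-0.5 * f(0), 0.5 * f(0))
       )
       println(predict(models, Array(1.0)))
     }
   }
   ```

   Because the array position already encodes the model index, the fold does not need `zipWithIndex`, which matches the simplification from `Map[Int, Double]` to an array in the diff.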




[GitHub] [spark] srowen closed pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
srowen closed pull request #31472:
URL: https://github.com/apache/spark/pull/31472


   


[GitHub] [spark] AmplabJenkins commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773897290


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39501/
   


[GitHub] [spark] srowen commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r571086556



##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       Oh, OK, it's a copy of the model, right.






[GitHub] [spark] zhengruifeng commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570744758



##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       No, `tmpModel` is just a temporary model, so nothing needs to be reset.
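
For illustration only (not part of the PR): a minimal sketch of why setters called on a copy should not leak back to the original model. `SketchModel` and `CopySketch` are hypothetical stand-ins and do not use the real Spark ML API; the sketch only assumes that `copy` returns a new instance carrying the current param values.

```scala
// Hypothetical stand-in for a Spark ML model; not the real API.
final class SketchModel(private var predictionCol: String = "prediction") {
  def getPredictionCol: String = predictionCol

  def setPredictionCol(value: String): this.type = {
    predictionCol = value
    this
  }

  // Assumed to behave like a model copy: a new instance with the current param values.
  def copy(): SketchModel = new SketchModel(predictionCol)
}

object CopySketch {
  // Returns (original's column, copy's column) after mutating only the copy.
  def run(): (String, String) = {
    val model = new SketchModel()
    val tmpModel = model.copy()
    tmpModel.setPredictionCol("") // cleared on the temporary copy only
    (model.getPredictionCol, tmpModel.getPredictionCol)
  }
}
```

Under these assumptions the original keeps its `predictionCol`, which is why no reset step would be needed after the fold.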






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773993043


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/134918/
   




[GitHub] [spark] SparkQA commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773255807


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39456/
   




[GitHub] [spark] srowen commented on a change in pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #31472:
URL: https://github.com/apache/spark/pull/31472#discussion_r570224680



##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)

Review comment:
       It's no big deal, but we don't need to change the `if` block; we generally use braces.

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")
+        tmpModel match {
+          case m: ProbabilisticClassificationModel[_, _] => m.setProbabilityCol("")
+          case _ =>

Review comment:
       Should this case be silently ignored? If it's always a `ProbabilisticClassificationModel`, then just cast?
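
For illustration only (not part of the PR): a minimal sketch of the trade-off between matching and casting. All names here (`SketchClassifier`, `SketchProbabilistic`, `SketchLogReg`, `SketchSVC`, `MatchSketch`) are hypothetical stand-ins, not the real Spark ML hierarchy. It assumes some base classifiers expose no probability column at all (a linear SVM model is one such case in Spark ML, as far as I know), in which case an unconditional cast would throw a `ClassCastException` while the match simply skips them.

```scala
// Hypothetical stand-ins for the classifier hierarchy; not the real API.
trait SketchClassifier { def name: String }

trait SketchProbabilistic extends SketchClassifier {
  var probabilityCol: String = "probability"
}

final class SketchLogReg extends SketchProbabilistic { val name = "logreg" }
final class SketchSVC extends SketchClassifier { val name = "svc" }

object MatchSketch {
  // Clears the probability column only when the model has one.
  // Returns true if a column was cleared, false if the model was skipped.
  def clearProbabilityCol(m: SketchClassifier): Boolean = m match {
    case p: SketchProbabilistic =>
      p.probabilityCol = ""
      true
    case _ =>
      false // nothing to clear; a cast here would fail at runtime
  }
}
```

If every supported base classifier were probabilistic, the cast would be the simpler choice; the match only earns its keep when non-probabilistic models are allowed.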

##########
File path: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
##########
@@ -185,71 +185,56 @@ final class OneVsRestModel private[ml] (
       return dataset.toDF
     }
 
-    // determine the input columns: these need to be passed through
-    val origCols = dataset.schema.map(f => col(f.name))
-
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val initUDF = udf { () => Map[Int, Double]() }
-    val newDataset = dataset.withColumn(accColName, initUDF())
+    val newDataset = dataset.withColumn(accColName, lit(Array.emptyDoubleArray))
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = !dataset.isStreaming && dataset.storageLevel == StorageLevel.NONE
-    if (handlePersistence) {
-      newDataset.persist(StorageLevel.MEMORY_AND_DISK)
-    }
+    if (handlePersistence) newDataset.persist(StorageLevel.MEMORY_AND_DISK)
 
     // update the accumulator column with the result of prediction of models
-    val aggregatedDataset = models.zipWithIndex.foldLeft[DataFrame](newDataset) {
-      case (df, (model, index)) =>
+    val aggregatedDataset = models.foldLeft[DataFrame](newDataset) {
+      case (df, model) =>
         // avoid calling directly setter of model
         val tmpModel = model.copy(ParamMap.empty).asInstanceOf[ClassificationModel[_, _]]
         tmpModel.setFeaturesCol($(featuresCol))
 
-        val rawPredictionCol = tmpModel.getRawPredictionCol
-        val columns = origCols ++ List(col(rawPredictionCol), col(accColName))
-
-        // add temporary column to store intermediate scores and update
-        val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-        val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
-          predictions + ((index, prediction(1)))
+        // use a temporary raw prediction column to avoid column conflict
+        val tmpRawPredName = "mbc$raw" + UUID.randomUUID().toString
+        tmpModel.setRawPredictionCol(tmpRawPredName)
+        tmpModel.setPredictionCol("")

Review comment:
       Does this need to be reset to its original value somewhere? I probably don't know the code well enough.






[GitHub] [spark] AmplabJenkins commented on pull request #31472: [SPARK-34356][ML] OVR transform fix potential column conflict

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31472:
URL: https://github.com/apache/spark/pull/31472#issuecomment-773993043


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/134918/
   

