You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yl...@apache.org on 2016/09/10 07:27:17 UTC

spark git commit: [SPARK-15509][FOLLOW-UP][ML][SPARKR] R MLlib algorithms should support input columns "features" and "label"

Repository: spark
Updated Branches:
  refs/heads/master 1fec3ce4e -> bcdd259c3


[SPARK-15509][FOLLOW-UP][ML][SPARKR] R MLlib algorithms should support input columns "features" and "label"

## What changes were proposed in this pull request?
#13584 resolved the conflict between the features and label columns and the ```RFormula``` default ones when loading libsvm data, but it left some issues that should still be resolved:
1, It's not necessary to check and rename the label column.
```RFormula``` is designed to handle the case where the label column already exists (with the restriction that the existing label column must be of numeric/boolean type), so it's not necessary to change the column name to avoid a conflict. If the label column is not of numeric/boolean type, ```RFormula``` will throw an exception.

2, We should rename the features column to a new name if there is a conflict, but appending a random value is enough since the name is only used internally. We did similar work when implementing ```SQLTransformer```.

3, We should set the correct new features column for the estimators. Take ```GLM``` as an example:
The ```GLM``` estimator should set the features column to the changed one (rFormula.getFeaturesCol) rather than the default "features". Although this makes no difference when training the model, it causes problems when predicting. The following is the prediction result of GLM before this PR:
![image](https://cloud.githubusercontent.com/assets/1962026/18308227/84c3c452-74a8-11e6-9caa-9d6d846cc957.png)
We should drop the internally used features column; otherwise it will appear in the prediction DataFrame, which will confuse users. This behavior is the same as in other scenarios where no column name conflict exists.
After this PR:
![image](https://cloud.githubusercontent.com/assets/1962026/18308240/92082a04-74a8-11e6-9226-801f52b856d9.png)

## How was this patch tested?
Existing unit tests.

Author: Yanbo Liang <yb...@gmail.com>

Closes #14993 from yanboliang/spark-15509.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcdd259c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcdd259c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcdd259c

Branch: refs/heads/master
Commit: bcdd259c371b1dcdb41baf227867d7e2ecb923c6
Parents: 1fec3ce
Author: Yanbo Liang <yb...@gmail.com>
Authored: Sat Sep 10 00:27:10 2016 -0700
Committer: Yanbo Liang <yb...@gmail.com>
Committed: Sat Sep 10 00:27:10 2016 -0700

----------------------------------------------------------------------
 .../ml/r/AFTSurvivalRegressionWrapper.scala     |  1 +
 .../spark/ml/r/GaussianMixtureWrapper.scala     |  1 +
 .../r/GeneralizedLinearRegressionWrapper.scala  |  1 +
 .../spark/ml/r/IsotonicRegressionWrapper.scala  |  1 +
 .../org/apache/spark/ml/r/KMeansWrapper.scala   |  1 +
 .../apache/spark/ml/r/NaiveBayesWrapper.scala   |  1 +
 .../org/apache/spark/ml/r/RWrapperUtils.scala   | 34 +++-----------------
 .../apache/spark/ml/r/RWrapperUtilsSuite.scala  | 16 +++------
 8 files changed, 14 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 67d037e..bd965ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -99,6 +99,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
     val aft = new AFTSurvivalRegression()
       .setCensorCol(censorCol)
       .setFitIntercept(rFormula.hasIntercept)
+      .setFeaturesCol(rFormula.getFeaturesCol)
 
     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, aft))

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
index b654233..b708702 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GaussianMixtureWrapper.scala
@@ -85,6 +85,7 @@ private[r] object GaussianMixtureWrapper extends MLReadable[GaussianMixtureWrapp
       .setK(k)
       .setMaxIter(maxIter)
       .setTol(tol)
+      .setFeaturesCol(rFormula.getFeaturesCol)
 
     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, gm))

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index 3531325..b1bb577 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -89,6 +89,7 @@ private[r] object GeneralizedLinearRegressionWrapper
       .setMaxIter(maxIter)
       .setWeightCol(weightCol)
       .setRegParam(regParam)
+      .setFeaturesCol(rFormula.getFeaturesCol)
     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, glr))
       .fit(data)

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
index 2ed7d7b..4863231 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/IsotonicRegressionWrapper.scala
@@ -75,6 +75,7 @@ private[r] object IsotonicRegressionWrapper
       .setIsotonic(isotonic)
       .setFeatureIndex(featureIndex)
       .setWeightCol(weightCol)
+      .setFeaturesCol(rFormula.getFeaturesCol)
 
     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, isotonicRegression))

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 8616a8c..ea94585 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -86,6 +86,7 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
       .setK(k)
       .setMaxIter(maxIter)
       .setInitMode(initMode)
+      .setFeaturesCol(rFormula.getFeaturesCol)
 
     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, kMeans))

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index f2cb24b..d1a39fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -73,6 +73,7 @@ private[r] object NaiveBayesWrapper extends MLReadable[NaiveBayesWrapper] {
     val naiveBayes = new NaiveBayes()
       .setSmoothing(smoothing)
       .setModelType("bernoulli")
+      .setFeaturesCol(rFormula.getFeaturesCol)
       .setPredictionCol(PREDICTED_LABEL_INDEX_COL)
     val idxToStr = new IndexToString()
       .setInputCol(PREDICTED_LABEL_INDEX_COL)

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
index 6a43599..379007c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrapperUtils.scala
@@ -19,14 +19,15 @@ package org.apache.spark.ml.r
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.RFormula
+import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.sql.Dataset
 
 object RWrapperUtils extends Logging {
 
   /**
    * DataFrame column check.
-   * When loading data, default columns "features" and "label" will be added. And these two names
-   * would conflict with RFormula default feature and label column names.
+   * When loading libsvm data, default columns "features" and "label" will be added.
+   * And "features" would conflict with RFormula default feature column names.
    * Here is to change the column name to avoid "column already exists" error.
    *
    * @param rFormula RFormula instance
@@ -34,38 +35,11 @@ object RWrapperUtils extends Logging {
    * @return Unit
    */
   def checkDataColumns(rFormula: RFormula, data: Dataset[_]): Unit = {
-    if (data.schema.fieldNames.contains(rFormula.getLabelCol)) {
-      val newLabelName = convertToUniqueName(rFormula.getLabelCol, data.schema.fieldNames)
-      logWarning(
-        s"data containing ${rFormula.getLabelCol} column, using new name $newLabelName instead")
-      rFormula.setLabelCol(newLabelName)
-    }
-
     if (data.schema.fieldNames.contains(rFormula.getFeaturesCol)) {
-      val newFeaturesName = convertToUniqueName(rFormula.getFeaturesCol, data.schema.fieldNames)
+      val newFeaturesName = s"${Identifiable.randomUID(rFormula.getFeaturesCol)}"
       logWarning(s"data containing ${rFormula.getFeaturesCol} column, " +
         s"using new name $newFeaturesName instead")
       rFormula.setFeaturesCol(newFeaturesName)
     }
   }
-
-  /**
-   * Convert conflicting name to be an unique name.
-   * Appending a sequence number, like originalName_output1
-   * and incrementing until it is not already there
-   *
-   * @param originalName Original name
-   * @param fieldNames Array of field names in existing schema
-   * @return String
-   */
-  def convertToUniqueName(originalName: String, fieldNames: Array[String]): String = {
-    var counter = 1
-    var newName = originalName + "_output"
-
-    while (fieldNames.contains(newName)) {
-      newName = originalName + "_output" + counter
-      counter += 1
-    }
-    newName
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bcdd259c/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
index ddc24cb..27b0391 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/r/RWrapperUtilsSuite.scala
@@ -35,22 +35,14 @@ class RWrapperUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     // after checking, model build is ok
     RWrapperUtils.checkDataColumns(rFormula, data)
 
-    assert(rFormula.getLabelCol == "label_output")
-    assert(rFormula.getFeaturesCol == "features_output")
+    assert(rFormula.getLabelCol == "label")
+    assert(rFormula.getFeaturesCol.startsWith("features_"))
 
     val model = rFormula.fit(data)
     assert(model.isInstanceOf[RFormulaModel])
 
-    assert(model.getLabelCol == "label_output")
-    assert(model.getFeaturesCol == "features_output")
-  }
-
-  test("generate unique name by appending a sequence number") {
-    val originalName = "label"
-    val fieldNames = Array("label_output", "label_output1", "label_output2")
-    val newName = RWrapperUtils.convertToUniqueName(originalName, fieldNames)
-
-    assert(newName === "label_output3")
+    assert(model.getLabelCol == "label")
+    assert(model.getFeaturesCol.startsWith("features_"))
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org