Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/15 18:37:07 UTC

[GitHub] [spark] srowen commented on a change in pull request #27895: [SPARK-31138][ML] Add ANOVA Selector for continuous features and categorical labels

URL: https://github.com/apache/spark/pull/27895#discussion_r392700075
 
 

 ##########
 File path: mllib/src/main/scala/org/apache/spark/ml/feature/ANOVASelector.scala
 ##########
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.collection.mutable.ArrayBuilder
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml._
+import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.stat.{ANOVATest, SelectionTestResult}
+import org.apache.spark.ml.util._
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
+
+
+/**
+ * ANOVA F-value Classification selector, which selects continuous features to use for predicting a
+ * categorical label.
+ * The selector supports different selection methods: `numTopFeatures`, `percentile`, `fpr`,
+ * `fdr`, `fwe`.
+ *  - `numTopFeatures` chooses a fixed number of top features according to an F-value
+ *    classification test.
+ *  - `percentile` is similar but chooses a fraction of all features instead of a fixed number.
+ *  - `fpr` chooses all features whose p-values are below a threshold, thus controlling the false
+ *    positive rate of selection.
+ *  - `fdr` uses the [Benjamini-Hochberg procedure]
+ *    (https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure)
+ *    to choose all features whose false discovery rate is below a threshold.
+ *  - `fwe` chooses all features whose p-values are below a threshold. The threshold is scaled by
+ *    1/numFeatures, thus controlling the family-wise error rate of selection.
+ * By default, the selection method is `numTopFeatures`, with the default number of top features
+ * set to 50.
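+ *
+ * For example (an illustrative sketch; assumes a DataFrame `df` with
+ * "features" and "label" columns):
+ * {{{
+ *   val selector = new ANOVASelector()
+ *     .setNumTopFeatures(10)
+ *     .setFeaturesCol("features")
+ *     .setLabelCol("label")
+ *     .setOutputCol("selectedFeatures")
+ *   val model = selector.fit(df)
+ *   val result = model.transform(df)
+ * }}}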
+ */
+@Since("3.1.0")
+final class ANOVASelector @Since("3.1.0")(@Since("3.1.0") override val uid: String)
+  extends Estimator[ANOVASelectorModel] with FValueSelectorParams
+    with DefaultParamsWritable {
+
+  @Since("3.1.0")
+  def this() = this(Identifiable.randomUID("ANOVASelector"))
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setNumTopFeatures(value: Int): this.type = set(numTopFeatures, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setPercentile(value: Double): this.type = set(percentile, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setFpr(value: Double): this.type = set(fpr, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setFdr(value: Double): this.type = set(fdr, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setFwe(value: Double): this.type = set(fwe, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setSelectorType(value: String): this.type = set(selectorType, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setLabelCol(value: String): this.type = set(labelCol, value)
+
+  @Since("3.1.0")
+  override def fit(dataset: Dataset[_]): ANOVASelectorModel = {
+    transformSchema(dataset.schema, logging = true)
+
+    val testResult = ANOVATest.testClassification(dataset, getFeaturesCol, getLabelCol)
+      .zipWithIndex
+    val features = $(selectorType) match {
+      case "numTopFeatures" =>
+        testResult
+          .sortBy { case (res, _) => res.pValue }
+          .take(getNumTopFeatures)
+      case "percentile" =>
+        testResult
+          .sortBy { case (res, _) => res.pValue }
+          .take((testResult.length * getPercentile).toInt)
+      case "fpr" =>
+        testResult
+          .filter { case (res, _) => res.pValue < getFpr }
+      case "fdr" =>
+        // This uses the Benjamini-Hochberg procedure.
+        // https://en.wikipedia.org/wiki/False_discovery_rate#Benjamini.E2.80.93Hochberg_procedure
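+        // Illustrative numbers (not from this patch): with sorted p-values
+        // [0.001, 0.02, 0.03, 0.2] and fdr = 0.05, the per-rank thresholds
+        // are 0.0125, 0.025, 0.0375 and 0.05; the largest rank whose
+        // p-value is at or below its threshold is 3 (0.03 <= 0.0375), so
+        // the three smallest p-values are selected.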
+        val tempRes = testResult
+          .sortBy { case (res, _) => res.pValue }
+        val selected = tempRes
+          .zipWithIndex
+          .filter { case ((res, _), index) =>
+            res.pValue <= getFdr * (index + 1) / testResult.length }
+        if (selected.isEmpty) {
+          Array.empty[(SelectionTestResult, Int)]
+        } else {
+          val maxIndex = selected.map(_._2).max
+          tempRes.take(maxIndex + 1)
+        }
+      case "fwe" =>
+        testResult
+          .filter { case (res, _) => res.pValue < getFwe / testResult.length }
+      case errorType =>
+        throw new IllegalStateException(s"Unknown Selector Type: $errorType")
+    }
+    val indices = features.map { case (_, index) => index }
+    copyValues(new ANOVASelectorModel(uid, indices.sorted)
+      .setParent(this))
+  }
+
+  @Since("3.1.0")
+  override def transformSchema(schema: StructType): StructType = {
+    SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
+    SchemaUtils.checkNumericType(schema, $(labelCol))
+    SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT)
+  }
+
+  @Since("3.1.0")
+  override def copy(extra: ParamMap): ANOVASelector = defaultCopy(extra)
+}
+
+@Since("3.1.0")
+object ANOVASelector extends DefaultParamsReadable[ANOVASelector] {
+
+  @Since("3.1.0")
+  override def load(path: String): ANOVASelector = super.load(path)
+}
+
+/**
+ * Model fitted by [[ANOVASelector]].
+ */
+@Since("3.1.0")
+class ANOVASelectorModel private[ml](
+    @Since("3.1.0") override val uid: String,
+    @Since("3.1.0") val selectedFeatures: Array[Int])
+  extends Model[ANOVASelectorModel] with FValueSelectorParams with MLWritable {
+
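+  // Selected indices must be strictly increasing, e.g. Array(2, 5, 9);
+  // an out-of-order array such as Array(5, 2, 9) fails the check below.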
+  if (selectedFeatures.length >= 2) {
+    require(selectedFeatures.sliding(2).forall(l => l(0) < l(1)),
+      "Index should be strictly increasing.")
+  }
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+  /** @group setParam */
+  @Since("3.1.0")
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
+  @Since("3.1.0")
+  override def transform(dataset: Dataset[_]): DataFrame = {
+    val outputSchema = transformSchema(dataset.schema, logging = true)
+
+    val newSize = selectedFeatures.length
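+    // For example, with selectedFeatures = Array(1, 3), a dense input
+    // [a, b, c, d] becomes [b, d]; a sparse input keeps only the entries
+    // at original indices 1 and 3, re-indexed into the compressed vector.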
+    val func = { vector: Vector =>
+      vector match {
+        case SparseVector(_, indices, values) =>
+          val (newIndices, newValues) = compressSparse(indices, values)
+          Vectors.sparse(newSize, newIndices, newValues)
+        case DenseVector(values) =>
+          Vectors.dense(selectedFeatures.map(values))
+        case other =>
+          throw new UnsupportedOperationException(
+            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+      }
+    }
+
+    val transformer = udf(func)
+    dataset.withColumn($(outputCol), transformer(col($(featuresCol))),
+      outputSchema($(outputCol)).metadata)
+  }
+
+  @Since("3.1.0")
+  override def transformSchema(schema: StructType): StructType = {
+    SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
+    val newField = prepOutputField(schema)
+    SchemaUtils.appendColumn(schema, newField)
+  }
+
+  /**
+   * Prepare the output column field, including per-feature metadata.
+   */
+  private def prepOutputField(schema: StructType): StructField = {
+    val selector = selectedFeatures.toSet
+    val origAttrGroup = AttributeGroup.fromStructField(schema($(featuresCol)))
+    val featureAttributes: Array[Attribute] = if (origAttrGroup.attributes.nonEmpty) {
+      origAttrGroup.attributes.get.zipWithIndex.filter(x => selector.contains(x._2)).map(_._1)
+    } else {
+      Array.fill[Attribute](selector.size)(NominalAttribute.defaultAttr)
+    }
+    val newAttributeGroup = new AttributeGroup($(outputCol), featureAttributes)
+    newAttributeGroup.toStructField()
+  }
+
+  @Since("3.1.0")
+  override def copy(extra: ParamMap): ANOVASelectorModel = {
+    val copied = new ANOVASelectorModel(uid, selectedFeatures)
+      .setParent(parent)
+    copyValues(copied, extra)
+  }
+
+  @Since("3.1.0")
+  override def write: MLWriter = new ANOVASelectorModel.ANOVASelectorModelWriter(this)
+
+  @Since("3.1.0")
+  override def toString: String = {
+    s"ANOVASelectorModel: uid=$uid, numSelectedFeatures=${selectedFeatures.length}"
+  }
+
+  private[spark] def compressSparse(
+      indices: Array[Int],
+      values: Array[Double]): (Array[Int], Array[Double]) = {
+    val newValues = new ArrayBuilder.ofDouble
+    val newIndices = new ArrayBuilder.ofInt
+    var i = 0
+    var j = 0
+    var indicesIdx = 0
 
 Review comment:
   Nit: Can these just be `val`s inside the while loop? Or else I'm missing why they have to be outside.
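
   For illustration, the kind of thing I mean (a hypothetical sketch; the
   loop body is truncated here, so names beyond those above are made up):

       while (i < indices.length && j < selectedFeatures.length) {
         // a per-iteration cache like this could be a `val` in the body:
         val idx = indices(i)
         val selIdx = selectedFeatures(j)
         if (idx == selIdx) {
           newIndices += j          // index within the compressed vector
           newValues += values(i)
           i += 1
           j += 1
         } else if (idx < selIdx) {
           i += 1
         } else {
           j += 1
         }
       }

   (`i` and `j` clearly have to live outside, since they advance across
   iterations; I mean the per-iteration lookups.)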

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org