You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by db...@apache.org on 2016/08/21 01:52:47 UTC

spark git commit: [SPARK-17090][ML] Make tree aggregation level in linear/logistic regression configurable

Repository: spark
Updated Branches:
  refs/heads/master 9f37d4eac -> 61ef74f22


[SPARK-17090][ML] Make tree aggregation level in linear/logistic regression configurable

## What changes were proposed in this pull request?

Linear/logistic regression use treeAggregate with default depth (always = 2) for collecting coefficient gradient updates to the driver. For high dimensional problems, this can cause OOM error on the driver. This patch makes it configurable to avoid this problem if users' input data has many features. It adds a HasTreeDepth API in `sharedParams.scala`, and extends it to both Linear regression and logistic regression in .ml

Author: hqzizania <hq...@gmail.com>

Closes #14717 from hqzizania/SPARK-17090.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61ef74f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61ef74f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61ef74f2

Branch: refs/heads/master
Commit: 61ef74f2272faa7ce8f2badc7e00039908e3551f
Parents: 9f37d4e
Author: hqzizania <hq...@gmail.com>
Authored: Sat Aug 20 18:52:44 2016 -0700
Committer: DB Tsai <db...@netflix.com>
Committed: Sat Aug 20 18:52:44 2016 -0700

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 24 ++++++++++++++-----
 .../MultinomialLogisticRegression.scala         | 16 +++++++++++--
 .../ml/param/shared/SharedParamsCodeGen.scala   |  4 +++-
 .../spark/ml/param/shared/sharedParams.scala    | 25 ++++++++++++++++----
 .../spark/ml/regression/LinearRegression.scala  | 22 +++++++++++++----
 5 files changed, 74 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61ef74f2/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index ea31c68..757d520 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -48,7 +48,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
   with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
-  with HasStandardization with HasWeightCol with HasThreshold {
+  with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
 
   /**
    * Set threshold in binary classification, in range [0, 1].
@@ -256,6 +256,17 @@ class LogisticRegression @Since("1.2.0") (
   @Since("1.5.0")
   override def getThresholds: Array[Double] = super.getThresholds
 
+  /**
+   * Suggested depth for treeAggregate (>= 2).
+   * If the dimensions of features or the number of partitions are large,
+   * this param could be adjusted to a larger size.
+   * Default is 2.
+   * @group expertSetParam
+   */
+  @Since("2.1.0")
+  def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
+  setDefault(aggregationDepth -> 2)
+
   private var optInitialModel: Option[LogisticRegressionModel] = None
 
   /** @group setParam */
@@ -294,7 +305,8 @@ class LogisticRegression @Since("1.2.0") (
           (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
       instances.treeAggregate(
-        new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
+        new MultivariateOnlineSummarizer, new MultiClassSummarizer
+      )(seqOp, combOp, $(aggregationDepth))
     }
 
     val histogram = labelSummarizer.histogram
@@ -358,7 +370,7 @@ class LogisticRegression @Since("1.2.0") (
 
         val bcFeaturesStd = instances.context.broadcast(featuresStd)
         val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
-          $(standardization), bcFeaturesStd, regParamL2, multinomial = false)
+          $(standardization), bcFeaturesStd, regParamL2, multinomial = false, $(aggregationDepth))
 
         val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
           new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -1331,8 +1343,8 @@ private class LogisticCostFun(
     standardization: Boolean,
     bcFeaturesStd: Broadcast[Array[Double]],
     regParamL2: Double,
-    multinomial: Boolean) extends DiffFunction[BDV[Double]] {
-
+    multinomial: Boolean,
+    aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
 
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
@@ -1347,7 +1359,7 @@ private class LogisticCostFun(
       instances.treeAggregate(
         new LogisticAggregator(bcCoeffs, bcFeaturesStd, numClasses, fitIntercept,
           multinomial)
-      )(seqOp, combOp)
+      )(seqOp, combOp, aggregationDepth)
     }
 
     val totalGradientArray = logisticAggregator.gradient.toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/61ef74f2/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
index dfadd68..f85ac76 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultinomialLogisticRegression.scala
@@ -44,7 +44,8 @@ import org.apache.spark.storage.StorageLevel
  */
 private[classification] trait MultinomialLogisticRegressionParams
   extends ProbabilisticClassifierParams with HasRegParam with HasElasticNetParam with HasMaxIter
-    with HasFitIntercept with HasTol with HasStandardization with HasWeightCol {
+    with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
+    with HasAggregationDepth {
 
   /**
    * Set thresholds in multiclass (or binary) classification to adjust the probability of
@@ -163,6 +164,17 @@ class MultinomialLogisticRegression @Since("2.1.0") (
   @Since("2.1.0")
   override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value)
 
+  /**
+   * Suggested depth for treeAggregate (>= 2).
+   * If the dimensions of features or the number of partitions are large,
+   * this param could be adjusted to a larger size.
+   * Default is 2.
+   * @group expertSetParam
+   */
+  @Since("2.1.0")
+  def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
+  setDefault(aggregationDepth -> 2)
+
   override protected[spark] def train(dataset: Dataset[_]): MultinomialLogisticRegressionModel = {
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
     val instances: RDD[Instance] =
@@ -245,7 +257,7 @@ class MultinomialLogisticRegression @Since("2.1.0") (
 
         val bcFeaturesStd = instances.context.broadcast(featuresStd)
         val costFun = new LogisticCostFun(instances, numClasses, $(fitIntercept),
-          $(standardization), bcFeaturesStd, regParamL2, multinomial = true)
+          $(standardization), bcFeaturesStd, regParamL2, multinomial = true, $(aggregationDepth))
 
         val optimizer = if ($(elasticNetParam) == 0.0 || $(regParam) == 0.0) {
           new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))

http://git-wip-us.apache.org/repos/asf/spark/blob/61ef74f2/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 4ab0c16..0f48a16 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -78,7 +78,9 @@ private[shared] object SharedParamsCodeGen {
       ParamDesc[String]("weightCol", "weight column name. If this is not set or empty, we treat " +
         "all instance weights as 1.0"),
       ParamDesc[String]("solver", "the solver algorithm for optimization. If this is not set or " +
-        "empty, default value is 'auto'", Some("\"auto\"")))
+        "empty, default value is 'auto'", Some("\"auto\"")),
+      ParamDesc[Int]("aggregationDepth", "suggested depth for treeAggregate (>= 2)", Some("2"),
+        isValid = "ParamValidators.gtEq(2)"))
 
     val code = genSharedParams(params)
     val file = "src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala"

http://git-wip-us.apache.org/repos/asf/spark/blob/61ef74f2/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 64d6af2..6803772 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -334,10 +334,10 @@ private[ml] trait HasElasticNetParam extends Params {
 private[ml] trait HasTol extends Params {
 
   /**
-   * Param for the convergence tolerance for iterative algorithms.
+   * Param for the convergence tolerance for iterative algorithms (>= 0).
    * @group param
    */
-  final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms")
+  final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms (>= 0)", ParamValidators.gtEq(0))
 
   /** @group getParam */
   final def getTol: Double = $(tol)
@@ -349,10 +349,10 @@ private[ml] trait HasTol extends Params {
 private[ml] trait HasStepSize extends Params {
 
   /**
-   * Param for Step size to be used for each iteration of optimization.
+   * Param for Step size to be used for each iteration of optimization (> 0).
    * @group param
    */
-  final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization")
+  final val stepSize: DoubleParam = new DoubleParam(this, "stepSize", "Step size to be used for each iteration of optimization (> 0)", ParamValidators.gt(0))
 
   /** @group getParam */
   final def getStepSize: Double = $(stepSize)
@@ -389,4 +389,21 @@ private[ml] trait HasSolver extends Params {
   /** @group getParam */
   final def getSolver: String = $(solver)
 }
+
+/**
+ * Trait for shared param aggregationDepth (default: 2).
+ */
+private[ml] trait HasAggregationDepth extends Params {
+
+  /**
+   * Param for suggested depth for treeAggregate (>= 2).
+   * @group param
+   */
+  final val aggregationDepth: IntParam = new IntParam(this, "aggregationDepth", "suggested depth for treeAggregate (>= 2)", ParamValidators.gtEq(2))
+
+  setDefault(aggregationDepth, 2)
+
+  /** @group getParam */
+  final def getAggregationDepth: Int = $(aggregationDepth)
+}
 // scalastyle:on

http://git-wip-us.apache.org/repos/asf/spark/blob/61ef74f2/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 76be420..b1bb9b9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -53,6 +53,7 @@ import org.apache.spark.storage.StorageLevel
 private[regression] trait LinearRegressionParams extends PredictorParams
     with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
     with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
+    with HasAggregationDepth
 
 /**
  * Linear regression.
@@ -172,6 +173,17 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   def setSolver(value: String): this.type = set(solver, value)
   setDefault(solver -> "auto")
 
+  /**
+   * Suggested depth for treeAggregate (>= 2).
+   * If the dimensions of features or the number of partitions are large,
+   * this param could be adjusted to a larger size.
+   * Default is 2.
+   * @group expertSetParam
+   */
+  @Since("2.1.0")
+  def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
+  setDefault(aggregationDepth -> 2)
+
   override protected def train(dataset: Dataset[_]): LinearRegressionModel = {
     // Extract the number of features before deciding optimization solver.
     val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
@@ -230,7 +242,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
           (c1._1.merge(c2._1), c1._2.merge(c2._2))
 
       instances.treeAggregate(
-        new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer)(seqOp, combOp)
+        new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer
+      )(seqOp, combOp, $(aggregationDepth))
     }
 
     val yMean = ySummarizer.mean(0)
@@ -296,7 +309,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
 
     val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
-      $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam)
+      $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam, $(aggregationDepth))
 
     val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
       new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -1016,7 +1029,8 @@ private class LeastSquaresCostFun(
     standardization: Boolean,
     bcFeaturesStd: Broadcast[Array[Double]],
     bcFeaturesMean: Broadcast[Array[Double]],
-    effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
+    effectiveL2regParam: Double,
+    aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
 
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val coeffs = Vectors.fromBreeze(coefficients)
@@ -1029,7 +1043,7 @@ private class LeastSquaresCostFun(
 
       instances.treeAggregate(
         new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
-          bcFeaturesMean))(seqOp, combOp)
+          bcFeaturesMean))(seqOp, combOp, aggregationDepth)
     }
 
     val totalGradientArray = leastSquaresAggregator.gradient.toArray


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org