You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2020/05/06 02:07:28 UTC

[spark] branch master updated: [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ebdf41d  [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
ebdf41d is described below

commit ebdf41dd698ce138d07f63b1fa3ffbcc392e7fff
Author: zhengruifeng <ru...@foxmail.com>
AuthorDate: Wed May 6 10:06:23 2020 +0800

    [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
    
    ### What changes were proposed in this pull request?
    1, add new param `blockSize`;
    2, add a new class InstanceBlock;
    3, **if `blockSize==1`, keep original behavior; if `blockSize>1`, stack input vectors to blocks (like ALS/MLP);**
    4, if `blockSize>1`, standardize the input outside of optimization procedure;
    
    ### Why are the changes needed?
    1, reduce RAM to persist traing dataset; (save about 40% RAM)
    2, use Level-2 BLAS routines; (4x ~ 5x faster on dataset `epsilon`)
    
    ### Does this PR introduce any user-facing change?
    Yes, a new param is added
    
    ### How was this patch tested?
    existing and added testsuites
    
    Closes #28349 from zhengruifeng/blockify_svc_II.
    
    Authored-by: zhengruifeng <ru...@foxmail.com>
    Signed-off-by: zhengruifeng <ru...@foxmail.com>
---
 .../apache/spark/serializer/KryoSerializer.scala   |   1 +
 .../org/apache/spark/ml/linalg/Matrices.scala      |  41 ++++
 .../apache/spark/ml/classification/LinearSVC.scala | 222 ++++++++++++++-------
 .../org/apache/spark/ml/feature/Instance.scala     |  91 ++++++++-
 .../ml/optim/aggregator/HingeAggregator.scala      | 110 +++++++++-
 .../org/apache/spark/ml/stat/Summarizer.scala      |   5 +-
 .../spark/ml/classification/LinearSVCSuite.scala   |  15 ++
 .../apache/spark/ml/feature/InstanceSuite.scala    |  31 +++
 .../ml/optim/aggregator/HingeAggregatorSuite.scala |  50 ++++-
 python/pyspark/ml/classification.py                |  23 ++-
 10 files changed, 498 insertions(+), 91 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index cdaab59..55ac2c4 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
       "org.apache.spark.ml.attribute.NumericAttribute",
 
       "org.apache.spark.ml.feature.Instance",
+      "org.apache.spark.ml.feature.InstanceBlock",
       "org.apache.spark.ml.feature.LabeledPoint",
       "org.apache.spark.ml.feature.OffsetInstance",
       "org.apache.spark.ml.linalg.DenseMatrix",
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
index 34e4366..1254ed7 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
@@ -1008,6 +1008,47 @@ object SparseMatrix {
 @Since("2.0.0")
 object Matrices {
 
+  private[ml] def fromVectors(vectors: Seq[Vector]): Matrix = {
+    val numRows = vectors.length
+    val numCols = vectors.head.size
+    val denseSize = Matrices.getDenseSize(numCols, numRows)
+    val nnz = vectors.iterator.map(_.numNonzeros).sum
+    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+    if (denseSize < sparseSize) {
+      val values = Array.ofDim[Double](numRows * numCols)
+      var offset = 0
+      var j = 0
+      while (j < numRows) {
+        vectors(j).foreachNonZero { (i, v) =>
+          values(offset + i) = v
+        }
+        offset += numCols
+        j += 1
+      }
+      new DenseMatrix(numRows, numCols, values, true)
+    } else {
+      val colIndices = MArrayBuilder.make[Int]
+      val values = MArrayBuilder.make[Double]
+      val rowPtrs = MArrayBuilder.make[Int]
+      var rowPtr = 0
+      rowPtrs += 0
+      var j = 0
+      while (j < numRows) {
+        var nnz = 0
+        vectors(j).foreachNonZero { (i, v) =>
+          colIndices += i
+          values += v
+          nnz += 1
+        }
+        rowPtr += nnz
+        rowPtrs += rowPtr
+        j += 1
+      }
+      new SparseMatrix(numRows, numCols, rowPtrs.result(),
+        colIndices.result(), values.result(), true)
+    }
+  }
+
   /**
    * Creates a column-major dense matrix.
    *
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index efe84f8..69c35a8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -26,21 +26,23 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature._
 import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.aggregator._
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.storage.StorageLevel
 
 /** Params for linear SVM Classifier. */
 private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
   with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold {
+  with HasAggregationDepth with HasThreshold with HasBlockSize {
 
   /**
    * Param for threshold in binary classification prediction.
@@ -154,31 +156,65 @@ class LinearSVC @Since("2.2.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
   setDefault(aggregationDepth -> 2)
 
+  /**
+   * Set block size for stacking input data in matrices.
+   * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
+   * If blockSize &gt; 1, then vectors will be stacked to blocks, and high-level BLAS routines
+   * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+   * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
+   * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
+   * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
+   * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
+   * input dataset is sparse, stacking may bring no performance gain, the worse is possible
+   * performance regression.
+   * Default is 1.
+   *
+   * @group expertSetParam
+   */
+  @Since("3.1.0")
+  def setBlockSize(value: Int): this.type = set(blockSize, value)
+  setDefault(blockSize -> 1)
+
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
 
   override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-
-    val instances = extractInstances(dataset)
-    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
-
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
+
+    val instances = extractInstances(dataset)
+      .setName("training instances")
 
-    val (summarizer, labelSummarizer) =
+    val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
+      if (dataset.storageLevel == StorageLevel.NONE) {
+        instances.persist(StorageLevel.MEMORY_AND_DISK)
+      }
       Summarizer.getClassificationSummarizers(instances, $(aggregationDepth))
-    instr.logNumExamples(summarizer.count)
-    instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
-    instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
-    instr.logSumOfWeights(summarizer.weightSum)
+    } else {
+      // instances will be standardized and converted to blocks, so no need to cache instances.
+      Summarizer.getClassificationSummarizers(instances, $(aggregationDepth),
+        Seq("mean", "std", "count", "numNonZeros"))
+    }
 
     val histogram = labelSummarizer.histogram
     val numInvalid = labelSummarizer.countInvalid
     val numFeatures = summarizer.mean.size
-    val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures
+
+    instr.logNumExamples(summarizer.count)
+    instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
+    instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
+    instr.logSumOfWeights(summarizer.weightSum)
+    if ($(blockSize) > 1) {
+      val scale = 1.0 / summarizer.count / numFeatures
+      val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
+      instr.logNamedValue("sparsity", sparsity.toString)
+      if (sparsity > 0.5) {
+        instr.logWarning(s"sparsity of input dataset is $sparsity, " +
+          s"which may hurt performance in high-level BLAS.")
+      }
+    }
 
     val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
       case Some(n: Int) =>
@@ -192,77 +228,113 @@ class LinearSVC @Since("2.2.0") (
     instr.logNumClasses(numClasses)
     instr.logNumFeatures(numFeatures)
 
-    val (coefficientVector, interceptVector, objectiveHistory) = {
-      if (numInvalid != 0) {
-        val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
-          s"Found $numInvalid invalid labels."
-        instr.logError(msg)
-        throw new SparkException(msg)
-      }
+    if (numInvalid != 0) {
+      val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
+        s"Found $numInvalid invalid labels."
+      instr.logError(msg)
+      throw new SparkException(msg)
+    }
 
-      val featuresStd = summarizer.std.toArray
-      val getFeaturesStd = (j: Int) => featuresStd(j)
-      val regParamL2 = $(regParam)
-      val bcFeaturesStd = instances.context.broadcast(featuresStd)
-      val regularization = if (regParamL2 != 0.0) {
-        val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
-        Some(new L2Regularization(regParamL2, shouldApply,
-          if ($(standardization)) None else Some(getFeaturesStd)))
-      } else {
-        None
-      }
+    val featuresStd = summarizer.std.toArray
+    val getFeaturesStd = (j: Int) => featuresStd(j)
+    val regularization = if ($(regParam) != 0.0) {
+      val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+      Some(new L2Regularization($(regParam), shouldApply,
+        if ($(standardization)) None else Some(getFeaturesStd)))
+    } else None
+
+    def regParamL1Fun = (index: Int) => 0.0
+    val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
+
+    /*
+       The coefficients are trained in the scaled space; we're converting them back to
+       the original space.
+       Note that the intercept in scaled space and original space is the same;
+       as a result, no scaling is needed.
+     */
+    val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
+      trainOnRows(instances, featuresStd, regularization, optimizer)
+    } else {
+      trainOnBlocks(instances, featuresStd, regularization, optimizer)
+    }
+    if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
 
-      val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
-      val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
-        $(aggregationDepth))
+    if (rawCoefficients == null) {
+      val msg = s"${optimizer.getClass.getName} failed."
+      instr.logError(msg)
+      throw new SparkException(msg)
+    }
 
-      def regParamL1Fun = (index: Int) => 0D
-      val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
-      val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept)
+    val coefficientArray = Array.tabulate(numFeatures) { i =>
+      if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0
+    }
+    val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0
+    copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept))
+  }
 
-      val states = optimizer.iterations(new CachedDiffFunction(costFun),
-        initialCoefWithIntercept.asBreeze.toDenseVector)
+  private def trainOnRows(
+      instances: RDD[Instance],
+      featuresStd: Array[Double],
+      regularization: Option[L2Regularization],
+      optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
+    val numFeatures = featuresStd.length
+    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
+
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
+    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+    val costFun = new RDDLossFunction(instances, getAggregatorFunc,
+      regularization, $(aggregationDepth))
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+    val arrayBuilder = mutable.ArrayBuilder.make[Double]
+    var state: optimizer.State = null
+    while (states.hasNext) {
+      state = states.next()
+      arrayBuilder += state.adjustedValue
+    }
+    bcFeaturesStd.destroy()
 
-      val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
-      var state: optimizer.State = null
-      while (states.hasNext) {
-        state = states.next()
-        scaledObjectiveHistory += state.adjustedValue
-      }
+    (if (state != null) state.x.toArray else null, arrayBuilder.result)
+  }
 
-      bcFeaturesStd.destroy()
-      if (state == null) {
-        val msg = s"${optimizer.getClass.getName} failed."
-        instr.logError(msg)
-        throw new SparkException(msg)
-      }
+  private def trainOnBlocks(
+      instances: RDD[Instance],
+      featuresStd: Array[Double],
+      regularization: Option[L2Regularization],
+      optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
+    val numFeatures = featuresStd.length
+    val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
 
-      /*
-         The coefficients are trained in the scaled space; we're converting them back to
-         the original space.
-         Note that the intercept in scaled space and original space is the same;
-         as a result, no scaling is needed.
-       */
-      val rawCoefficients = state.x.toArray
-      val coefficientArray = Array.tabulate(numFeatures) { i =>
-        if (featuresStd(i) != 0.0) {
-          rawCoefficients(i) / featuresStd(i)
-        } else {
-          0.0
-        }
-      }
+    val bcFeaturesStd = instances.context.broadcast(featuresStd)
 
-      val intercept = if ($(fitIntercept)) {
-        rawCoefficients(numFeaturesPlusIntercept - 1)
-      } else {
-        0.0
-      }
-      (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
+    val standardized = instances.mapPartitions { iter =>
+      val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
+      val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
+      iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
     }
+    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+      .persist(StorageLevel.MEMORY_AND_DISK)
+      .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+    val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
+    val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
+      regularization, $(aggregationDepth))
+
+    val states = optimizer.iterations(new CachedDiffFunction(costFun),
+      Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+    val arrayBuilder = mutable.ArrayBuilder.make[Double]
+    var state: optimizer.State = null
+    while (states.hasNext) {
+      state = states.next()
+      arrayBuilder += state.adjustedValue
+    }
+    blocks.unpersist()
+    bcFeaturesStd.destroy()
 
-    if (handlePersistence) instances.unpersist()
-
-    copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
+    (if (state != null) state.x.toArray else null, arrayBuilder.result)
   }
 }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 11d0c46..db5f88d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg._
+import org.apache.spark.rdd.RDD
 
 /**
  * Class that represents an instance of weighted data point with label and features.
@@ -28,6 +29,94 @@ import org.apache.spark.ml.linalg.Vector
  */
 private[spark] case class Instance(label: Double, weight: Double, features: Vector)
 
+
+/**
+ * Class that represents an block of instance.
+ * If all weights are 1, then an empty array is stored.
+ */
+private[spark] case class InstanceBlock(
+    labels: Array[Double],
+    weights: Array[Double],
+    matrix: Matrix) {
+  require(labels.length == matrix.numRows)
+  require(matrix.isTransposed)
+  if (weights.nonEmpty) {
+    require(labels.length == weights.length)
+  }
+
+  def size: Int = labels.length
+
+  def numFeatures: Int = matrix.numCols
+
+  def instanceIterator: Iterator[Instance] = {
+    if (weights.nonEmpty) {
+      labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
+        .map { case ((label, weight), vec) => Instance(label, weight, vec) }
+    } else {
+      labels.iterator.zip(matrix.rowIter)
+        .map { case (label, vec) => Instance(label, 1.0, vec) }
+    }
+  }
+
+  def getLabel(i: Int): Double = labels(i)
+
+  def labelIter: Iterator[Double] = labels.iterator
+
+  @transient lazy val getWeight: Int => Double = {
+    if (weights.nonEmpty) {
+      (i: Int) => weights(i)
+    } else {
+      (i: Int) => 1.0
+    }
+  }
+
+  def weightIter: Iterator[Double] = {
+    if (weights.nonEmpty) {
+      weights.iterator
+    } else {
+      Iterator.fill(size)(1.0)
+    }
+  }
+
+  // directly get the non-zero iterator of i-th row vector without array copy or slice
+  @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
+    matrix match {
+      case dm: DenseMatrix =>
+        (i: Int) =>
+          val start = numFeatures * i
+          Iterator.tabulate(numFeatures)(j =>
+            (j, dm.values(start + j))
+          ).filter(_._2 != 0)
+      case sm: SparseMatrix =>
+        (i: Int) =>
+          val start = sm.colPtrs(i)
+          val end = sm.colPtrs(i + 1)
+          Iterator.tabulate(end - start)(j =>
+            (sm.rowIndices(start + j), sm.values(start + j))
+          ).filter(_._2 != 0)
+    }
+  }
+}
+
+private[spark] object InstanceBlock {
+
+  def fromInstances(instances: Seq[Instance]): InstanceBlock = {
+    val labels = instances.map(_.label).toArray
+    val weights = if (instances.exists(_.weight != 1)) {
+      instances.map(_.weight).toArray
+    } else {
+      Array.emptyDoubleArray
+    }
+    val matrix = Matrices.fromVectors(instances.map(_.features))
+    new InstanceBlock(labels, weights, matrix)
+  }
+
+  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
+    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
+  }
+}
+
+
 /**
  * Case class that represents an instance of data point with
  * label, weight, offset and features.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
index b0906f1..1525bb9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg._
 
 /**
@@ -39,8 +39,8 @@ private[ml] class HingeAggregator(
     fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
   extends DifferentiableLossAggregator[Instance, HingeAggregator] {
 
-  private val numFeatures: Int = bcFeaturesStd.value.length
-  private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
+  private val numFeatures = bcFeaturesStd.value.length
+  private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
   @transient private lazy val coefficientsArray = bcCoefficients.value match {
     case DenseVector(values) => values
     case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
@@ -103,3 +103,107 @@ private[ml] class HingeAggregator(
     }
   }
 }
+
+
+/**
+ * BlockHingeAggregator computes the gradient and loss for Hinge loss function as used in
+ * binary classification for blocks in sparse or dense matrix in an online fashion.
+ *
+ * Two BlockHingeAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * NOTE: The feature values are expected to be standardized before computation.
+ *
+ * @param bcCoefficients The coefficients corresponding to the features.
+ * @param fitIntercept Whether to fit an intercept term.
+ */
+private[ml] class BlockHingeAggregator(
+    fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] {
+
+  protected override val dim: Int = bcCoefficients.value.size
+  private val numFeatures = if (fitIntercept) dim - 1 else dim
+
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+      s" but got type ${bcCoefficients.value.getClass}.")
+  }
+
+  @transient private lazy val linear = {
+    val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray
+    Vectors.dense(linear).toDense
+  }
+
+  /**
+   * Add a new training instance block to this HingeAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param block The InstanceBlock to be added.
+   * @return This HingeAggregator object.
+   */
+  def add(block: InstanceBlock): this.type = {
+    require(block.matrix.isTransposed)
+    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
+      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
+    require(block.weightIter.forall(_ >= 0),
+      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
+
+    if (block.weightIter.forall(_ == 0)) return this
+    val size = block.size
+
+    // vec here represents dotProducts
+    val vec = if (fitIntercept) {
+      Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense
+    } else {
+      Vectors.zeros(size).toDense
+    }
+    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+
+    // in-place convert dotProducts to gradient scales
+    // then, vec represents gradient scales
+    var i = 0
+    var interceptGradSum = 0.0
+    while (i < size) {
+      val weight = block.getWeight(i)
+      if (weight > 0) {
+        weightSum += weight
+        // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
+        // Therefore the gradient is -(2y - 1)*x
+        val label = block.getLabel(i)
+        val labelScaled = label + label - 1.0
+        val loss = (1.0 - labelScaled * vec.values(i)) * weight
+        if (loss > 0) {
+          lossSum += loss
+          val gradScale = -labelScaled * weight
+          vec.values(i) = gradScale
+          if (fitIntercept) interceptGradSum += gradScale
+        } else { vec.values(i) = 0.0 }
+      } else { vec.values(i) = 0.0 }
+      i += 1
+    }
+
+    // predictions are all correct, no gradient signal
+    if (vec.values.forall(_ == 0)) return this
+
+    block.matrix match {
+      case dm: DenseMatrix =>
+        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
+          vec.values, 1, 1.0, gradientSumArray, 1)
+        if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum
+
+      case sm: SparseMatrix if fitIntercept =>
+        val linearGradSumVec = Vectors.zeros(numFeatures).toDense
+        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
+        BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
+          gradientSumArray, 1)
+        gradientSumArray(numFeatures) += interceptGradSum
+
+      case sm: SparseMatrix if !fitIntercept =>
+        val gradSumVec = new DenseVector(gradientSumArray)
+        BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
+    }
+
+    this
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
index 1183041..4230b49 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -225,9 +225,10 @@ object Summarizer extends Logging {
   /** Get classification feature and label summarizers for provided data. */
   private[ml] def getClassificationSummarizers(
       instances: RDD[Instance],
-      aggregationDepth: Int = 2): (SummarizerBuffer, MultiClassSummarizer) = {
+      aggregationDepth: Int = 2,
+      requested: Seq[String] = Seq("mean", "std", "count")) = {
     instances.treeAggregate(
-      (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
+      (Summarizer.createSummarizerBuffer(requested: _*), new MultiClassSummarizer))(
       seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
         (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)),
       combOp = (c1: (SummarizerBuffer, MultiClassSummarizer),
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index c2072ce..579d6b1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -207,6 +207,21 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
       dataset.as[LabeledPoint], estimator, modelEquals, 42L)
   }
 
+  test("LinearSVC on blocks") {
+    for (dataset <- Seq(smallBinaryDataset, smallSparseBinaryDataset);
+         fitIntercept <- Seq(true, false)) {
+      val lsvc = new LinearSVC()
+        .setFitIntercept(fitIntercept)
+        .setMaxIter(5)
+      val model = lsvc.fit(dataset)
+      Seq(4, 16, 64).foreach { blockSize =>
+        val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
+        assert(model.intercept ~== model2.intercept relTol 1e-9)
+        assert(model.coefficients ~== model2.coefficients relTol 1e-9)
+      }
+    }
+  }
+
   test("prediction on single instance") {
     val trainer = new LinearSVC()
     val model = trainer.fit(smallBinaryDataset)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index 5a74490..d780bdf 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -42,5 +42,36 @@ class InstanceSuite extends SparkFunSuite{
       val o2 = ser.deserialize[OffsetInstance](ser.serialize(o))
       assert(o === o2)
     }
+
+    val block1 = InstanceBlock.fromInstances(Seq(instance1))
+    val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2))
+    Seq(block1, block2).foreach { o =>
+      val o2 = ser.deserialize[InstanceBlock](ser.serialize(o))
+      assert(o.labels === o2.labels)
+      assert(o.weights === o2.weights)
+      assert(o.matrix === o2.matrix)
+    }
+  }
+
+  test("InstanceBlock: check correctness") {
+    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+    val instances = Seq(instance1, instance2)
+
+    val block = InstanceBlock.fromInstances(instances)
+    assert(block.size === 2)
+    assert(block.numFeatures === 2)
+    block.instanceIterator.zipWithIndex.foreach {
+      case (instance, i) =>
+        assert(instance.label === instances(i).label)
+        assert(instance.weight === instances(i).weight)
+        assert(instance.features.toArray === instances(i).features.toArray)
+    }
+    Seq(0, 1).foreach { i =>
+      val nzIter = block.getNonZeroIter(i)
+      val vec = Vectors.sparse(2, nzIter.toSeq)
+      assert(vec.toArray === instances(i).features.toArray)
+    }
   }
+
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
index 16d27a9..51a1edd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.stat.Summarizer
 import org.apache.spark.ml.util.TestingUtils._
@@ -28,6 +28,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
   @transient var instances: Array[Instance] = _
   @transient var instancesConstantFeature: Array[Instance] = _
   @transient var instancesConstantFeatureFiltered: Array[Instance] = _
+  @transient var standardizedInstances: Array[Instance] = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -46,6 +47,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
     )
+    standardizedInstances = standardize(instances)
   }
 
    /** Get summary statistics for some data and create a new HingeAggregator. */
@@ -61,6 +63,29 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
   }
 
+  private def standardize(instances: Array[Instance]): Array[Instance] = {
+    val (featuresSummarizer, _) =
+      Summarizer.getClassificationSummarizers(sc.parallelize(instances))
+    val stdArray = featuresSummarizer.std.toArray
+    val numFeatures = stdArray.length
+    instances.map { case Instance(label, weight, features) =>
+      val standardized = Array.ofDim[Double](numFeatures)
+      features.foreachNonZero { (i, v) =>
+        val std = stdArray(i)
+        if (std != 0) standardized(i) = v / std
+      }
+      Instance(label, weight, Vectors.dense(standardized).compressed)
+    }
+  }
+
+   /** Get summary statistics for some data and create a new BlockHingeAggregator. */
+  private def getNewBlockAggregator(
+      coefficients: Vector,
+      fitIntercept: Boolean): BlockHingeAggregator = {
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+    new BlockHingeAggregator(fitIntercept)(bcCoefficients)
+  }
+
   test("aggregator add method input size") {
     val coefArray = Array(1.0, 2.0)
     val interceptArray = Array(2.0)
@@ -139,8 +164,26 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
     val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
 
-    assert(loss ~== agg.loss relTol 0.01)
-    assert(gradient ~== agg.gradient relTol 0.01)
+    assert(loss ~== agg.loss relTol 1e-9)
+    assert(gradient ~== agg.gradient relTol 1e-9)
+
+    Seq(1, 2, 4).foreach { blockSize =>
+      val blocks1 = standardizedInstances
+        .grouped(blockSize)
+        .map(seq => InstanceBlock.fromInstances(seq))
+        .toArray
+      val blocks2 = blocks1.map { block =>
+        new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
+      }
+
+      Seq(blocks1, blocks2).foreach { blocks =>
+        val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)),
+          fitIntercept = true)
+        blocks.foreach(blockAgg.add)
+        assert(loss ~== blockAgg.loss relTol 1e-9)
+        assert(gradient ~== blockAgg.gradient relTol 1e-9)
+      }
+    }
   }
 
   test("check with zero standard deviation") {
@@ -158,5 +201,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(aggConstantFeatureBinary.gradient(0) === 0.0)
     assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
   }
-
 }
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 0d88aa8..318ae7a 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -241,7 +241,8 @@ class _JavaProbabilisticClassificationModel(ProbabilisticClassificationModel,
 
 
 class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
-                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold):
+                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
+                       HasBlockSize):
     """
     Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
 
@@ -290,6 +291,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
     LinearSVCModel...
     >>> model.getThreshold()
     0.5
+    >>> model.getBlockSize()
+    1
     >>> model.coefficients
     DenseVector([0.0, -0.2792, -0.1833])
     >>> model.intercept
@@ -328,18 +331,19 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                 aggregationDepth=2):
+                 aggregationDepth=2, blockSize=1):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                 aggregationDepth=2):
+                 aggregationDepth=2, blockSize=1):
         """
         super(LinearSVC, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.classification.LinearSVC", self.uid)
         self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
-                         standardization=True, threshold=0.0, aggregationDepth=2)
+                         standardization=True, threshold=0.0, aggregationDepth=2,
+                         blockSize=1)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -348,12 +352,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                  aggregationDepth=2):
+                  aggregationDepth=2, blockSize=1):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                  aggregationDepth=2):
+                  aggregationDepth=2, blockSize=1):
         Sets params for Linear SVM Classifier.
         """
         kwargs = self._input_kwargs
@@ -418,6 +422,13 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
         """
         return self._set(aggregationDepth=value)
 
+    @since("3.1.0")
+    def setBlockSize(self, value):
+        """
+        Sets the value of :py:attr:`blockSize`.
+        """
+        return self._set(blockSize=value)
+
 
 class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable):
     """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org