Posted to commits@spark.apache.org by ru...@apache.org on 2020/05/06 02:07:28 UTC
[spark] branch master updated: [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ebdf41d [SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
ebdf41d is described below
commit ebdf41dd698ce138d07f63b1fa3ffbcc392e7fff
Author: zhengruifeng <ru...@foxmail.com>
AuthorDate: Wed May 6 10:06:23 2020 +0800
[SPARK-30642][ML][PYSPARK] LinearSVC blockify input vectors
### What changes were proposed in this pull request?
1. Add a new param `blockSize`;
2. Add a new class `InstanceBlock`;
3. **If `blockSize == 1`, keep the original behavior; if `blockSize > 1`, stack input vectors into blocks (like ALS/MLP);**
4. If `blockSize > 1`, standardize the input outside of the optimization procedure.
### Why are the changes needed?
1. Reduce the RAM needed to persist the training dataset (saves about 40% RAM);
2. Use Level-2 BLAS routines (4x ~ 5x faster on the `epsilon` dataset).
### Does this PR introduce any user-facing change?
Yes, a new param `blockSize` is added.
### How was this patch tested?
Existing and added test suites.
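For reference, a minimal usage sketch of the new param (the data path and the active `spark` session are assumptions for illustration; `setBlockSize` is the setter added in this commit):

    import org.apache.spark.ml.classification.LinearSVC

    // Hypothetical training DataFrame with "label" and "features" columns.
    val training = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

    val lsvc = new LinearSVC()
      .setMaxIter(10)
      .setRegParam(0.1)
      .setBlockSize(64)  // stack input vectors into blocks of 64 rows

    val model = lsvc.fit(training)
    println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept}")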
Closes #28349 from zhengruifeng/blockify_svc_II.
Authored-by: zhengruifeng <ru...@foxmail.com>
Signed-off-by: zhengruifeng <ru...@foxmail.com>
---
.../apache/spark/serializer/KryoSerializer.scala | 1 +
.../org/apache/spark/ml/linalg/Matrices.scala | 41 ++++
.../apache/spark/ml/classification/LinearSVC.scala | 222 ++++++++++++++-------
.../org/apache/spark/ml/feature/Instance.scala | 91 ++++++++-
.../ml/optim/aggregator/HingeAggregator.scala | 110 +++++++++-
.../org/apache/spark/ml/stat/Summarizer.scala | 5 +-
.../spark/ml/classification/LinearSVCSuite.scala | 15 ++
.../apache/spark/ml/feature/InstanceSuite.scala | 31 +++
.../ml/optim/aggregator/HingeAggregatorSuite.scala | 50 ++++-
python/pyspark/ml/classification.py | 23 ++-
10 files changed, 498 insertions(+), 91 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index cdaab59..55ac2c4 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,6 +502,7 @@ private[serializer] object KryoSerializer {
"org.apache.spark.ml.attribute.NumericAttribute",
"org.apache.spark.ml.feature.Instance",
+ "org.apache.spark.ml.feature.InstanceBlock",
"org.apache.spark.ml.feature.LabeledPoint",
"org.apache.spark.ml.feature.OffsetInstance",
"org.apache.spark.ml.linalg.DenseMatrix",
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
index 34e4366..1254ed7 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
@@ -1008,6 +1008,47 @@ object SparseMatrix {
@Since("2.0.0")
object Matrices {
+ private[ml] def fromVectors(vectors: Seq[Vector]): Matrix = {
+ val numRows = vectors.length
+ val numCols = vectors.head.size
+ val denseSize = Matrices.getDenseSize(numCols, numRows)
+ val nnz = vectors.iterator.map(_.numNonzeros).sum
+ val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+ if (denseSize < sparseSize) {
+ val values = Array.ofDim[Double](numRows * numCols)
+ var offset = 0
+ var j = 0
+ while (j < numRows) {
+ vectors(j).foreachNonZero { (i, v) =>
+ values(offset + i) = v
+ }
+ offset += numCols
+ j += 1
+ }
+ new DenseMatrix(numRows, numCols, values, true)
+ } else {
+ val colIndices = MArrayBuilder.make[Int]
+ val values = MArrayBuilder.make[Double]
+ val rowPtrs = MArrayBuilder.make[Int]
+ var rowPtr = 0
+ rowPtrs += 0
+ var j = 0
+ while (j < numRows) {
+ var nnz = 0
+ vectors(j).foreachNonZero { (i, v) =>
+ colIndices += i
+ values += v
+ nnz += 1
+ }
+ rowPtr += nnz
+ rowPtrs += rowPtr
+ j += 1
+ }
+ new SparseMatrix(numRows, numCols, rowPtrs.result(),
+ colIndices.result(), values.result(), true)
+ }
+ }
+
/**
* Creates a column-major dense matrix.
*
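For intuition, here is a self-contained sketch of the dense-vs-sparse storage comparison that `fromVectors` relies on. The byte counts below only approximate the private `Matrices.getDenseSize`/`getSparseSize` helpers; treat the constants as assumptions:

    // Approximate storage cost of an m x n row-major dense matrix vs a CSR-like
    // sparse matrix with nnz non-zeros: 8 bytes per double, 4 bytes per int index.
    def approxDenseBytes(numRows: Long, numCols: Long): Long =
      8L * numRows * numCols

    def approxSparseBytes(nnz: Long, numRowPtrs: Long): Long =
      8L * nnz + 4L * nnz + 4L * numRowPtrs

    // A 100 x 1000 block with 2% non-zeros: sparse wins by a wide margin.
    val dense = approxDenseBytes(100, 1000)    // 800000
    val sparse = approxSparseBytes(2000, 101)  // 24404
    println(s"dense=$dense bytes, sparse=$sparse bytes")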
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index efe84f8..69c35a8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -26,21 +26,23 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.aggregator._
import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
- with HasAggregationDepth with HasThreshold {
+ with HasAggregationDepth with HasThreshold with HasBlockSize {
/**
* Param for threshold in binary classification prediction.
@@ -154,31 +156,65 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)
+ /**
+ * Set the block size for stacking input data in matrices.
+ * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
+ * if blockSize > 1, then vectors will be stacked into blocks, and high-level BLAS routines
+ * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
+ * The recommended size is between 10 and 1000. An appropriate block size depends on the
+ * sparsity and dimension of the input datasets, the underlying BLAS implementation (for
+ * example, f2jBLAS, OpenBLAS, Intel MKL) and its configuration (for example, the number
+ * of threads).
+ * Note that existing BLAS implementations are mainly optimized for dense matrices; if the
+ * input dataset is sparse, stacking may bring no performance gain, and a performance
+ * regression is even possible.
+ * Default is 1.
+ *
+ * @group expertSetParam
+ */
+ @Since("3.1.0")
+ def setBlockSize(value: Int): this.type = set(blockSize, value)
+ setDefault(blockSize -> 1)
+
@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
- val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-
- val instances = extractInstances(dataset)
- if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
-
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
- regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
+ regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
+
+ val instances = extractInstances(dataset)
+ .setName("training instances")
- val (summarizer, labelSummarizer) =
+ val (summarizer, labelSummarizer) = if ($(blockSize) == 1) {
+ if (dataset.storageLevel == StorageLevel.NONE) {
+ instances.persist(StorageLevel.MEMORY_AND_DISK)
+ }
Summarizer.getClassificationSummarizers(instances, $(aggregationDepth))
- instr.logNumExamples(summarizer.count)
- instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
- instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
- instr.logSumOfWeights(summarizer.weightSum)
+ } else {
+ // instances will be standardized and converted to blocks, so no need to cache instances.
+ Summarizer.getClassificationSummarizers(instances, $(aggregationDepth),
+ Seq("mean", "std", "count", "numNonZeros"))
+ }
val histogram = labelSummarizer.histogram
val numInvalid = labelSummarizer.countInvalid
val numFeatures = summarizer.mean.size
- val numFeaturesPlusIntercept = if (getFitIntercept) numFeatures + 1 else numFeatures
+
+ instr.logNumExamples(summarizer.count)
+ instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
+ instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
+ instr.logSumOfWeights(summarizer.weightSum)
+ if ($(blockSize) > 1) {
+ val scale = 1.0 / summarizer.count / numFeatures
+ val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
+ instr.logNamedValue("sparsity", sparsity.toString)
+ if (sparsity > 0.5) {
+ instr.logWarning(s"sparsity of input dataset is $sparsity, " +
+ s"which may hurt performance in high-level BLAS.")
+ }
+ }
val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
case Some(n: Int) =>
@@ -192,77 +228,113 @@ class LinearSVC @Since("2.2.0") (
instr.logNumClasses(numClasses)
instr.logNumFeatures(numFeatures)
- val (coefficientVector, interceptVector, objectiveHistory) = {
- if (numInvalid != 0) {
- val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
- s"Found $numInvalid invalid labels."
- instr.logError(msg)
- throw new SparkException(msg)
- }
+ if (numInvalid != 0) {
+ val msg = s"Classification labels should be in [0 to ${numClasses - 1}]. " +
+ s"Found $numInvalid invalid labels."
+ instr.logError(msg)
+ throw new SparkException(msg)
+ }
- val featuresStd = summarizer.std.toArray
- val getFeaturesStd = (j: Int) => featuresStd(j)
- val regParamL2 = $(regParam)
- val bcFeaturesStd = instances.context.broadcast(featuresStd)
- val regularization = if (regParamL2 != 0.0) {
- val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
- Some(new L2Regularization(regParamL2, shouldApply,
- if ($(standardization)) None else Some(getFeaturesStd)))
- } else {
- None
- }
+ val featuresStd = summarizer.std.toArray
+ val getFeaturesStd = (j: Int) => featuresStd(j)
+ val regularization = if ($(regParam) != 0.0) {
+ val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+ Some(new L2Regularization($(regParam), shouldApply,
+ if ($(standardization)) None else Some(getFeaturesStd)))
+ } else None
+
+ def regParamL1Fun = (index: Int) => 0.0
+ val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
+
+ /*
+ The coefficients are trained in the scaled space; we're converting them back to
+ the original space.
+ Note that the intercept in scaled space and original space is the same;
+ as a result, no scaling is needed.
+ */
+ val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
+ trainOnRows(instances, featuresStd, regularization, optimizer)
+ } else {
+ trainOnBlocks(instances, featuresStd, regularization, optimizer)
+ }
+ if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
- val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
- val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
- $(aggregationDepth))
+ if (rawCoefficients == null) {
+ val msg = s"${optimizer.getClass.getName} failed."
+ instr.logError(msg)
+ throw new SparkException(msg)
+ }
- def regParamL1Fun = (index: Int) => 0D
- val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
- val initialCoefWithIntercept = Vectors.zeros(numFeaturesPlusIntercept)
+ val coefficientArray = Array.tabulate(numFeatures) { i =>
+ if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0
+ }
+ val intercept = if ($(fitIntercept)) rawCoefficients.last else 0.0
+ copyValues(new LinearSVCModel(uid, Vectors.dense(coefficientArray), intercept))
+ }
- val states = optimizer.iterations(new CachedDiffFunction(costFun),
- initialCoefWithIntercept.asBreeze.toDenseVector)
+ private def trainOnRows(
+ instances: RDD[Instance],
+ featuresStd: Array[Double],
+ regularization: Option[L2Regularization],
+ optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
+ val numFeatures = featuresStd.length
+ val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
+
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
+ val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+ val costFun = new RDDLossFunction(instances, getAggregatorFunc,
+ regularization, $(aggregationDepth))
+
+ val states = optimizer.iterations(new CachedDiffFunction(costFun),
+ Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+ val arrayBuilder = mutable.ArrayBuilder.make[Double]
+ var state: optimizer.State = null
+ while (states.hasNext) {
+ state = states.next()
+ arrayBuilder += state.adjustedValue
+ }
+ bcFeaturesStd.destroy()
- val scaledObjectiveHistory = mutable.ArrayBuilder.make[Double]
- var state: optimizer.State = null
- while (states.hasNext) {
- state = states.next()
- scaledObjectiveHistory += state.adjustedValue
- }
+ (if (state != null) state.x.toArray else null, arrayBuilder.result)
+ }
- bcFeaturesStd.destroy()
- if (state == null) {
- val msg = s"${optimizer.getClass.getName} failed."
- instr.logError(msg)
- throw new SparkException(msg)
- }
+ private def trainOnBlocks(
+ instances: RDD[Instance],
+ featuresStd: Array[Double],
+ regularization: Option[L2Regularization],
+ optimizer: BreezeOWLQN[Int, BDV[Double]]): (Array[Double], Array[Double]) = {
+ val numFeatures = featuresStd.length
+ val numFeaturesPlusIntercept = if ($(fitIntercept)) numFeatures + 1 else numFeatures
- /*
- The coefficients are trained in the scaled space; we're converting them back to
- the original space.
- Note that the intercept in scaled space and original space is the same;
- as a result, no scaling is needed.
- */
- val rawCoefficients = state.x.toArray
- val coefficientArray = Array.tabulate(numFeatures) { i =>
- if (featuresStd(i) != 0.0) {
- rawCoefficients(i) / featuresStd(i)
- } else {
- 0.0
- }
- }
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
- val intercept = if ($(fitIntercept)) {
- rawCoefficients(numFeaturesPlusIntercept - 1)
- } else {
- 0.0
- }
- (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
+ val standardized = instances.mapPartitions { iter =>
+ val inverseStd = bcFeaturesStd.value.map { std => if (std != 0) 1.0 / std else 0.0 }
+ val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
+ iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
}
+ val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+ .persist(StorageLevel.MEMORY_AND_DISK)
+ .setName(s"training dataset (blockSize=${$(blockSize)})")
+
+ val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
+ val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
+ regularization, $(aggregationDepth))
+
+ val states = optimizer.iterations(new CachedDiffFunction(costFun),
+ Vectors.zeros(numFeaturesPlusIntercept).asBreeze.toDenseVector)
+
+ val arrayBuilder = mutable.ArrayBuilder.make[Double]
+ var state: optimizer.State = null
+ while (states.hasNext) {
+ state = states.next()
+ arrayBuilder += state.adjustedValue
+ }
+ blocks.unpersist()
+ bcFeaturesStd.destroy()
- if (handlePersistence) instances.unpersist()
-
- copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
+ (if (state != null) state.x.toArray else null, arrayBuilder.result)
}
}
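As a worked illustration of the scaled-space conversion described in the comment above, here is a hedged sketch with made-up numbers, mirroring the `coefficientArray` construction in `train`:

    // Coefficients learned on standardized features are divided by each feature's
    // standard deviation to map them back to the original feature space; features
    // with zero variance carry no signal, so their coefficients are set to 0.0.
    val featuresStd = Array(2.0, 0.0, 0.5)
    val rawCoefficients = Array(1.2, 3.4, -0.8, 0.1)  // last entry is the intercept
    val fitIntercept = true

    val coefficients = Array.tabulate(featuresStd.length) { i =>
      if (featuresStd(i) != 0.0) rawCoefficients(i) / featuresStd(i) else 0.0
    }
    val intercept = if (fitIntercept) rawCoefficients.last else 0.0
    // coefficients: Array(0.6, 0.0, -1.6), intercept: 0.1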
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 11d0c46..db5f88d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,8 @@
package org.apache.spark.ml.feature
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg._
+import org.apache.spark.rdd.RDD
/**
* Class that represents an instance of weighted data point with label and features.
@@ -28,6 +29,94 @@ import org.apache.spark.ml.linalg.Vector
*/
private[spark] case class Instance(label: Double, weight: Double, features: Vector)
+
+/**
+ * Class that represents a block of instances.
+ * If all weights are 1, then an empty array is stored.
+ */
+private[spark] case class InstanceBlock(
+ labels: Array[Double],
+ weights: Array[Double],
+ matrix: Matrix) {
+ require(labels.length == matrix.numRows)
+ require(matrix.isTransposed)
+ if (weights.nonEmpty) {
+ require(labels.length == weights.length)
+ }
+
+ def size: Int = labels.length
+
+ def numFeatures: Int = matrix.numCols
+
+ def instanceIterator: Iterator[Instance] = {
+ if (weights.nonEmpty) {
+ labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
+ .map { case ((label, weight), vec) => Instance(label, weight, vec) }
+ } else {
+ labels.iterator.zip(matrix.rowIter)
+ .map { case (label, vec) => Instance(label, 1.0, vec) }
+ }
+ }
+
+ def getLabel(i: Int): Double = labels(i)
+
+ def labelIter: Iterator[Double] = labels.iterator
+
+ @transient lazy val getWeight: Int => Double = {
+ if (weights.nonEmpty) {
+ (i: Int) => weights(i)
+ } else {
+ (i: Int) => 1.0
+ }
+ }
+
+ def weightIter: Iterator[Double] = {
+ if (weights.nonEmpty) {
+ weights.iterator
+ } else {
+ Iterator.fill(size)(1.0)
+ }
+ }
+
+ // Directly get the non-zero iterator of the i-th row vector without array copy or slicing.
+ @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
+ matrix match {
+ case dm: DenseMatrix =>
+ (i: Int) =>
+ val start = numFeatures * i
+ Iterator.tabulate(numFeatures)(j =>
+ (j, dm.values(start + j))
+ ).filter(_._2 != 0)
+ case sm: SparseMatrix =>
+ (i: Int) =>
+ val start = sm.colPtrs(i)
+ val end = sm.colPtrs(i + 1)
+ Iterator.tabulate(end - start)(j =>
+ (sm.rowIndices(start + j), sm.values(start + j))
+ ).filter(_._2 != 0)
+ }
+ }
+}
+
+private[spark] object InstanceBlock {
+
+ def fromInstances(instances: Seq[Instance]): InstanceBlock = {
+ val labels = instances.map(_.label).toArray
+ val weights = if (instances.exists(_.weight != 1)) {
+ instances.map(_.weight).toArray
+ } else {
+ Array.emptyDoubleArray
+ }
+ val matrix = Matrices.fromVectors(instances.map(_.features))
+ new InstanceBlock(labels, weights, matrix)
+ }
+
+ def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
+ instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
+ }
+}
+
+
/**
* Case class that represents an instance of data point with
* label, weight, offset and features.
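A hedged sketch of how a block is built and read back. Note the API is `private[spark]`, so this would only compile from code inside a Spark package (e.g. a test suite):

    import org.apache.spark.ml.feature.{Instance, InstanceBlock}
    import org.apache.spark.ml.linalg.Vectors

    val instances = Seq(
      Instance(1.0, 1.0, Vectors.dense(1.0, 0.0, 3.0)),
      Instance(0.0, 1.0, Vectors.dense(0.0, 2.0, 0.0).toSparse))

    // All weights are 1.0, so the block stores an empty weight array.
    val block = InstanceBlock.fromInstances(instances)
    assert(block.size == 2 && block.numFeatures == 3)
    assert(block.weights.isEmpty)

    // Round-trip back to instances; getWeight falls back to 1.0.
    block.instanceIterator.foreach(i => println(s"${i.label} ${i.features}"))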
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
index b0906f1..1525bb9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg._
/**
@@ -39,8 +39,8 @@ private[ml] class HingeAggregator(
fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
extends DifferentiableLossAggregator[Instance, HingeAggregator] {
- private val numFeatures: Int = bcFeaturesStd.value.length
- private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
+ private val numFeatures = bcFeaturesStd.value.length
+ private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
@transient private lazy val coefficientsArray = bcCoefficients.value match {
case DenseVector(values) => values
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
@@ -103,3 +103,107 @@ private[ml] class HingeAggregator(
}
}
}
+
+
+/**
+ * BlockHingeAggregator computes the gradient and loss for the hinge loss function, as used
+ * in binary classification, over blocks of instances stored in sparse or dense matrices,
+ * in an online fashion.
+ *
+ * Two BlockHingeAggregators can be merged together to obtain a summary of the loss and
+ * gradient over the corresponding joint dataset.
+ *
+ * NOTE: The feature values are expected to be standardized before computation.
+ *
+ * @param bcCoefficients The coefficients corresponding to the features.
+ * @param fitIntercept Whether to fit an intercept term.
+ */
+private[ml] class BlockHingeAggregator(
+ fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+ extends DifferentiableLossAggregator[InstanceBlock, BlockHingeAggregator] {
+
+ protected override val dim: Int = bcCoefficients.value.size
+ private val numFeatures = if (fitIntercept) dim - 1 else dim
+
+ @transient private lazy val coefficientsArray = bcCoefficients.value match {
+ case DenseVector(values) => values
+ case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+ s" but got type ${bcCoefficients.value.getClass}.")
+ }
+
+ @transient private lazy val linear = {
+ val linear = if (fitIntercept) coefficientsArray.take(numFeatures) else coefficientsArray
+ Vectors.dense(linear).toDense
+ }
+
+ /**
+ * Add a new training instance block to this BlockHingeAggregator, and update the loss and
+ * gradient of the objective function.
+ *
+ * @param block The InstanceBlock to be added.
+ * @return This BlockHingeAggregator object.
+ */
+ def add(block: InstanceBlock): this.type = {
+ require(block.matrix.isTransposed)
+ require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
+ s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
+ require(block.weightIter.forall(_ >= 0),
+ s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
+
+ if (block.weightIter.forall(_ == 0)) return this
+ val size = block.size
+
+ // vec here represents dotProducts
+ val vec = if (fitIntercept) {
+ Vectors.dense(Array.fill(size)(coefficientsArray.last)).toDense
+ } else {
+ Vectors.zeros(size).toDense
+ }
+ BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+
+ // in-place convert dotProducts to gradient scales
+ // then, vec represents gradient scales
+ var i = 0
+ var interceptGradSum = 0.0
+ while (i < size) {
+ val weight = block.getWeight(i)
+ if (weight > 0) {
+ weightSum += weight
+ // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
+ // Therefore the gradient is -(2y - 1)*x
+ val label = block.getLabel(i)
+ val labelScaled = label + label - 1.0
+ val loss = (1.0 - labelScaled * vec.values(i)) * weight
+ if (loss > 0) {
+ lossSum += loss
+ val gradScale = -labelScaled * weight
+ vec.values(i) = gradScale
+ if (fitIntercept) interceptGradSum += gradScale
+ } else { vec.values(i) = 0.0 }
+ } else { vec.values(i) = 0.0 }
+ i += 1
+ }
+
+ // predictions are all correct, no gradient signal
+ if (vec.values.forall(_ == 0)) return this
+
+ block.matrix match {
+ case dm: DenseMatrix =>
+ BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
+ vec.values, 1, 1.0, gradientSumArray, 1)
+ if (fitIntercept) gradientSumArray(numFeatures) += interceptGradSum
+
+ case sm: SparseMatrix if fitIntercept =>
+ val linearGradSumVec = Vectors.zeros(numFeatures).toDense
+ BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
+ BLAS.getBLAS(numFeatures).daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
+ gradientSumArray, 1)
+ gradientSumArray(numFeatures) += interceptGradSum
+
+ case sm: SparseMatrix if !fitIntercept =>
+ val gradSumVec = new DenseVector(gradientSumArray)
+ BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
+ }
+
+ this
+ }
+}
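To make the gradient-scale computation in `add` concrete, here is a small self-contained sketch of the same hinge math for a single dot product (plain Scala, no Spark types; the helper name is hypothetical):

    // With labels y in {0, 1}, the hinge loss is max(0, 1 - (2y - 1) * margin),
    // and when the loss is positive the gradient w.r.t. the margin is -(2y - 1).
    def hingeLossAndScale(label: Double, weight: Double, margin: Double): (Double, Double) = {
      val labelScaled = 2.0 * label - 1.0
      val loss = (1.0 - labelScaled * margin) * weight
      if (loss > 0) (loss, -labelScaled * weight) else (0.0, 0.0)
    }

    println(hingeLossAndScale(1.0, 1.0, 0.3))   // (0.7, -1.0): margin inside the hinge
    println(hingeLossAndScale(1.0, 1.0, 1.5))   // (0.0, 0.0): margin beyond 1, no signal
    println(hingeLossAndScale(0.0, 2.0, -0.5))  // (1.0, 2.0): weighted negative example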
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
index 1183041..4230b49 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala
@@ -225,9 +225,10 @@ object Summarizer extends Logging {
/** Get classification feature and label summarizers for provided data. */
private[ml] def getClassificationSummarizers(
instances: RDD[Instance],
- aggregationDepth: Int = 2): (SummarizerBuffer, MultiClassSummarizer) = {
+ aggregationDepth: Int = 2,
+ requested: Seq[String] = Seq("mean", "std", "count")) = {
instances.treeAggregate(
- (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
+ (Summarizer.createSummarizerBuffer(requested: _*), new MultiClassSummarizer))(
seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
(c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight)),
combOp = (c1: (SummarizerBuffer, MultiClassSummarizer),
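With the new `requested` parameter, callers can ask for exactly the metrics they need. A hedged sketch (again `private[ml]`, so usable only from Spark-internal code; `instances` is an assumed RDD[Instance] in scope):

    // Request the extra "numNonZeros" metric, as the blockified LinearSVC path does,
    // to compute dataset sparsity without an extra pass over the data.
    val (summarizer, labelSummarizer) = Summarizer.getClassificationSummarizers(
      instances, aggregationDepth = 2,
      requested = Seq("mean", "std", "count", "numNonZeros"))
    val numFeatures = summarizer.mean.size
    val sparsity = 1.0 - summarizer.numNonzeros.toArray.sum / (summarizer.count.toDouble * numFeatures)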
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index c2072ce..579d6b1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -207,6 +207,21 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
dataset.as[LabeledPoint], estimator, modelEquals, 42L)
}
+ test("LinearSVC on blocks") {
+ for (dataset <- Seq(smallBinaryDataset, smallSparseBinaryDataset);
+ fitIntercept <- Seq(true, false)) {
+ val lsvc = new LinearSVC()
+ .setFitIntercept(fitIntercept)
+ .setMaxIter(5)
+ val model = lsvc.fit(dataset)
+ Seq(4, 16, 64).foreach { blockSize =>
+ val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
+ assert(model.intercept ~== model2.intercept relTol 1e-9)
+ assert(model.coefficients ~== model2.coefficients relTol 1e-9)
+ }
+ }
+ }
+
test("prediction on single instance") {
val trainer = new LinearSVC()
val model = trainer.fit(smallBinaryDataset)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index 5a74490..d780bdf 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -42,5 +42,36 @@ class InstanceSuite extends SparkFunSuite{
val o2 = ser.deserialize[OffsetInstance](ser.serialize(o))
assert(o === o2)
}
+
+ val block1 = InstanceBlock.fromInstances(Seq(instance1))
+ val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2))
+ Seq(block1, block2).foreach { o =>
+ val o2 = ser.deserialize[InstanceBlock](ser.serialize(o))
+ assert(o.labels === o2.labels)
+ assert(o.weights === o2.weights)
+ assert(o.matrix === o2.matrix)
+ }
+ }
+
+ test("InstanceBlock: check correctness") {
+ val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+ val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+ val instances = Seq(instance1, instance2)
+
+ val block = InstanceBlock.fromInstances(instances)
+ assert(block.size === 2)
+ assert(block.numFeatures === 2)
+ block.instanceIterator.zipWithIndex.foreach {
+ case (instance, i) =>
+ assert(instance.label === instances(i).label)
+ assert(instance.weight === instances(i).weight)
+ assert(instance.features.toArray === instances(i).features.toArray)
+ }
+ Seq(0, 1).foreach { i =>
+ val nzIter = block.getNonZeroIter(i)
+ val vec = Vectors.sparse(2, nzIter.toSeq)
+ assert(vec.toArray === instances(i).features.toArray)
+ }
}
+
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
index 16d27a9..51a1edd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ml.optim.aggregator
import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.ml.util.TestingUtils._
@@ -28,6 +28,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
@transient var instances: Array[Instance] = _
@transient var instancesConstantFeature: Array[Instance] = _
@transient var instancesConstantFeatureFiltered: Array[Instance] = _
+ @transient var standardizedInstances: Array[Instance] = _
override def beforeAll(): Unit = {
super.beforeAll()
@@ -46,6 +47,7 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
Instance(1.0, 0.5, Vectors.dense(1.0)),
Instance(2.0, 0.3, Vectors.dense(0.5))
)
+ standardizedInstances = standardize(instances)
}
/** Get summary statistics for some data and create a new HingeAggregator. */
@@ -61,6 +63,29 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
}
+ private def standardize(instances: Array[Instance]): Array[Instance] = {
+ val (featuresSummarizer, _) =
+ Summarizer.getClassificationSummarizers(sc.parallelize(instances))
+ val stdArray = featuresSummarizer.std.toArray
+ val numFeatures = stdArray.length
+ instances.map { case Instance(label, weight, features) =>
+ val standardized = Array.ofDim[Double](numFeatures)
+ features.foreachNonZero { (i, v) =>
+ val std = stdArray(i)
+ if (std != 0) standardized(i) = v / std
+ }
+ Instance(label, weight, Vectors.dense(standardized).compressed)
+ }
+ }
+
+ /** Get summary statistics for some data and create a new BlockHingeAggregator. */
+ private def getNewBlockAggregator(
+ coefficients: Vector,
+ fitIntercept: Boolean): BlockHingeAggregator = {
+ val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+ new BlockHingeAggregator(fitIntercept)(bcCoefficients)
+ }
+
test("aggregator add method input size") {
val coefArray = Array(1.0, 2.0)
val interceptArray = Array(2.0)
@@ -139,8 +164,26 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
}
val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
- assert(loss ~== agg.loss relTol 0.01)
- assert(gradient ~== agg.gradient relTol 0.01)
+ assert(loss ~== agg.loss relTol 1e-9)
+ assert(gradient ~== agg.gradient relTol 1e-9)
+
+ Seq(1, 2, 4).foreach { blockSize =>
+ val blocks1 = standardizedInstances
+ .grouped(blockSize)
+ .map(seq => InstanceBlock.fromInstances(seq))
+ .toArray
+ val blocks2 = blocks1.map { block =>
+ new InstanceBlock(block.labels, block.weights, block.matrix.toSparseRowMajor)
+ }
+
+ Seq(blocks1, blocks2).foreach { blocks =>
+ val blockAgg = getNewBlockAggregator(Vectors.dense(coefArray ++ Array(intercept)),
+ fitIntercept = true)
+ blocks.foreach(blockAgg.add)
+ assert(loss ~== blockAgg.loss relTol 1e-9)
+ assert(gradient ~== blockAgg.gradient relTol 1e-9)
+ }
+ }
}
test("check with zero standard deviation") {
@@ -158,5 +201,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(aggConstantFeatureBinary.gradient(0) === 0.0)
assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
}
-
}
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 0d88aa8..318ae7a 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -241,7 +241,8 @@ class _JavaProbabilisticClassificationModel(ProbabilisticClassificationModel,
class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
- HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold):
+ HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
+ HasBlockSize):
"""
Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
@@ -290,6 +291,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
LinearSVCModel...
>>> model.getThreshold()
0.5
+ >>> model.getBlockSize()
+ 1
>>> model.coefficients
DenseVector([0.0, -0.2792, -0.1833])
>>> model.intercept
@@ -328,18 +331,19 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
- aggregationDepth=2):
+ aggregationDepth=2, blockSize=1):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
- aggregationDepth=2):
+ aggregationDepth=2, blockSize=1):
"""
super(LinearSVC, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.LinearSVC", self.uid)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
- standardization=True, threshold=0.0, aggregationDepth=2)
+ standardization=True, threshold=0.0, aggregationDepth=2,
+ blockSize=1)
kwargs = self._input_kwargs
self.setParams(**kwargs)
@@ -348,12 +352,12 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
- aggregationDepth=2):
+ aggregationDepth=2, blockSize=1):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
- aggregationDepth=2):
+ aggregationDepth=2, blockSize=1):
Sets params for Linear SVM Classifier.
"""
kwargs = self._input_kwargs
@@ -418,6 +422,13 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
"""
return self._set(aggregationDepth=value)
+ @since("3.1.0")
+ def setBlockSize(self, value):
+ """
+ Sets the value of :py:attr:`blockSize`.
+ """
+ return self._set(blockSize=value)
+
class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable):
"""