Posted to commits@spark.apache.org by ru...@apache.org on 2020/02/25 03:19:18 UTC

[spark] branch branch-3.0 updated: Revert "[SPARK-30642][SPARK-30659][SPARK-30660][SPARK-30662]"

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e19f478  Revert "[SPARK-30642][SPARK-30659][SPARK-30660][SPARK-30662]"
e19f478 is described below

commit e19f47849559ca92ae42544b0d1b0e21a6f55d39
Author: zhengruifeng <ru...@foxmail.com>
AuthorDate: Sat Feb 8 08:46:16 2020 +0800

    Revert "[SPARK-30642][SPARK-30659][SPARK-30660][SPARK-30662]"
    
    ### What changes were proposed in this pull request?
    Revert
    #27360
    #27396
    #27374
    #27389
    
    ### Why are the changes needed?
    BLAS needs more performance tests, especially on sparse datasets.
    A performance test of LogisticRegression (https://github.com/apache/spark/pull/27374) on a sparse dataset shows that blockifying vectors into matrices and using BLAS causes a performance regression.
    LinearSVC and LinearRegression were updated in the same way as LogisticRegression, so we need to revert them as well to make sure there is no regression.
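    
    For context, the reverted training path standardized every instance into a dense array before stacking it into an InstanceBlock (see the LinearSVC and LogisticRegression hunks below). One plausible cost on sparse data is that per-instance densification; a minimal sketch, with made-up dimensions:
    
    ```scala
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    
    // A sparse row with 3 non-zeros out of 1,000,000 features.
    val v: Vector = Vectors.sparse(1000000, Array(7, 42, 99), Array(1.0, 2.0, 3.0))
    val std = Array.fill(1000000)(1.0) // per-feature standard deviations (all 1.0 here)
    
    // The reverted code built a dense array per instance before blockifying,
    // allocating numFeatures doubles to hold 3 values:
    val dense = Array.ofDim[Double](v.size)
    v.foreachNonZero { (i, value) => if (std(i) != 0) dense(i) = value / std(i) }
    val standardized = Vectors.dense(dense)
    
    println(s"nnz=${v.numNonzeros}, densified length=${standardized.size}")
    ```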
    
    ### Does this PR introduce any user-facing change?
    Removes the newly added param blockSize and its setBlockSize setters.
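    
    A minimal sketch of what this means for user code, assuming a caller had adopted the param on branch-3.0 before this revert (the call site is hypothetical):
    
    ```scala
    import org.apache.spark.ml.classification.LogisticRegression
    
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.1)
    // Before this revert the following compiled (blockSize defaulted to 1024):
    //   lr.setBlockSize(2048)
    // After this revert, setBlockSize no longer exists on LogisticRegression,
    // LinearSVC, LinearRegression, or ALSModel, so such calls must be dropped.
    ```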
    
    ### How was this patch tested?
    Reverted test suites.
    
    Closes #27487 from zhengruifeng/revert_blockify_ii.
    
    Authored-by: zhengruifeng <ru...@foxmail.com>
    Signed-off-by: zhengruifeng <ru...@foxmail.com>
---
 .../apache/spark/serializer/KryoSerializer.scala   |   1 -
 .../scala/org/apache/spark/ml/linalg/BLAS.scala    |   1 +
 .../apache/spark/ml/classification/LinearSVC.scala |  51 ++---
 .../ml/classification/LogisticRegression.scala     |  58 ++---
 .../MultilayerPerceptronClassifier.scala           |  22 +-
 .../org/apache/spark/ml/feature/Instance.scala     | 130 +----------
 .../ml/optim/aggregator/HingeAggregator.scala      | 102 ++-------
 .../ml/optim/aggregator/HuberAggregator.scala      | 103 ++-------
 .../optim/aggregator/LeastSquaresAggregator.scala  |  74 ++----
 .../ml/optim/aggregator/LogisticAggregator.scala   | 252 +++------------------
 .../ml/param/shared/SharedParamsCodeGen.scala      |   6 +-
 .../spark/ml/param/shared/sharedParams.scala       |  23 +-
 .../org/apache/spark/ml/recommendation/ALS.scala   |  46 +---
 .../spark/ml/regression/LinearRegression.scala     |  49 ++--
 .../mllib/classification/LogisticRegression.scala  |   4 +-
 .../spark/ml/classification/LinearSVCSuite.scala   |   2 +-
 .../classification/LogisticRegressionSuite.scala   |   4 +-
 .../apache/spark/ml/feature/InstanceSuite.scala    |  31 ---
 .../ml/optim/aggregator/HingeAggregatorSuite.scala |  52 +----
 .../ml/optim/aggregator/HuberAggregatorSuite.scala |  61 +----
 .../aggregator/LeastSquaresAggregatorSuite.scala   |  62 +----
 .../optim/aggregator/LogisticAggregatorSuite.scala |  56 +----
 python/pyspark/ml/classification.py                |  67 +++---
 python/pyspark/ml/param/_shared_params_code_gen.py |   5 +-
 python/pyspark/ml/param/shared.py                  |  18 --
 python/pyspark/ml/recommendation.py                |  29 +--
 python/pyspark/ml/regression.py                    |  22 +-
 27 files changed, 260 insertions(+), 1071 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 55ac2c4..cdaab59 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -502,7 +502,6 @@ private[serializer] object KryoSerializer {
       "org.apache.spark.ml.attribute.NumericAttribute",
 
       "org.apache.spark.ml.feature.Instance",
-      "org.apache.spark.ml.feature.InstanceBlock",
       "org.apache.spark.ml.feature.LabeledPoint",
       "org.apache.spark.ml.feature.OffsetInstance",
       "org.apache.spark.ml.linalg.DenseMatrix",
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 00e5b61..e054a15 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -682,6 +682,7 @@ private[spark] object BLAS extends Serializable {
 
           val xTemp = xValues(k) * alpha
           while (i < indEnd) {
+            val rowIndex = Arows(i)
             yValues(Arows(i)) += Avals(i) * xTemp
             i += 1
           }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index f16648d..9057890 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.optim.aggregator.HingeAggregator
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
 /** Params for linear SVM Classifier. */
 private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
   with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
-  with HasAggregationDepth with HasThreshold with HasBlockSize {
+  with HasAggregationDepth with HasThreshold {
 
   /**
    * Param for threshold in binary classification prediction.
@@ -155,26 +155,19 @@ class LinearSVC @Since("2.2.0") (
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
   setDefault(aggregationDepth -> 2)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 1024.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
 
   override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+
+    val instances = extractInstances(dataset)
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
-      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
-
-    val sc = dataset.sparkSession.sparkContext
-    val instances = extractInstances(dataset)
+      regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth)
 
     val (summarizer, labelSummarizer) = instances.treeAggregate(
       (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
@@ -215,33 +208,20 @@ class LinearSVC @Since("2.2.0") (
         throw new SparkException(msg)
       }
 
-      val featuresStd = summarizer.std.compressed
-      val bcFeaturesStd = sc.broadcast(featuresStd)
+      val featuresStd = summarizer.std.toArray
+      val getFeaturesStd = (j: Int) => featuresStd(j)
       val regParamL2 = $(regParam)
+      val bcFeaturesStd = instances.context.broadcast(featuresStd)
       val regularization = if (regParamL2 != 0.0) {
         val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
         Some(new L2Regularization(regParamL2, shouldApply,
-          if ($(standardization)) None else Some(featuresStd.apply)))
+          if ($(standardization)) None else Some(getFeaturesStd)))
       } else {
         None
       }
 
-      val standardized = instances.map {
-        case Instance(label, weight, features) =>
-          val featuresStd = bcFeaturesStd.value
-          val array = Array.ofDim[Double](numFeatures)
-          features.foreachNonZero { (i, v) =>
-            val std = featuresStd(i)
-            if (std != 0) array(i) = v / std
-          }
-          Instance(label, weight, Vectors.dense(array))
-      }
-      val blocks = InstanceBlock.blokify(standardized, $(blockSize))
-        .persist(StorageLevel.MEMORY_AND_DISK)
-        .setName(s"training dataset (blockSize=${$(blockSize)})")
-
-      val getAggregatorFunc = new HingeAggregator(numFeatures, $(fitIntercept))(_)
-      val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
+      val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+      val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
         $(aggregationDepth))
 
       def regParamL1Fun = (index: Int) => 0D
@@ -258,7 +238,6 @@ class LinearSVC @Since("2.2.0") (
         scaledObjectiveHistory += state.adjustedValue
       }
 
-      blocks.unpersist()
       bcFeaturesStd.destroy()
       if (state == null) {
         val msg = s"${optimizer.getClass.getName} failed."
@@ -289,6 +268,8 @@ class LinearSVC @Since("2.2.0") (
       (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
     }
 
+    if (handlePersistence) instances.unpersist()
+
     copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 9b5b362..50c14d0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.optim.aggregator.LogisticAggregator
 import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
@@ -50,8 +50,7 @@ import org.apache.spark.util.VersionUtils
  */
 private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
   with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
-  with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth
-  with HasBlockSize  {
+  with HasStandardization with HasWeightCol with HasThreshold with HasAggregationDepth {
 
   import org.apache.spark.ml.classification.LogisticRegression.supportedFamilyNames
 
@@ -431,15 +430,6 @@ class LogisticRegression @Since("1.2.0") (
   @Since("2.2.0")
   def setUpperBoundsOnIntercepts(value: Vector): this.type = set(upperBoundsOnIntercepts, value)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 1024.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   private def assertBoundConstrainedOptimizationParamsValid(
       numCoefficientSets: Int,
       numFeatures: Int): Unit = {
@@ -492,17 +482,24 @@ class LogisticRegression @Since("1.2.0") (
     this
   }
 
-  override protected[spark] def train(
-      dataset: Dataset[_]): LogisticRegressionModel = instrumented { instr =>
+  override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = {
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    train(dataset, handlePersistence)
+  }
+
+  protected[spark] def train(
+      dataset: Dataset[_],
+      handlePersistence: Boolean): LogisticRegressionModel = instrumented { instr =>
+    val instances = extractInstances(dataset)
+
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
       probabilityCol, regParam, elasticNetParam, standardization, threshold, maxIter, tol,
       fitIntercept)
 
-    val sc = dataset.sparkSession.sparkContext
-    val instances = extractInstances(dataset)
-
     val (summarizer, labelSummarizer) = instances.treeAggregate(
       (Summarizer.createSummarizerBuffer("mean", "std", "count"), new MultiClassSummarizer))(
       seqOp = (c: (SummarizerBuffer, MultiClassSummarizer), instance: Instance) =>
@@ -585,9 +582,8 @@ class LogisticRegression @Since("1.2.0") (
             s"dangerous ground, so the algorithm may not converge.")
         }
 
-        val featuresMean = summarizer.mean.compressed
-        val featuresStd = summarizer.std.compressed
-        val bcFeaturesStd = sc.broadcast(featuresStd)
+        val featuresMean = summarizer.mean.toArray
+        val featuresStd = summarizer.std.toArray
 
         if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
           featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -599,7 +595,8 @@ class LogisticRegression @Since("1.2.0") (
         val regParamL1 = $(elasticNetParam) * $(regParam)
         val regParamL2 = (1.0 - $(elasticNetParam)) * $(regParam)
 
-        val getAggregatorFunc = new LogisticAggregator(numFeatures, numClasses, $(fitIntercept),
+        val bcFeaturesStd = instances.context.broadcast(featuresStd)
+        val getAggregatorFunc = new LogisticAggregator(bcFeaturesStd, numClasses, $(fitIntercept),
           multinomial = isMultinomial)(_)
         val getFeaturesStd = (j: Int) => if (j >= 0 && j < numCoefficientSets * numFeatures) {
           featuresStd(j / numCoefficientSets)
@@ -615,21 +612,7 @@ class LogisticRegression @Since("1.2.0") (
           None
         }
 
-        val standardized = instances.map {
-          case Instance(label, weight, features) =>
-            val featuresStd = bcFeaturesStd.value
-            val array = Array.ofDim[Double](numFeatures)
-            features.foreachNonZero { (i, v) =>
-              val std = featuresStd(i)
-              if (std != 0) array(i) = v / std
-            }
-            Instance(label, weight, Vectors.dense(array))
-        }
-        val blocks = InstanceBlock.blokify(standardized, $(blockSize))
-          .persist(StorageLevel.MEMORY_AND_DISK)
-          .setName(s"training dataset (blockSize=${$(blockSize)})")
-
-        val costFun = new RDDLossFunction(blocks, getAggregatorFunc, regularization,
+        val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
           $(aggregationDepth))
 
         val numCoeffsPlusIntercepts = numFeaturesPlusIntercept * numCoefficientSets
@@ -823,7 +806,6 @@ class LogisticRegression @Since("1.2.0") (
           state = states.next()
           arrayBuilder += state.adjustedValue
         }
-        blocks.unpersist()
         bcFeaturesStd.destroy()
 
         if (state == null) {
@@ -893,6 +875,8 @@ class LogisticRegression @Since("1.2.0") (
       }
     }
 
+    if (handlePersistence) instances.unpersist()
+
     val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
       numClasses, isMultinomial))
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 6e8f92b..c7a8237 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
 private[classification] trait MultilayerPerceptronParams extends ProbabilisticClassifierParams
-  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver with HasBlockSize {
+  with HasSeed with HasMaxIter with HasTol with HasStepSize with HasSolver {
 
   import MultilayerPerceptronClassifier._
 
@@ -55,6 +55,26 @@ private[classification] trait MultilayerPerceptronParams extends ProbabilisticCl
   final def getLayers: Array[Int] = $(layers)
 
   /**
+   * Block size for stacking input data in matrices to speed up the computation.
+   * Data is stacked within partitions. If block size is more than remaining data in
+   * a partition then it is adjusted to the size of this data.
+   * Recommended size is between 10 and 1000.
+   * Default: 128
+   *
+   * @group expertParam
+   */
+  @Since("1.5.0")
+  final val blockSize: IntParam = new IntParam(this, "blockSize",
+    "Block size for stacking input data in matrices. Data is stacked within partitions." +
+      " If block size is more than remaining data in a partition then " +
+      "it is adjusted to the size of this data. Recommended size is between 10 and 1000",
+    ParamValidators.gt(0))
+
+  /** @group expertGetParam */
+  @Since("1.5.0")
+  final def getBlockSize: Int = $(blockSize)
+
+  /**
    * The solver algorithm for optimization.
    * Supported options: "gd" (minibatch gradient descent) or "l-bfgs".
    * Default: "l-bfgs"
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 5476a86..11d0c46 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.ml.feature
 
-import scala.collection.mutable
-
-import org.apache.spark.ml.linalg._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.ml.linalg.Vector
 
 /**
  * Class that represents an instance of weighted data point with label and features.
@@ -31,131 +28,6 @@ import org.apache.spark.rdd.RDD
  */
 private[spark] case class Instance(label: Double, weight: Double, features: Vector)
 
-
-/**
- * Class that represents an block of instance.
- * If all weights are 1, then an empty array is stored.
- */
-private[spark] case class InstanceBlock(
-    labels: Array[Double],
-    weights: Array[Double],
-    matrix: Matrix) {
-  require(labels.length == matrix.numRows)
-  require(matrix.isTransposed)
-  if (weights.nonEmpty) {
-    require(labels.length == weights.length)
-  }
-
-  def size: Int = labels.length
-
-  def numFeatures: Int = matrix.numCols
-
-  def instanceIterator: Iterator[Instance] = {
-    if (weights.nonEmpty) {
-      labels.iterator.zip(weights.iterator).zip(matrix.rowIter)
-        .map { case ((label, weight), vec) => Instance(label, weight, vec) }
-    } else {
-      labels.iterator.zip(matrix.rowIter)
-        .map { case (label, vec) => Instance(label, 1.0, vec) }
-    }
-  }
-
-  def getLabel(i: Int): Double = labels(i)
-
-  def labelIter: Iterator[Double] = labels.iterator
-
-  @transient lazy val getWeight: Int => Double = {
-    if (weights.nonEmpty) {
-      (i: Int) => weights(i)
-    } else {
-      (i: Int) => 1.0
-    }
-  }
-
-  def weightIter: Iterator[Double] = {
-    if (weights.nonEmpty) {
-      weights.iterator
-    } else {
-      Iterator.fill(size)(1.0)
-    }
-  }
-
-  // directly get the non-zero iterator of i-th row vector without array copy or slice
-  @transient lazy val getNonZeroIter: Int => Iterator[(Int, Double)] = {
-    matrix match {
-      case dm: DenseMatrix =>
-        (i: Int) =>
-          val start = numFeatures * i
-          Iterator.tabulate(numFeatures)(j =>
-            (j, dm.values(start + j))
-          ).filter(_._2 != 0)
-      case sm: SparseMatrix =>
-        (i: Int) =>
-          val start = sm.colPtrs(i)
-          val end = sm.colPtrs(i + 1)
-          Iterator.tabulate(end - start)(j =>
-            (sm.rowIndices(start + j), sm.values(start + j))
-          ).filter(_._2 != 0)
-    }
-  }
-}
-
-private[spark] object InstanceBlock {
-
-  def fromInstances(instances: Seq[Instance]): InstanceBlock = {
-    val labels = instances.map(_.label).toArray
-    val weights = if (instances.exists(_.weight != 1)) {
-      instances.map(_.weight).toArray
-    } else {
-      Array.emptyDoubleArray
-    }
-    val numRows = instances.length
-    val numCols = instances.head.features.size
-    val denseSize = Matrices.getDenseSize(numCols, numRows)
-    val nnz = instances.iterator.map(_.features.numNonzeros).sum
-    val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
-    val matrix = if (denseSize < sparseSize) {
-      val values = Array.ofDim[Double](numRows * numCols)
-      var offset = 0
-      var j = 0
-      while (j < numRows) {
-        instances(j).features.foreachNonZero { (i, v) =>
-          values(offset + i) = v
-        }
-        offset += numCols
-        j += 1
-      }
-      new DenseMatrix(numRows, numCols, values, true)
-    } else {
-      val colIndices = mutable.ArrayBuilder.make[Int]
-      val values = mutable.ArrayBuilder.make[Double]
-      val rowPtrs = mutable.ArrayBuilder.make[Int]
-      var rowPtr = 0
-      rowPtrs += 0
-      var j = 0
-      while (j < numRows) {
-        var nnz = 0
-        instances(j).features.foreachNonZero { (i, v) =>
-          colIndices += i
-          values += v
-          nnz += 1
-        }
-        rowPtr += nnz
-        rowPtrs += rowPtr
-        j += 1
-      }
-      new SparseMatrix(numRows, numCols, rowPtrs.result(),
-        colIndices.result(), values.result(), true)
-    }
-    InstanceBlock(labels, weights, matrix)
-  }
-
-  def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
-    instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
-  }
-}
-
-
 /**
  * Case class that represents an instance of data point with
  * label, weight, offset and features.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
index 292187b..b0906f1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
 
 /**
@@ -32,28 +32,21 @@ import org.apache.spark.ml.linalg._
  *
  * @param bcCoefficients The coefficients corresponding to the features.
  * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The standard deviation values of the features.
  */
 private[ml] class HingeAggregator(
-    numFeatures: Int,
+    bcFeaturesStd: Broadcast[Array[Double]],
     fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, HingeAggregator] {
+  extends DifferentiableLossAggregator[Instance, HingeAggregator] {
 
+  private val numFeatures: Int = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  protected override val dim: Int = numFeaturesPlusIntercept
   @transient private lazy val coefficientsArray = bcCoefficients.value match {
     case DenseVector(values) => values
     case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
       s" but got type ${bcCoefficients.value.getClass}.")
   }
-
-  @transient private lazy val linear = {
-    if (fitIntercept) {
-      new DenseVector(coefficientsArray.take(numFeatures))
-    } else {
-      new DenseVector(coefficientsArray)
-    }
-  }
-
+  protected override val dim: Int = numFeaturesPlusIntercept
 
   /**
    * Add a new training instance to this HingeAggregator, and update the loss and gradient
@@ -69,13 +62,16 @@ private[ml] class HingeAggregator(
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
       if (weight == 0.0) return this
+      val localFeaturesStd = bcFeaturesStd.value
       val localCoefficients = coefficientsArray
       val localGradientSumArray = gradientSumArray
 
       val dotProduct = {
         var sum = 0.0
         features.foreachNonZero { (index, value) =>
-          sum += localCoefficients(index) * value
+          if (localFeaturesStd(index) != 0.0) {
+            sum += localCoefficients(index) * value / localFeaturesStd(index)
+          }
         }
         if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
         sum
@@ -92,7 +88,9 @@ private[ml] class HingeAggregator(
       if (1.0 > labelScaled * dotProduct) {
         val gradientScale = -labelScaled * weight
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) += value * gradientScale
+          if (localFeaturesStd(index) != 0.0) {
+            localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
+          }
         }
         if (fitIntercept) {
           localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
@@ -104,78 +102,4 @@ private[ml] class HingeAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this HingeAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param block The InstanceBlock to be added.
-   * @return This HingeAggregator object.
-   */
-  def add(block: InstanceBlock): this.type = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-    val size = block.size
-    val localGradientSumArray = gradientSumArray
-
-    // vec here represents dotProducts
-    val vec = if (fitIntercept && coefficientsArray.last != 0) {
-      val intercept = coefficientsArray.last
-      new DenseVector(Array.fill(size)(intercept))
-    } else {
-      new DenseVector(Array.ofDim[Double](size))
-    }
-
-    if (fitIntercept) {
-      BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
-    } else {
-      BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
-    }
-
-    // in-place convert dotProducts to gradient scales
-    // then, vec represents gradient scales
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      if (weight > 0) {
-        weightSum += weight
-        // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
-        // Therefore the gradient is -(2y - 1)*x
-        val label = block.getLabel(i)
-        val labelScaled = 2 * label - 1.0
-        val loss = (1.0 - labelScaled * vec(i)) * weight
-        if (loss > 0) {
-          lossSum += loss
-          val gradScale = -labelScaled * weight
-          vec.values(i) = gradScale
-        } else {
-          vec.values(i) = 0.0
-        }
-      } else {
-        vec.values(i) = 0.0
-      }
-      i += 1
-    }
-
-    // predictions are all correct, no gradient signal
-    if (vec.values.forall(_ == 0)) return this
-
-    if (fitIntercept) {
-      // localGradientSumArray is of size numFeatures+1, so can not
-      // be directly used as the output of BLAS.gemv
-      val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-      BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
-      linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
-      localGradientSumArray(numFeatures) += vec.values.sum
-    } else {
-      val gradSumVec = new DenseVector(localGradientSumArray)
-      BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
-    }
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
index f836215..8a1a41b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.Vector
 
 /**
  * HuberAggregator computes the gradient and loss for a huber loss function,
@@ -62,17 +62,19 @@ import org.apache.spark.ml.linalg._
  *
  * @param fitIntercept Whether to fit an intercept term.
  * @param epsilon The shape parameter to control the amount of robustness.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
  * @param bcParameters including three parts: the regression coefficients corresponding
  *                     to the features, the intercept (if fitIntercept is ture)
  *                     and the scale parameter (sigma).
  */
 private[ml] class HuberAggregator(
-    numFeatures: Int,
     fitIntercept: Boolean,
-    epsilon: Double)(bcParameters: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, HuberAggregator] {
+    epsilon: Double,
+    bcFeaturesStd: Broadcast[Array[Double]])(bcParameters: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, HuberAggregator] {
 
   protected override val dim: Int = bcParameters.value.size
+  private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
   private val sigma: Double = bcParameters.value(dim - 1)
   private val intercept: Double = if (fitIntercept) {
     bcParameters.value(dim - 2)
@@ -80,8 +82,7 @@ private[ml] class HuberAggregator(
     0.0
   }
   // make transient so we do not serialize between aggregation stages
-  @transient private lazy val linear =
-    new DenseVector(bcParameters.value.toArray.take(numFeatures))
+  @transient private lazy val coefficients = bcParameters.value.toArray.slice(0, numFeatures)
 
   /**
    * Add a new training instance to this HuberAggregator, and update the loss and gradient
@@ -97,13 +98,16 @@ private[ml] class HuberAggregator(
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
       if (weight == 0.0) return this
-      val localCoefficients = linear.values
+      val localFeaturesStd = bcFeaturesStd.value
+      val localCoefficients = coefficients
       val localGradientSumArray = gradientSumArray
 
       val margin = {
         var sum = 0.0
         features.foreachNonZero { (index, value) =>
-            sum += localCoefficients(index) * value
+          if (localFeaturesStd(index) != 0.0) {
+            sum += localCoefficients(index) * (value / localFeaturesStd(index))
+          }
         }
         if (fitIntercept) sum += intercept
         sum
@@ -115,7 +119,10 @@ private[ml] class HuberAggregator(
         val linearLossDivSigma = linearLoss / sigma
 
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) -= weight * linearLossDivSigma * value
+          if (localFeaturesStd(index) != 0.0) {
+            localGradientSumArray(index) +=
+              -1.0 * weight * linearLossDivSigma * (value / localFeaturesStd(index))
+          }
         }
         if (fitIntercept) {
           localGradientSumArray(dim - 2) += -1.0 * weight * linearLossDivSigma
@@ -127,7 +134,10 @@ private[ml] class HuberAggregator(
           (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
 
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) += weight * sign * epsilon * value
+          if (localFeaturesStd(index) != 0.0) {
+            localGradientSumArray(index) +=
+              weight * sign * epsilon * (value / localFeaturesStd(index))
+          }
         }
         if (fitIntercept) {
           localGradientSumArray(dim - 2) += weight * sign * epsilon
@@ -139,75 +149,4 @@ private[ml] class HuberAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this HuberAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param block The instance block of data point to be added.
-   * @return This HuberAggregator object.
-   */
-  def add(block: InstanceBlock): HuberAggregator = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-    val size = block.size
-    val localGradientSumArray = gradientSumArray
-
-    // vec here represents margins or dotProducts
-    val vec = if (fitIntercept && intercept != 0) {
-      new DenseVector(Array.fill(size)(intercept))
-    } else {
-      new DenseVector(Array.ofDim[Double](size))
-    }
-
-    if (fitIntercept) {
-      BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
-    } else {
-      BLAS.gemv(1.0, block.matrix, linear, 0.0, vec)
-    }
-
-    // in-place convert margins to multipliers
-    // then, vec represents multipliers
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      if (weight > 0) {
-        weightSum += weight
-        val label = block.getLabel(i)
-        val margin = vec(i)
-        val linearLoss = label - margin
-
-        if (math.abs(linearLoss) <= sigma * epsilon) {
-          lossSum += 0.5 * weight * (sigma + math.pow(linearLoss, 2.0) / sigma)
-          val linearLossDivSigma = linearLoss / sigma
-          val multiplier = -1.0 * weight * linearLossDivSigma
-          vec.values(i) = multiplier
-          localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - math.pow(linearLossDivSigma, 2.0))
-        } else {
-          lossSum += 0.5 * weight *
-            (sigma + 2.0 * epsilon * math.abs(linearLoss) - sigma * epsilon * epsilon)
-          val sign = if (linearLoss >= 0) -1.0 else 1.0
-          val multiplier = weight * sign * epsilon
-          vec.values(i) = multiplier
-          localGradientSumArray(dim - 1) += 0.5 * weight * (1.0 - epsilon * epsilon)
-        }
-      } else {
-        vec.values(i) = 0.0
-      }
-      i += 1
-    }
-
-    val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-    BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
-    linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
-    if (fitIntercept) {
-      localGradientSumArray(dim - 2) += vec.values.sum
-    }
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
index a8bda9c..7a5806d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregator.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 
 /**
  * LeastSquaresAggregator computes the gradient and loss for a Least-squared loss function,
@@ -157,25 +157,26 @@ private[ml] class LeastSquaresAggregator(
     labelStd: Double,
     labelMean: Double,
     fitIntercept: Boolean,
-    bcFeaturesStd: Broadcast[Vector],
-    bcFeaturesMean: Broadcast[Vector])(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, LeastSquaresAggregator] {
+    bcFeaturesStd: Broadcast[Array[Double]],
+    bcFeaturesMean: Broadcast[Array[Double]])(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, LeastSquaresAggregator] {
   require(labelStd > 0.0, s"${this.getClass.getName} requires the label standard " +
     s"deviation to be positive.")
 
-  private val numFeatures = bcFeaturesStd.value.size
+  private val numFeatures = bcFeaturesStd.value.length
   protected override val dim: Int = numFeatures
   // make transient so we do not serialize between aggregation stages
+  @transient private lazy val featuresStd = bcFeaturesStd.value
   @transient private lazy val effectiveCoefAndOffset = {
     val coefficientsArray = bcCoefficients.value.toArray.clone()
     val featuresMean = bcFeaturesMean.value
-    val featuresStd = bcFeaturesStd.value
     var sum = 0.0
     var i = 0
     val len = coefficientsArray.length
     while (i < len) {
       if (featuresStd(i) != 0.0) {
-        sum += coefficientsArray(i) / featuresStd(i) * featuresMean(i)
+        coefficientsArray(i) /=  featuresStd(i)
+        sum += coefficientsArray(i) * featuresMean(i)
       } else {
         coefficientsArray(i) = 0.0
       }
@@ -185,7 +186,7 @@ private[ml] class LeastSquaresAggregator(
     (Vectors.dense(coefficientsArray), offset)
   }
   // do not use tuple assignment above because it will circumvent the @transient tag
-  @transient private lazy val effectiveCoefficientsVec = effectiveCoefAndOffset._1
+  @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
   @transient private lazy val offset = effectiveCoefAndOffset._2
 
   /**
@@ -203,20 +204,16 @@ private[ml] class LeastSquaresAggregator(
 
       if (weight == 0.0) return this
 
-      val localEffectiveCoefficientsVec = effectiveCoefficientsVec
-
-      val diff = {
-        var dot = 0.0
-        features.foreachNonZero { (index, value) =>
-          dot += localEffectiveCoefficientsVec(index) * value
-        }
-        dot - label / labelStd + offset
-      }
+      val diff = BLAS.dot(features, effectiveCoefficientsVector) - label / labelStd + offset
 
       if (diff != 0) {
         val localGradientSumArray = gradientSumArray
+        val localFeaturesStd = featuresStd
         features.foreachNonZero { (index, value) =>
-          localGradientSumArray(index) += weight * diff * value
+          val fStd = localFeaturesStd(index)
+          if (fStd != 0.0) {
+            localGradientSumArray(index) += weight * diff * value / fStd
+          }
         }
         lossSum += weight * diff * diff / 2.0
       }
@@ -224,43 +221,4 @@ private[ml] class LeastSquaresAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this LeastSquaresAggregator, and update the loss
-   * and gradient of the objective function.
-   *
-   * @param block The instance block of data point to be added.
-   * @return This LeastSquaresAggregator object.
-   */
-  def add(block: InstanceBlock): LeastSquaresAggregator = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-    val size = block.size
-
-    // vec here represents diffs
-    val vec = new DenseVector(Array.tabulate(size)(i => offset - block.getLabel(i) / labelStd))
-    BLAS.gemv(1.0, block.matrix, effectiveCoefficientsVec, 1.0, vec)
-
-    // in-place convert diffs to multipliers
-    // then, vec represents multipliers
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      val diff = vec(i)
-      lossSum += weight * diff * diff / 2
-      weightSum += weight
-      val multiplier = weight * diff
-      vec.values(i) = multiplier
-      i += 1
-    }
-
-    val gradSumVec = new DenseVector(gradientSumArray)
-    BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
index 76d2199..f2b3566 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala
@@ -18,8 +18,8 @@ package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
-import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{DenseVector, Vector}
 import org.apache.spark.mllib.util.MLUtils
 
 /**
@@ -171,6 +171,7 @@ import org.apache.spark.mllib.util.MLUtils
  *
  *
  * @param bcCoefficients The broadcast coefficients corresponding to the features.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
  * @param numClasses the number of possible outcomes for k classes classification problem in
  *                   Multinomial Logistic Regression.
  * @param fitIntercept Whether to fit an intercept term.
@@ -182,12 +183,13 @@ import org.apache.spark.mllib.util.MLUtils
  * since this form is optimal for the matrix operations used for prediction.
  */
 private[ml] class LogisticAggregator(
-    numFeatures: Int,
+    bcFeaturesStd: Broadcast[Array[Double]],
     numClasses: Int,
     fitIntercept: Boolean,
     multinomial: Boolean)(bcCoefficients: Broadcast[Vector])
-  extends DifferentiableLossAggregator[InstanceBlock, LogisticAggregator] with Logging {
+  extends DifferentiableLossAggregator[Instance, LogisticAggregator] with Logging {
 
+  private val numFeatures = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept = if (fitIntercept) numFeatures + 1 else numFeatures
   private val coefficientSize = bcCoefficients.value.size
   protected override val dim: Int = coefficientSize
@@ -207,31 +209,6 @@ private[ml] class LogisticAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val binaryLinear = {
-    if (!multinomial) {
-      if (fitIntercept) {
-        new DenseVector(coefficientsArray.take(numFeatures))
-      } else {
-        new DenseVector(coefficientsArray)
-      }
-    } else {
-      null
-    }
-  }
-
-  @transient private lazy val multinomialLinear = {
-    if (multinomial) {
-      if (fitIntercept) {
-        new DenseMatrix(numClasses, numFeatures, coefficientsArray.take(numClasses * numFeatures))
-      } else {
-        new DenseMatrix(numClasses, numFeatures, coefficientsArray)
-      }
-    } else {
-      null
-    }
-  }
-
-
   if (multinomial && numClasses <= 2) {
     logInfo(s"Multinomial logistic regression for binary classification yields separate " +
       s"coefficients for positive and negative classes. When no regularization is applied, the" +
@@ -242,12 +219,15 @@ private[ml] class LogisticAggregator(
   /** Update gradient and loss using binary loss function. */
   private def binaryUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = {
 
+    val localFeaturesStd = bcFeaturesStd.value
     val localCoefficients = coefficientsArray
     val localGradientArray = gradientSumArray
     val margin = - {
       var sum = 0.0
       features.foreachNonZero { (index, value) =>
-        sum += localCoefficients(index) * value
+        if (localFeaturesStd(index) != 0.0) {
+          sum += localCoefficients(index) * value / localFeaturesStd(index)
+        }
       }
       if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
       sum
@@ -256,7 +236,9 @@ private[ml] class LogisticAggregator(
     val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
 
     features.foreachNonZero { (index, value) =>
-      localGradientArray(index) += multiplier * value
+      if (localFeaturesStd(index) != 0.0) {
+        localGradientArray(index) += multiplier * value / localFeaturesStd(index)
+      }
     }
 
     if (fitIntercept) {
@@ -271,61 +253,6 @@ private[ml] class LogisticAggregator(
     }
   }
 
-  /** Update gradient and loss using binary loss function. */
-  private def binaryUpdateInPlace(block: InstanceBlock): Unit = {
-    val size = block.size
-    val localGradientSumArray = gradientSumArray
-
-    // vec here represents margins or negative dotProducts
-    val vec = if (fitIntercept && coefficientsArray.last != 0) {
-      val intercept = coefficientsArray.last
-      new DenseVector(Array.fill(size)(intercept))
-    } else {
-      new DenseVector(Array.ofDim[Double](size))
-    }
-
-    if (fitIntercept) {
-      BLAS.gemv(-1.0, block.matrix, binaryLinear, -1.0, vec)
-    } else {
-      BLAS.gemv(-1.0, block.matrix, binaryLinear, 0.0, vec)
-    }
-
-    // in-place convert margins to multiplier
-    // then, vec represents multiplier
-    var i = 0
-    while (i < size) {
-      val weight = block.getWeight(i)
-      if (weight > 0) {
-        weightSum += weight
-        val label = block.getLabel(i)
-        val margin = vec(i)
-        if (label > 0) {
-          // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
-          lossSum += weight * MLUtils.log1pExp(margin)
-        } else {
-          lossSum += weight * (MLUtils.log1pExp(margin) - margin)
-        }
-        val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
-        vec.values(i) = multiplier
-      } else {
-        vec.values(i) = 0.0
-      }
-      i += 1
-    }
-
-    if (fitIntercept) {
-      // localGradientSumArray is of size numFeatures+1, so can not
-      // be directly used as the output of BLAS.gemv
-      val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-      BLAS.gemv(1.0, block.matrix.transpose, vec, 0.0, linearGradSumVec)
-      linearGradSumVec.foreachNonZero { (i, v) => localGradientSumArray(i) += v }
-      localGradientSumArray(numFeatures) += vec.values.sum
-    } else {
-      val gradSumVec = new DenseVector(localGradientSumArray)
-      BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
-    }
-  }
-
   /** Update gradient and loss using multinomial (softmax) loss function. */
   private def multinomialUpdateInPlace(features: Vector, weight: Double, label: Double): Unit = {
     // TODO: use level 2 BLAS operations
@@ -333,6 +260,7 @@ private[ml] class LogisticAggregator(
       Note: this can still be used when numClasses = 2 for binary
       logistic regression without pivoting.
      */
+    val localFeaturesStd = bcFeaturesStd.value
     val localCoefficients = coefficientsArray
     val localGradientArray = gradientSumArray
 
@@ -342,10 +270,13 @@ private[ml] class LogisticAggregator(
 
     val margins = new Array[Double](numClasses)
     features.foreachNonZero { (index, value) =>
-      var j = 0
-      while (j < numClasses) {
-        margins(j) += localCoefficients(index * numClasses + j) * value
-        j += 1
+      if (localFeaturesStd(index) != 0.0) {
+        val stdValue = value / localFeaturesStd(index)
+        var j = 0
+        while (j < numClasses) {
+          margins(j) += localCoefficients(index * numClasses + j) * stdValue
+          j += 1
+        }
       }
     }
     var i = 0
@@ -383,10 +314,13 @@ private[ml] class LogisticAggregator(
       multipliers(i) = multipliers(i) / sum - (if (label == i) 1.0 else 0.0)
     }
     features.foreachNonZero { (index, value) =>
-      var j = 0
-      while (j < numClasses) {
-        localGradientArray(index * numClasses + j) += weight * multipliers(j) * value
-        j += 1
+      if (localFeaturesStd(index) != 0.0) {
+        val stdValue = value / localFeaturesStd(index)
+        var j = 0
+        while (j < numClasses) {
+          localGradientArray(index * numClasses + j) += weight * multipliers(j) * stdValue
+          j += 1
+        }
       }
     }
     if (fitIntercept) {
@@ -405,112 +339,6 @@ private[ml] class LogisticAggregator(
     lossSum += weight * loss
   }
 
-  /** Update gradient and loss using multinomial (softmax) loss function. */
-  private def multinomialUpdateInPlace(block: InstanceBlock): Unit = {
-    val size = block.size
-    val localGradientSumArray = gradientSumArray
-
-    // mat here represents margins, shape: S X C
-    val mat = new DenseMatrix(size, numClasses, Array.ofDim[Double](size * numClasses))
-
-    if (fitIntercept) {
-      val intercept = coefficientsArray.takeRight(numClasses)
-      var i = 0
-      while (i < size) {
-        var j = 0
-        while (j < numClasses) {
-          mat.update(i, j, intercept(j))
-          j += 1
-        }
-        i += 1
-      }
-      BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 1.0, mat)
-    } else {
-      BLAS.gemm(1.0, block.matrix, multinomialLinear.transpose, 0.0, mat)
-    }
-
-    // in-place convert margins to multipliers
-    // then, mat represents multipliers
-    var i = 0
-    val tmp = Array.ofDim[Double](numClasses)
-    while (i < size) {
-      val weight = block.getWeight(i)
-      if (weight > 0) {
-        weightSum += weight
-        val label = block.getLabel(i)
-
-        var maxMargin = Double.NegativeInfinity
-        var j = 0
-        while (j < numClasses) {
-          tmp(j) = mat(i, j)
-          maxMargin = math.max(maxMargin, tmp(j))
-          j += 1
-        }
-
-        // marginOfLabel is margins(label) in the formula
-        val marginOfLabel = tmp(label.toInt)
-
-        var sum = 0.0
-        j = 0
-        while (j < numClasses) {
-          if (maxMargin > 0) tmp(j) -= maxMargin
-          val exp = math.exp(tmp(j))
-          sum += exp
-          tmp(j) = exp
-          j += 1
-        }
-
-        j = 0
-        while (j < numClasses) {
-          val multiplier = weight * (tmp(j) / sum - (if (label == j) 1.0 else 0.0))
-          mat.update(i, j, multiplier)
-          j += 1
-        }
-
-        if (maxMargin > 0) {
-          lossSum += weight * (math.log(sum) - marginOfLabel + maxMargin)
-        } else {
-          lossSum += weight * (math.log(sum) - marginOfLabel)
-        }
-      } else {
-        var j = 0
-        while (j < numClasses) {
-          mat.update(i, j, 0.0)
-          j += 1
-        }
-      }
-      i += 1
-    }
-
-    // block.matrix:                  S X F, unknown type
-    // mat (multipliers):             S X C, dense
-    // gradSumMat(gradientSumArray):  C X FPI (numFeaturesPlusIntercept), dense
-    block.matrix match {
-      case dm: DenseMatrix if !fitIntercept =>
-        // If fitIntercept==false, gradientSumArray += mat.T X matrix
-        // GEMM requires block.matrix is dense
-        val gradSumMat = new DenseMatrix(numClasses, numFeatures, localGradientSumArray)
-        BLAS.gemm(1.0, mat.transpose, dm, 1.0, gradSumMat)
-
-      case _ =>
-        // Otherwise, use linearGradSumMat (F X C) as a temp matrix:
-        // linearGradSumMat = matrix.T X mat
-        val linearGradSumMat = new DenseMatrix(numFeatures, numClasses,
-          Array.ofDim[Double](numFeatures * numClasses))
-        BLAS.gemm(1.0, block.matrix.transpose, mat, 0.0, linearGradSumMat)
-        linearGradSumMat.foreachActive { (i, j, v) =>
-          if (v != 0) localGradientSumArray(i * numClasses + j) += v
-        }
-
-        if (fitIntercept) {
-          val start = numClasses * numFeatures
-          mat.foreachActive { (i, j, v) =>
-            if (v != 0) localGradientSumArray(start + j) += v
-          }
-        }
-    }
-  }
-
   /**
    * Add a new training instance to this LogisticAggregator, and update the loss and gradient
    * of the objective function.
@@ -535,28 +363,4 @@ private[ml] class LogisticAggregator(
       this
     }
   }
-
-  /**
-   * Add a new training instance block to this LogisticAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param block The instance block of data point to be added.
-   * @return This LogisticAggregator object.
-   */
-  def add(block: InstanceBlock): this.type = {
-    require(numFeatures == block.numFeatures, s"Dimensions mismatch when adding new " +
-      s"instance. Expecting $numFeatures but got ${block.numFeatures}.")
-    require(block.weightIter.forall(_ >= 0),
-      s"instance weights ${block.weightIter.mkString("[", ",", "]")} has to be >= 0.0")
-
-    if (block.weightIter.forall(_ == 0)) return this
-
-    if (multinomial) {
-      multinomialUpdateInPlace(block)
-    } else {
-      binaryUpdateInPlace(block)
-    }
-
-    this
-  }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index 3d1fab8..7ac680e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -104,11 +104,7 @@ private[shared] object SharedParamsCodeGen {
         isValid = "ParamValidators.inArray(Array(\"euclidean\", \"cosine\"))"),
       ParamDesc[String]("validationIndicatorCol", "name of the column that indicates whether " +
         "each row is for training or for validation. False indicates training; true indicates " +
-        "validation"),
-      ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
-        "stacked within partitions. If block size is more than remaining data in a partition " +
-        "then it is adjusted to the size of this data", Some("1024"),
-        isValid = "ParamValidators.gt(0)", isExpertParam = true)
+        "validation.")
     )
 
     val code = genSharedParams(params)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 7fe8ccd..44c993e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -570,31 +570,12 @@ trait HasDistanceMeasure extends Params {
 trait HasValidationIndicatorCol extends Params {
 
   /**
-   * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.
+   * Param for name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation..
    * @group param
    */
-  final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation")
+  final val validationIndicatorCol: Param[String] = new Param[String](this, "validationIndicatorCol", "name of the column that indicates whether each row is for training or for validation. False indicates training; true indicates validation.")
 
   /** @group getParam */
   final def getValidationIndicatorCol: String = $(validationIndicatorCol)
 }
-
-/**
- * Trait for shared param blockSize (default: 1024). This trait may be changed or
- * removed between minor versions.
- */
-@DeveloperApi
-trait HasBlockSize extends Params {
-
-  /**
-   * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
-   * @group expertParam
-   */
-  final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0))
-
-  setDefault(blockSize, 1024)
-
-  /** @group expertGetParam */
-  final def getBlockSize: Int = $(blockSize)
-}
 // scalastyle:on
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 002146f..2fb9a27 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -54,8 +54,7 @@ import org.apache.spark.util.random.XORShiftRandom
 /**
  * Common params for ALS and ALSModel.
  */
-private[recommendation] trait ALSModelParams extends Params with HasPredictionCol
-  with HasBlockSize {
+private[recommendation] trait ALSModelParams extends Params with HasPredictionCol {
   /**
    * Param for the column name for user ids. Ids must be integers. Other
    * numeric types are supported for this column, but will be cast to integers as long as they
@@ -126,8 +125,6 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
 
   /** @group expertGetParam */
   def getColdStartStrategy: String = $(coldStartStrategy).toLowerCase(Locale.ROOT)
-
-  setDefault(blockSize -> 4096)
 }
 
 /**
@@ -291,15 +288,6 @@ class ALSModel private[ml] (
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 4096.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   private val predict = udf { (featuresA: Seq[Float], featuresB: Seq[Float]) =>
     if (featuresA != null && featuresB != null) {
       var dotProduct = 0.0f
@@ -363,7 +351,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
   }
 
   /**
@@ -378,7 +366,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
-    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems, $(blockSize))
+    recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
   }
 
   /**
@@ -389,7 +377,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
   }
 
   /**
@@ -404,7 +392,7 @@ class ALSModel private[ml] (
   @Since("2.3.0")
   def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
     val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
-    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers, $(blockSize))
+    recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
   }
 
   /**
@@ -453,12 +441,11 @@ class ALSModel private[ml] (
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int,
-      blockSize: Int): DataFrame = {
+      num: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -496,10 +483,11 @@ class ALSModel private[ml] (
 
   /**
    * Blockifies factors to improve the efficiency of cross join
+   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
-      blockSize: Int): Dataset[Seq[(Int, Array[Float])]] = {
+      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
@@ -667,15 +655,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
 
   /**
-   * Set block size for stacking input data in matrices.
-   * Default is 4096.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
-  /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    *
    * @group setParam
@@ -704,7 +683,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     instr.logDataset(dataset)
     instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
-      seed, intermediateStorageLevel, finalStorageLevel, blockSize)
+      seed, intermediateStorageLevel, finalStorageLevel)
 
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
@@ -715,8 +694,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       checkpointInterval = $(checkpointInterval), seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
-    val model = new ALSModel(uid, $(rank), userDF, itemDF).setBlockSize($(blockSize))
-      .setParent(this)
+    val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
     copyValues(model)
   }
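
Note: with the blockSize param gone, blockify above reverts to a hard-coded default of 4096 rows per block. The grouping itself is just Iterator.grouped applied within each partition; a self-contained sketch of the semantics (toy data, not ALS internals):

    // Three factor rows grouped into blocks of at most two.
    val factors: Iterator[(Int, Array[Float])] =
      Iterator(1 -> Array(0.1f), 2 -> Array(0.2f), 3 -> Array(0.3f))
    val blocks = factors.grouped(2).toSeq
    // Two blocks: ids 1 and 2 together, id 3 in a smaller trailing block.
    assert(blocks.map(_.map(_._1)) == Seq(Seq(1, 2), Seq(3)))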
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index fc59da8..64e5e19 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{PipelineStage, PredictorParams}
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.linalg.BLAS._
 import org.apache.spark.ml.optim.WeightedLeastSquares
@@ -55,7 +55,7 @@ import org.apache.spark.util.VersionUtils.majorMinorVersion
 private[regression] trait LinearRegressionParams extends PredictorParams
     with HasRegParam with HasElasticNetParam with HasMaxIter with HasTol
     with HasFitIntercept with HasStandardization with HasWeightCol with HasSolver
-    with HasAggregationDepth with HasLoss with HasBlockSize {
+    with HasAggregationDepth with HasLoss {
 
   import LinearRegression._
 
@@ -316,15 +316,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   def setEpsilon(value: Double): this.type = set(epsilon, value)
   setDefault(epsilon -> 1.35)
 
-  /**
-   * Set block size for stacking input data in matrices.
-   * Default is 1024.
-   *
-   * @group expertSetParam
-   */
-  @Since("3.0.0")
-  def setBlockSize(value: Int): this.type = set(blockSize, value)
-
   override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr =>
     // Extract the number of features before deciding optimization solver.
     val numFeatures = MetadataUtils.getNumFeatures(dataset, $(featuresCol))
@@ -363,6 +354,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       return lrModel.setSummary(Some(trainingSummary))
     }
 
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+
     val (featuresSummarizer, ySummarizer) = instances.treeAggregate(
       (Summarizer.createSummarizerBuffer("mean", "std"),
         Summarizer.createSummarizerBuffer("mean", "std", "count")))(
@@ -398,6 +392,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
             s"will be zeros and the intercept will be the mean of the label; as a result, " +
             s"training is not needed.")
         }
+        if (handlePersistence) instances.unpersist()
         val coefficients = Vectors.sparse(numFeatures, Seq.empty)
         val intercept = yMean
 
@@ -426,8 +421,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     // if y is constant (rawYStd is zero), then y cannot be scaled. In this case
     // setting yStd=abs(yMean) ensures that y is not scaled anymore in l-bfgs algorithm.
     val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
-    val featuresMean = featuresSummarizer.mean.compressed
-    val featuresStd = featuresSummarizer.std.compressed
+    val featuresMean = featuresSummarizer.mean.toArray
+    val featuresStd = featuresSummarizer.std.toArray
     val bcFeaturesMean = instances.context.broadcast(featuresMean)
     val bcFeaturesStd = instances.context.broadcast(featuresStd)
 
@@ -447,36 +442,23 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     val effectiveL1RegParam = $(elasticNetParam) * effectiveRegParam
     val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
 
+    val getFeaturesStd = (j: Int) => if (j >= 0 && j < numFeatures) featuresStd(j) else 0.0
     val regularization = if (effectiveL2RegParam != 0.0) {
       val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
       Some(new L2Regularization(effectiveL2RegParam, shouldApply,
-        if ($(standardization)) None else Some(featuresStd.apply)))
+        if ($(standardization)) None else Some(getFeaturesStd)))
     } else {
       None
     }
 
-    val standardized = instances.map {
-      case Instance(label, weight, features) =>
-        val featuresStd = bcFeaturesStd.value
-        val array = Array.ofDim[Double](numFeatures)
-        features.foreachNonZero { (i, v) =>
-          val std = featuresStd(i)
-          if (std != 0) array(i) = v / std
-        }
-        Instance(label, weight, Vectors.dense(array))
-    }
-    val blocks = InstanceBlock.blokify(standardized, $(blockSize))
-      .persist(StorageLevel.MEMORY_AND_DISK)
-      .setName(s"training dataset (blockSize=${$(blockSize)})")
-
     val costFun = $(loss) match {
       case SquaredError =>
         val getAggregatorFunc = new LeastSquaresAggregator(yStd, yMean, $(fitIntercept),
           bcFeaturesStd, bcFeaturesMean)(_)
-        new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
+        new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth))
       case Huber =>
-        val getAggregatorFunc = new HuberAggregator(numFeatures, $(fitIntercept), $(epsilon))(_)
-        new RDDLossFunction(blocks, getAggregatorFunc, regularization, $(aggregationDepth))
+        val getAggregatorFunc = new HuberAggregator($(fitIntercept), $(epsilon), bcFeaturesStd)(_)
+        new RDDLossFunction(instances, getAggregatorFunc, regularization, $(aggregationDepth))
     }
 
     val optimizer = $(loss) match {
@@ -542,7 +524,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         throw new SparkException(msg)
       }
 
-      blocks.unpersist()
       bcFeaturesMean.destroy()
       bcFeaturesStd.destroy()
 
@@ -576,7 +557,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
             after the coefficients are converged. See the following discussion for detail.
             http://stats.stackexchange.com/questions/13617/how-is-the-intercept-computed-in-glmnet
             */
-            yMean - dot(Vectors.dense(rawCoefficients), featuresMean)
+            yMean - dot(Vectors.dense(rawCoefficients), Vectors.dense(featuresMean))
           case Huber => parameters(numFeatures)
         }
       } else {
@@ -591,6 +572,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       (Vectors.dense(rawCoefficients).compressed, interceptValue, scaleValue, arrayBuilder.result())
     }
 
+    if (handlePersistence) instances.unpersist()
+
     val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept, scale))
     // Handle possible missing or invalid prediction columns
     val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol()
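
Note: this revert replaces the standardized, blockified training path with caching of the raw instances. For reference, the removed map scaled each nonzero feature by its column standard deviation and left zero-variance columns at 0.0; a minimal sketch with made-up values:

    import org.apache.spark.ml.linalg.Vectors

    val featuresStd = Array(2.0, 0.0)            // second column is constant
    val features = Vectors.dense(4.0, 7.0)
    val scaled = Array.ofDim[Double](featuresStd.length)
    features.foreachNonZero { (i, v) =>
      if (featuresStd(i) != 0.0) scaled(i) = v / featuresStd(i)
    }
    assert(scaled.sameElements(Array(2.0, 0.0)))

After the revert this scaling happens inside the aggregators again, driven by the broadcast bcFeaturesStd.
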
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index f88f3fc..21eb17d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -339,8 +339,10 @@ class LogisticRegressionWithLBFGS
         // Convert our input into a DataFrame
         val spark = SparkSession.builder().sparkContext(input.context).getOrCreate()
         val df = spark.createDataFrame(input.map(_.asML))
+        // Determine if we should cache the DF
+        val handlePersistence = input.getStorageLevel == StorageLevel.NONE
         // Train our model
-        val mlLogisticRegressionModel = lr.train(df)
+        val mlLogisticRegressionModel = lr.train(df, handlePersistence)
         // convert the model
         val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray)
         createModel(weights, mlLogisticRegressionModel.intercept)
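
Note: handlePersistence is the usual Spark courtesy pattern: cache the input only when the caller has not already persisted it, then release the cache when training finishes so caching ownership stays with the caller. A generic sketch (withCachingIfNeeded is an illustrative helper, not Spark API):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    def withCachingIfNeeded[T, R](rdd: RDD[T])(train: RDD[T] => R): R = {
      val handlePersistence = rdd.getStorageLevel == StorageLevel.NONE
      if (handlePersistence) rdd.persist(StorageLevel.MEMORY_AND_DISK)
      try train(rdd)
      finally if (handlePersistence) rdd.unpersist()
    }
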
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index 2b63dc2..c2072ce 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -179,7 +179,7 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
   test("sparse coefficients in HingeAggregator") {
     val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
     val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
-    val agg = new HingeAggregator(1, true)(bcCoefficients)
+    val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients)
     val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
         agg.add(Instance(1.0, 1.0, Vectors.dense(1.0)))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 9e359ba..6d31e6e 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -542,7 +542,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
   test("sparse coefficients in LogisticAggregator") {
     val bcCoefficientsBinary = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0)))
     val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0))
-    val binaryAgg = new LogisticAggregator(1, 2,
+    val binaryAgg = new LogisticAggregator(bcFeaturesStd, 2,
       fitIntercept = true, multinomial = false)(bcCoefficientsBinary)
     val thrownBinary = withClue("binary logistic aggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
@@ -552,7 +552,7 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
     assert(thrownBinary.getMessage.contains("coefficients only supports dense"))
 
     val bcCoefficientsMulti = spark.sparkContext.broadcast(Vectors.sparse(6, Array(0), Array(1.0)))
-    val multinomialAgg = new LogisticAggregator(1, 3,
+    val multinomialAgg = new LogisticAggregator(bcFeaturesStd, 3,
       fitIntercept = true, multinomial = true)(bcCoefficientsMulti)
     val thrown = withClue("multinomial logistic aggregator cannot handle sparse coefficients") {
       intercept[IllegalArgumentException] {
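
Note: both suites pin down the same contract: the aggregators accept only dense coefficient vectors and fail fast on sparse ones (hence the "coefficients only supports dense" message asserted above). The check, reduced to a sketch (requireDense is an illustrative helper, not the aggregator's code):

    import org.apache.spark.ml.linalg.{DenseVector, Vector}

    def requireDense(coefficients: Vector): DenseVector = coefficients match {
      case dv: DenseVector => dv
      case _ => throw new IllegalArgumentException(
        "coefficients only supports dense vector")
    }
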
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
index d780bdf..5a74490 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuite.scala
@@ -42,36 +42,5 @@ class InstanceSuite extends SparkFunSuite{
       val o2 = ser.deserialize[OffsetInstance](ser.serialize(o))
       assert(o === o2)
     }
-
-    val block1 = InstanceBlock.fromInstances(Seq(instance1))
-    val block2 = InstanceBlock.fromInstances(Seq(instance1, instance2))
-    Seq(block1, block2).foreach { o =>
-      val o2 = ser.deserialize[InstanceBlock](ser.serialize(o))
-      assert(o.labels === o2.labels)
-      assert(o.weights === o2.weights)
-      assert(o.matrix === o2.matrix)
-    }
-  }
-
-  test("InstanceBlock: check correctness") {
-    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
-    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
-    val instances = Seq(instance1, instance2)
-
-    val block = InstanceBlock.fromInstances(instances)
-    assert(block.size === 2)
-    assert(block.numFeatures === 2)
-    block.instanceIterator.zipWithIndex.foreach {
-      case (instance, i) =>
-        assert(instance.label === instances(i).label)
-        assert(instance.weight === instances(i).weight)
-        assert(instance.features.toArray === instances(i).features.toArray)
-    }
-    Seq(0, 1).foreach { i =>
-      val nzIter = block.getNonZeroIter(i)
-      val vec = Vectors.sparse(2, nzIter.toSeq)
-      assert(vec.toArray === instances(i).features.toArray)
-    }
   }
-
 }
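
Note: for readers without the deleted class at hand, InstanceBlock stacked a batch of Instances into parallel label and weight arrays plus one feature Matrix. A rough stand-in mirroring its observable shape (Block here is illustrative, not the deleted implementation):

    import org.apache.spark.ml.linalg.{Matrices, Matrix}

    case class Block(labels: Array[Double], weights: Array[Double], matrix: Matrix)

    // The two instances from the removed test, stacked row-wise;
    // Matrices.dense takes values in column-major order.
    val block = Block(
      labels = Array(19.0, 17.0),
      weights = Array(2.0, 1.0),
      matrix = Matrices.dense(2, 2, Array(1.0, 0.0, 7.0, 5.0)))
    assert(block.matrix.numRows == 2 && block.matrix.numCols == 2)
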
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
index c02a0a5..61b48ff 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantFeatureFiltered = standardize(Array(
+    )
+    instancesConstantFeatureFiltered = Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    ))
+    )
   }
 
    /** Get summary statistics for some data and create a new HingeAggregator. */
@@ -54,23 +54,12 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       instances: Array[Instance],
       coefficients: Vector,
       fitIntercept: Boolean): HingeAggregator = {
-    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
-    new HingeAggregator(instances.head.features.size, fitIntercept)(bcCoefficients)
-  }
-
-  private def standardize(instances: Array[Instance]): Array[Instance] = {
-    val (featuresSummarizer, _) =
+    val (featuresSummarizer, ySummarizer) =
       DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
-    val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+    new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
   }
 
   test("aggregator add method input size") {
@@ -171,21 +160,4 @@ class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
   }
 
-  test("add instance block") {
-    val coefArray = Array(1.0, 2.0)
-    val intercept = 1.0
-
-    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true)
-    instances.foreach(agg.add)
-
-    val agg2 = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true)
-    val block = InstanceBlock.fromInstances(instances)
-    agg2.add(block)
-
-    assert(agg.loss ~== agg2.loss relTol 1e-8)
-    assert(agg.gradient ~== agg2.gradient relTol 1e-8)
-  }
-
 }
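
Note: as background for this suite, the hinge loss the aggregator accumulates maps {0, 1} labels to {-1, 1} and penalizes margins below 1.0. In textbook form (a sketch of the weighted loss term, not the aggregator's exact code path):

    def hingeLoss(label: Double, margin: Double, weight: Double): Double = {
      val y = 2.0 * label - 1.0                // {0, 1} -> {-1, 1}
      weight * math.max(0.0, 1.0 - y * margin)
    }
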
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
index 7c544e99..718ffa2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HuberAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantFeatureFiltered = standardize(Array(
+    )
+    instancesConstantFeatureFiltered = Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    ))
+    )
   }
 
   /** Get summary statistics for some data and create a new HuberAggregator. */
@@ -56,28 +56,10 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       fitIntercept: Boolean,
       epsilon: Double): HuberAggregator = {
     val (featuresSummarizer, _) = getRegressionSummarizers(instances)
-    val numFeatures = featuresSummarizer.variance.size
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
     val bcParameters = spark.sparkContext.broadcast(parameters)
-    new HuberAggregator(numFeatures, fitIntercept, epsilon)(bcParameters)
-  }
-
-  private def standardize(
-      instances: Array[Instance],
-      std: Array[Double] = null): Array[Instance] = {
-    val stdArray = if (std == null) {
-      getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
-    } else {
-      std
-    }
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
+    new HuberAggregator(fitIntercept, epsilon, bcFeaturesStd)(bcParameters)
   }
 
   test("aggregator add method should check input size") {
@@ -173,15 +155,9 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     val parametersFiltered = Vectors.dense(2.0, 3.0, 4.0)
     val aggConstantFeature = getNewAggregator(instancesConstantFeature, parameters,
       fitIntercept = true, epsilon = 1.35)
-    // std of instancesConstantFeature
-    val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
-      ._1.variance.toArray.map(math.sqrt)
-    // Since 3.0.0, we start to standardize input outside of gradient computation,
-    // so here we use std of instancesConstantFeature to standardize instances
-    standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
-
     val aggConstantFeatureFiltered = getNewAggregator(instancesConstantFeatureFiltered,
       parametersFiltered, fitIntercept = true, epsilon = 1.35)
+    instances.foreach(aggConstantFeature.add)
     instancesConstantFeatureFiltered.foreach(aggConstantFeatureFiltered.add)
     // constant features should not affect gradient
     def validateGradient(grad: Vector, gradFiltered: Vector): Unit = {
@@ -191,19 +167,4 @@ class HuberAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     validateGradient(aggConstantFeature.gradient, aggConstantFeatureFiltered.gradient)
   }
-
-  test("add instance block") {
-    val paramWithIntercept = Vectors.dense(1.0, 2.0, 3.0, 4.0)
-    val agg1 = getNewAggregator(instances, paramWithIntercept,
-      fitIntercept = true, epsilon = 1.35)
-    instances.foreach(agg1.add)
-
-    val agg2 = getNewAggregator(instances, paramWithIntercept,
-      fitIntercept = true, epsilon = 1.35)
-    val block = InstanceBlock.fromInstances(instances)
-    agg2.add(block)
-
-    assert(agg1.loss ~== agg2.loss relTol 1e-8)
-    assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
-  }
 }
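
Note: the Huber counterpart is quadratic for small residuals and linear beyond epsilon. Textbook form below; Spark's HuberAggregator additionally estimates a scale parameter sigma, which this sketch omits:

    def huberLoss(residual: Double, epsilon: Double): Double = {
      val a = math.abs(residual)
      if (a <= epsilon) 0.5 * a * a            // quadratic region
      else epsilon * (a - 0.5 * epsilon)       // linear region
    }
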
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
index 5eb4e41..35b6944 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantLabel = standardize(Array(
+    )
+    instancesConstantLabel = Array(
       Instance(1.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(1.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
+    )
   }
 
   /** Get summary statistics for some data and create a new LeastSquaresAggregator. */
@@ -57,34 +57,15 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
     val (featuresSummarizer, ySummarizer) = getRegressionSummarizers(instances)
     val yStd = math.sqrt(ySummarizer.variance(0))
     val yMean = ySummarizer.mean(0)
-    val featuresStd = Vectors.dense(featuresSummarizer.variance.toArray.map(math.sqrt))
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
     val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
-    val featuresMean = featuresSummarizer.mean.asML
-    val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.compressed)
-    val bcCoefficients = spark.sparkContext.broadcast(coefficients.compressed)
+    val featuresMean = featuresSummarizer.mean
+    val bcFeaturesMean = spark.sparkContext.broadcast(featuresMean.toArray)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
     new LeastSquaresAggregator(yStd, yMean, fitIntercept, bcFeaturesStd,
       bcFeaturesMean)(bcCoefficients)
   }
 
-  private def standardize(
-      instances: Array[Instance],
-      std: Array[Double] = null): Array[Instance] = {
-    val stdArray = if (std == null) {
-      getRegressionSummarizers(instances)._1.variance.toArray.map(math.sqrt)
-    } else {
-      std
-    }
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
-  }
-
   test("aggregator add method input size") {
     val coefficients = Vectors.dense(1.0, 2.0)
     val agg = getNewAggregator(instances, coefficients, fitIntercept = true)
@@ -164,15 +145,9 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
 
   test("check with zero standard deviation") {
     val coefficients = Vectors.dense(1.0, 2.0)
-    // aggConstantFeature contains std of instancesConstantFeature, and the std of dim=0 is 0
     val aggConstantFeature = getNewAggregator(instancesConstantFeature, coefficients,
       fitIntercept = true)
-    // std of instancesConstantFeature
-    val stdConstantFeature = getRegressionSummarizers(instancesConstantFeature)
-      ._1.variance.toArray.map(math.sqrt)
-    // Since 3.0.0, we start to standardize input outside of gradient computation,
-    // so here we use std of instancesConstantFeature to standardize instances
-    standardize(instances, stdConstantFeature).foreach(aggConstantFeature.add)
+    instances.foreach(aggConstantFeature.add)
     // constant features should not affect gradient
     assert(aggConstantFeature.gradient(0) === 0.0)
 
@@ -182,17 +157,4 @@ class LeastSquaresAggregatorSuite extends SparkFunSuite with MLlibTestSparkConte
       }
     }
   }
-
-  test("add instance block") {
-    val coefficients = Vectors.dense(1.0, 2.0)
-    val agg1 = getNewAggregator(instances, coefficients, fitIntercept = true)
-    instances.foreach(agg1.add)
-
-    val agg2 = getNewAggregator(instances, coefficients, fitIntercept = true)
-    val block = InstanceBlock.fromInstances(instances)
-    agg2.add(block)
-
-    assert(agg1.loss ~== agg2.loss relTol 1e-8)
-    assert(agg1.gradient ~== agg2.gradient relTol 1e-8)
-  }
 }
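
Note: for least squares, the aggregator works over label- and feature-standardized values; the underlying weighted squared-error term it accumulates is simply:

    def squaredLoss(label: Double, prediction: Double, weight: Double): Double = {
      val diff = prediction - label
      0.5 * weight * diff * diff
    }
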
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
index 8371807..e699adc 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.ml.optim.aggregator
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{Instance, InstanceBlock}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{BLAS, Matrices, Vector, Vectors}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -32,21 +32,21 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    instances = standardize(Array(
+    instances = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(4.0, 0.5))
-    ))
-    instancesConstantFeature = standardize(Array(
+    )
+    instancesConstantFeature = Array(
       Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
       Instance(2.0, 0.3, Vectors.dense(1.0, 0.5))
-    ))
-    instancesConstantFeatureFiltered = standardize(Array(
+    )
+    instancesConstantFeatureFiltered = Array(
       Instance(0.0, 0.1, Vectors.dense(2.0)),
       Instance(1.0, 0.5, Vectors.dense(1.0)),
       Instance(2.0, 0.3, Vectors.dense(0.5))
-    ))
+    )
   }
 
   /** Get summary statistics for some data and create a new LogisticAggregator. */
@@ -55,27 +55,13 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
       coefficients: Vector,
       fitIntercept: Boolean,
       isMultinomial: Boolean): LogisticAggregator = {
-    val (_, ySummarizer) =
+    val (featuresSummarizer, ySummarizer) =
       DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
     val numClasses = ySummarizer.histogram.length
-    val numFeatures = instances.head.features.size
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
     val bcCoefficients = spark.sparkContext.broadcast(coefficients)
-    new LogisticAggregator(numFeatures, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
-  }
-
-  private def standardize(instances: Array[Instance]): Array[Instance] = {
-    val (featuresSummarizer, _) =
-      DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
-    val stdArray = featuresSummarizer.variance.toArray.map(math.sqrt)
-    val numFeatures = stdArray.length
-    instances.map { case Instance(label, weight, features) =>
-      val standardized = Array.ofDim[Double](numFeatures)
-      features.foreachNonZero { (i, v) =>
-        val std = stdArray(i)
-        if (std != 0) standardized(i) = v / std
-      }
-      Instance(label, weight, Vectors.dense(standardized).compressed)
-    }
+    new LogisticAggregator(bcFeaturesStd, numClasses, fitIntercept, isMultinomial)(bcCoefficients)
   }
 
   test("aggregator add method input size") {
@@ -291,24 +277,4 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     validateGradient(aggConstantFeatureBinary.gradient,
       aggConstantFeatureBinaryFiltered.gradient, 1)
   }
-
-  test("add instance block") {
-    val binaryInstances = instances.map { instance =>
-      if (instance.label <= 1.0) instance else Instance(0.0, instance.weight, instance.features)
-    }
-    val coefArray = Array(1.0, 2.0)
-    val intercept = 1.0
-
-    val agg = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true, isMultinomial = false)
-    binaryInstances.foreach(agg.add)
-
-    val agg2 = getNewAggregator(binaryInstances, Vectors.dense(coefArray ++ Array(intercept)),
-      fitIntercept = true, isMultinomial = false)
-    val block = InstanceBlock.fromInstances(binaryInstances)
-    agg2.add(block)
-
-    assert(agg.loss ~== agg2.loss relTol 1e-8)
-    assert(agg.gradient ~== agg2.gradient relTol 1e-8)
-  }
 }
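
Note: for the binary case exercised here, the per-instance log-loss in textbook form, with labels in {0, 1} and margin = w.x + b (a sketch only; the aggregator's multinomial path generalizes this, and production code guards math.exp against overflow):

    def logLoss(label: Double, margin: Double, weight: Double): Double = {
      // log(1 + exp(-margin)) when the label is 1, log(1 + exp(margin)) when 0
      val z = if (label > 0.5) -margin else margin
      weight * math.log1p(math.exp(z))
    }
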
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 5ee4231..5ab8e60 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -165,8 +165,7 @@ class JavaProbabilisticClassificationModel(JavaClassificationModel,
 
 
 class _LinearSVCParams(_JavaClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
-                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
-                       HasBlockSize):
+                       HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold):
     """
     Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
 
@@ -215,8 +214,6 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
     LinearSVCModel...
     >>> model.getThreshold()
     0.5
-    >>> model.getBlockSize()
-    1024
     >>> model.coefficients
     DenseVector([0.0, -0.2792, -0.1833])
     >>> model.intercept
@@ -255,19 +252,18 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                 aggregationDepth=2, blockSize=1024):
+                 aggregationDepth=2):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                  fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                 aggregationDepth=2, blockSize=1024):
+                 aggregationDepth=2):
         """
         super(LinearSVC, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.classification.LinearSVC", self.uid)
         self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
-                         standardization=True, threshold=0.0, aggregationDepth=2,
-                         blockSize=1024)
+                         standardization=True, threshold=0.0, aggregationDepth=2)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -276,12 +272,12 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
-                  aggregationDepth=2, blockSize=1024):
+                  aggregationDepth=2):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
                   fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
-                  aggregationDepth=2, blockSize=1024):
+                  aggregationDepth=2):
         Sets params for Linear SVM Classifier.
         """
         kwargs = self._input_kwargs
@@ -346,13 +342,6 @@ class LinearSVC(JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadable
         """
         return self._set(aggregationDepth=value)
 
-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-
 
 class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable):
     """
@@ -388,7 +377,7 @@ class LinearSVCModel(JavaClassificationModel, _LinearSVCParams, JavaMLWritable,
 class _LogisticRegressionParams(_JavaProbabilisticClassifierParams, HasRegParam,
                                 HasElasticNetParam, HasMaxIter, HasFitIntercept, HasTol,
                                 HasStandardization, HasWeightCol, HasAggregationDepth,
-                                HasThreshold, HasBlockSize):
+                                HasThreshold):
     """
     Params for :py:class:`LogisticRegression` and :py:class:`LogisticRegressionModel`.
 
@@ -570,8 +559,6 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
     10
     >>> blor.clear(blor.maxIter)
     >>> blorModel = blor.fit(bdf)
-    >>> blorModel.getBlockSize()
-    1024
     >>> blorModel.setFeaturesCol("features")
     LogisticRegressionModel...
     >>> blorModel.setProbabilityCol("newProbability")
@@ -640,7 +627,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
                  rawPredictionCol="rawPrediction", standardization=True, weightCol=None,
                  aggregationDepth=2, family="auto",
                  lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None,
-                 lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
+                 lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
 
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
@@ -649,14 +636,13 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
                  rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \
                  aggregationDepth=2, family="auto", \
                  lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \
-                 lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
+                 lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
         If the threshold and thresholds Params are both set, they must be equivalent.
         """
         super(LogisticRegression, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.classification.LogisticRegression", self.uid)
-        self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto",
-                         blockSize=1024)
+        self._setDefault(maxIter=100, regParam=0.0, tol=1E-6, threshold=0.5, family="auto")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
         self._checkThresholdConsistency()
@@ -669,7 +655,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
                   rawPredictionCol="rawPrediction", standardization=True, weightCol=None,
                   aggregationDepth=2, family="auto",
                   lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None,
-                  lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
+                  lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
@@ -677,7 +663,7 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
                   rawPredictionCol="rawPrediction", standardization=True, weightCol=None, \
                   aggregationDepth=2, family="auto", \
                   lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, \
-                  lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, blockSize=1024):
+                  lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None):
         Sets params for logistic regression.
         If the threshold and thresholds Params are both set, they must be equivalent.
         """
@@ -772,13 +758,6 @@ class LogisticRegression(JavaProbabilisticClassifier, _LogisticRegressionParams,
         """
         return self._set(aggregationDepth=value)
 
-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-
 
 class LogisticRegressionModel(JavaProbabilisticClassificationModel, _LogisticRegressionParams,
                               JavaMLWritable, JavaMLReadable, HasTrainingSummary):
@@ -2174,7 +2153,7 @@ class NaiveBayesModel(JavaProbabilisticClassificationModel, _NaiveBayesParams, J
 
 
 class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, HasMaxIter,
-                                  HasTol, HasStepSize, HasSolver, HasBlockSize):
+                                  HasTol, HasStepSize, HasSolver):
     """
     Params for :py:class:`MultilayerPerceptronClassifier`.
 
@@ -2185,6 +2164,11 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
                    "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 " +
                    "neurons and output layer of 10 neurons.",
                    typeConverter=TypeConverters.toListInt)
+    blockSize = Param(Params._dummy(), "blockSize", "Block size for stacking input data in " +
+                      "matrices. Data is stacked within partitions. If block size is more than " +
+                      "remaining data in a partition then it is adjusted to the size of this " +
+                      "data. Recommended size is between 10 and 1000, default is 128.",
+                      typeConverter=TypeConverters.toInt)
     solver = Param(Params._dummy(), "solver", "The solver algorithm for optimization. Supported " +
                    "options: l-bfgs, gd.", typeConverter=TypeConverters.toString)
     initialWeights = Param(Params._dummy(), "initialWeights", "The initial weights of the model.",
@@ -2197,6 +2181,13 @@ class _MultilayerPerceptronParams(_JavaProbabilisticClassifierParams, HasSeed, H
         """
         return self.getOrDefault(self.layers)
 
+    @since("1.6.0")
+    def getBlockSize(self):
+        """
+        Gets the value of blockSize or its default value.
+        """
+        return self.getOrDefault(self.blockSize)
+
     @since("2.0.0")
     def getInitialWeights(self):
         """
@@ -2220,17 +2211,11 @@ class MultilayerPerceptronClassifier(JavaProbabilisticClassifier, _MultilayerPer
     ...     (1.0, Vectors.dense([0.0, 1.0])),
     ...     (1.0, Vectors.dense([1.0, 0.0])),
     ...     (0.0, Vectors.dense([1.0, 1.0]))], ["label", "features"])
-    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
+    >>> mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], blockSize=1, seed=123)
     >>> mlp.setMaxIter(100)
     MultilayerPerceptronClassifier...
     >>> mlp.getMaxIter()
     100
-    >>> mlp.getBlockSize()
-    128
-    >>> mlp.setBlockSize(1)
-    MultilayerPerceptronClassifier...
-    >>> mlp.getBlockSize()
-    1
     >>> model = mlp.fit(df)
     >>> model.setFeaturesCol("features")
     MultilayerPerceptronClassificationModel...
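
Note: with HasBlockSize gone, blockSize returns to being an MLP-specific expert param with default 128, as the restored Python Param above describes. Its Scala counterpart would look roughly like this sketch (MLPParamsSketch is an illustrative name and the definition is paraphrased, not the verbatim source):

    import org.apache.spark.ml.param.{IntParam, ParamValidators, Params}

    trait MLPParamsSketch extends Params {
      final val blockSize: IntParam = new IntParam(this, "blockSize",
        "Block size for stacking input data in matrices. Recommended size " +
        "is between 10 and 1000, default is 128.", ParamValidators.gt(0))

      setDefault(blockSize -> 128)

      final def getBlockSize: Int = $(blockSize)
    }
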
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index fb4d55d..ded3ca8 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -164,10 +164,7 @@ if __name__ == "__main__":
          "'euclidean'", "TypeConverters.toString"),
         ("validationIndicatorCol", "name of the column that indicates whether each row is for " +
          "training or for validation. False indicates training; true indicates validation.",
-         None, "TypeConverters.toString"),
-        ("blockSize", "block size for stacking input data in matrices. Data is stacked within "
-         "partitions. If block size is more than remaining data in a partition then it is "
-         "adjusted to the size of this data.", "1024", "TypeConverters.toInt")]
+         None, "TypeConverters.toString")]
 
     code = []
     for name, doc, defaultValueStr, typeConverter in shared:
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 4564635..8fc1156 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -580,21 +580,3 @@ class HasValidationIndicatorCol(Params):
         Gets the value of validationIndicatorCol or its default value.
         """
         return self.getOrDefault(self.validationIndicatorCol)
-
-
-class HasBlockSize(Params):
-    """
-    Mixin for param blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
-    """
-
-    blockSize = Param(Params._dummy(), "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", typeConverter=TypeConverters.toInt)
-
-    def __init__(self):
-        super(HasBlockSize, self).__init__()
-        self._setDefault(blockSize=1024)
-
-    def getBlockSize(self):
-        """
-        Gets the value of blockSize or its default value.
-        """
-        return self.getOrDefault(self.blockSize)
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index fe571e2..ee27696 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -28,7 +28,7 @@ __all__ = ['ALS', 'ALSModel']
 
 
 @inherit_doc
-class _ALSModelParams(HasPredictionCol, HasBlockSize):
+class _ALSModelParams(HasPredictionCol):
     """
     Params for :py:class:`ALS` and :py:class:`ALSModel`.
 
@@ -223,8 +223,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
     0.1
     >>> als.clear(als.regParam)
     >>> model = als.fit(df)
-    >>> model.getBlockSize()
-    4096
     >>> model.getUserCol()
     'user'
     >>> model.setUserCol("user")
@@ -284,13 +282,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                  ratingCol="rating", nonnegative=False, checkpointInterval=10,
                  intermediateStorageLevel="MEMORY_AND_DISK",
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
         """
         __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=false, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         """
         super(ALS, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
@@ -298,8 +296,7 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                          implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item",
                          ratingCol="rating", nonnegative=False, checkpointInterval=10,
                          intermediateStorageLevel="MEMORY_AND_DISK",
-                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan",
-                         blockSize=4096)
+                         finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -309,13 +306,13 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
                   implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
                   ratingCol="rating", nonnegative=False, checkpointInterval=10,
                   intermediateStorageLevel="MEMORY_AND_DISK",
-                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096):
+                  finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan"):
         """
         setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
                  implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
                  ratingCol="rating", nonnegative=False, checkpointInterval=10, \
                  intermediateStorageLevel="MEMORY_AND_DISK", \
-                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan", blockSize=4096)
+                 finalStorageLevel="MEMORY_AND_DISK", coldStartStrategy="nan")
         Sets params for ALS.
         """
         kwargs = self._input_kwargs
@@ -446,13 +443,6 @@ class ALS(JavaEstimator, _ALSParams, JavaMLWritable, JavaMLReadable):
         """
         return self._set(seed=value)
 
-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-
 
 class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
     """
@@ -489,13 +479,6 @@ class ALSModel(JavaModel, _ALSModelParams, JavaMLWritable, JavaMLReadable):
         """
         return self._set(predictionCol=value)
 
-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-
     @property
     @since("1.4.0")
     def rank(self):
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index a74ba34..a4c9782 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -62,7 +62,7 @@ class JavaRegressionModel(JavaPredictionModel, _JavaPredictorParams):
 
 class _LinearRegressionParams(_JavaPredictorParams, HasRegParam, HasElasticNetParam, HasMaxIter,
                               HasTol, HasFitIntercept, HasStandardization, HasWeightCol, HasSolver,
-                              HasAggregationDepth, HasLoss, HasBlockSize):
+                              HasAggregationDepth, HasLoss):
     """
     Params for :py:class:`LinearRegression` and :py:class:`LinearRegressionModel`.
 
@@ -124,8 +124,6 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
     >>> lr.setRegParam(0.0)
     LinearRegression...
     >>> model = lr.fit(df)
-    >>> model.getBlockSize()
-    1024
     >>> model.setFeaturesCol("features")
     LinearRegressionModel...
     >>> model.setPredictionCol("newPrediction")
@@ -171,18 +169,17 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
-                 loss="squaredError", epsilon=1.35, blockSize=1024):
+                 loss="squaredError", epsilon=1.35):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
                  standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
-                 loss="squaredError", epsilon=1.35, blockSize=1024)
+                 loss="squaredError", epsilon=1.35)
         """
         super(LinearRegression, self).__init__()
         self._java_obj = self._new_java_obj(
             "org.apache.spark.ml.regression.LinearRegression", self.uid)
-        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35,
-                         blockSize=1024)
+        self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, loss="squaredError", epsilon=1.35)
         kwargs = self._input_kwargs
         self.setParams(**kwargs)
 
@@ -191,12 +188,12 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
                   standardization=True, solver="auto", weightCol=None, aggregationDepth=2,
-                  loss="squaredError", epsilon=1.35, blockSize=1024):
+                  loss="squaredError", epsilon=1.35):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-6, fitIntercept=True, \
                   standardization=True, solver="auto", weightCol=None, aggregationDepth=2, \
-                  loss="squaredError", epsilon=1.35, blockSize=1024)
+                  loss="squaredError", epsilon=1.35)
         Sets params for linear regression.
         """
         kwargs = self._input_kwargs
@@ -272,13 +269,6 @@ class LinearRegression(JavaRegressor, _LinearRegressionParams, JavaMLWritable, J
         """
         return self._set(lossType=value)
 
-    @since("3.0.0")
-    def setBlockSize(self, value):
-        """
-        Sets the value of :py:attr:`blockSize`.
-        """
-        return self._set(blockSize=value)
-
 
 class LinearRegressionModel(JavaRegressionModel, _LinearRegressionParams, GeneralJavaMLWritable,
                             JavaMLReadable, HasTrainingSummary):

