You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2018/07/20 19:13:37 UTC

spark git commit: [SPARK-24852][ML] Update spark.ml to use Instrumentation.instrumented.

Repository: spark
Updated Branches:
  refs/heads/master 244bcff19 -> 3cb1b5780


[SPARK-24852][ML] Update spark.ml to use Instrumentation.instrumented.

## What changes were proposed in this pull request?

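Follow-up to #21719.
Update the spark.ml training code so that each instrumented training method is fully wrapped in `Instrumentation.instrumented { instr => ... }`, and remove the old instrumentation APIs (call sites such as `Instrumentation.create(...)` and `instr.logSuccess(...)` are replaced throughout).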

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Bago Amirbekian <ba...@databricks.com>

Closes #21799 from MrBago/new-instrumentation-apis2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cb1b578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cb1b578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cb1b578

Branch: refs/heads/master
Commit: 3cb1b57809d0b4a93223669f5c10cea8fc53eff6
Parents: 244bcff
Author: Bago Amirbekian <ba...@databricks.com>
Authored: Fri Jul 20 12:13:15 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Fri Jul 20 12:13:15 2018 -0700

----------------------------------------------------------------------
 .../classification/DecisionTreeClassifier.scala | 24 +++++-----
 .../spark/ml/classification/GBTClassifier.scala | 14 +++---
 .../spark/ml/classification/LinearSVC.scala     | 12 ++---
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../MultilayerPerceptronClassifier.scala        | 14 +++---
 .../spark/ml/classification/NaiveBayes.scala    | 12 ++---
 .../spark/ml/classification/OneVsRest.scala     |  9 ++--
 .../classification/RandomForestClassifier.scala | 13 +++---
 .../spark/ml/clustering/BisectingKMeans.scala   | 12 ++---
 .../spark/ml/clustering/GaussianMixture.scala   | 12 ++---
 .../org/apache/spark/ml/clustering/KMeans.scala |  9 ++--
 .../org/apache/spark/ml/clustering/LDA.scala    | 12 ++---
 .../org/apache/spark/ml/fpm/FPGrowth.scala      | 12 ++---
 .../apache/spark/ml/recommendation/ALS.scala    |  9 ++--
 .../ml/regression/AFTSurvivalRegression.scala   | 13 +++---
 .../ml/regression/DecisionTreeRegressor.scala   | 24 +++++-----
 .../spark/ml/regression/GBTRegressor.scala      | 12 ++---
 .../GeneralizedLinearRegression.scala           | 12 ++---
 .../ml/regression/IsotonicRegression.scala      | 12 ++---
 .../spark/ml/regression/LinearRegression.scala  | 21 ++++-----
 .../ml/regression/RandomForestRegressor.scala   | 14 +++---
 .../apache/spark/ml/tuning/CrossValidator.scala |  9 ++--
 .../spark/ml/tuning/TrainValidationSplit.scala  |  9 ++--
 .../apache/spark/ml/util/Instrumentation.scala  | 47 ++------------------
 24 files changed, 153 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index c9786f1..8a57bfc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._
 import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
 import org.apache.spark.rdd.RDD
@@ -96,8 +97,10 @@ class DecisionTreeClassifier @Since("1.4.0") (
   @Since("1.6.0")
   override def setSeed(value: Long): this.type = set(seed, value)
 
-  override protected def train(dataset: Dataset[_]): DecisionTreeClassificationModel = {
-    val instr = Instrumentation.create(this, dataset)
+  override protected def train(
+      dataset: Dataset[_]): DecisionTreeClassificationModel = instrumented { instr =>
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
     val numClasses: Int = getNumClasses(dataset)
@@ -112,30 +115,27 @@ class DecisionTreeClassifier @Since("1.4.0") (
     val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset, numClasses)
     val strategy = getOldStrategy(categoricalFeatures, numClasses)
 
-    instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
+    instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
       cacheNodeIds, checkpointInterval, impurity, seed)
 
     val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = $(seed), instr = Some(instr), parentUID = Some(uid))
 
-    val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]
-    instr.logSuccess(m)
-    m
+    trees.head.asInstanceOf[DecisionTreeClassificationModel]
   }
 
   /** (private[ml]) Train a decision tree on an RDD */
   private[ml] def train(data: RDD[LabeledPoint],
-      oldStrategy: OldStrategy): DecisionTreeClassificationModel = {
-    val instr = Instrumentation.create(this, data)
-    instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
+      oldStrategy: OldStrategy): DecisionTreeClassificationModel = instrumented { instr =>
+    instr.logPipelineStage(this)
+    instr.logDataset(data)
+    instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
       cacheNodeIds, checkpointInterval, impurity, seed)
 
     val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = 0L, instr = Some(instr), parentUID = Some(uid))
 
-    val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]
-    instr.logSuccess(m)
-    m
+    trees.head.asInstanceOf[DecisionTreeClassificationModel]
   }
 
   /** (private[ml]) Create a Strategy instance to use with the old API. */

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index 337133a..33acd99 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
@@ -31,9 +31,9 @@ import org.apache.spark.ml.tree._
 import org.apache.spark.ml.tree.impl.GradientBoostedTrees
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 
@@ -152,7 +152,8 @@ class GBTClassifier @Since("1.4.0") (
     set(validationIndicatorCol, value)
   }
 
-  override protected def train(dataset: Dataset[_]): GBTClassificationModel = {
+  override protected def train(
+      dataset: Dataset[_]): GBTClassificationModel = instrumented { instr =>
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
 
@@ -189,8 +190,9 @@ class GBTClassifier @Since("1.4.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType,
       maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
       seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy,
       validationIndicatorCol)
@@ -204,9 +206,7 @@ class GBTClassifier @Since("1.4.0") (
       GradientBoostedTrees.run(trainDataset, boostingStrategy, $(seed), $(featureSubsetStrategy))
     }
 
-    val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
-    instr.logSuccess(m)
-    m
+    new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
   }
 
   @Since("1.4.1")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 38eb045..20f9366 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -33,6 +33,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.rdd.RDD
@@ -162,7 +163,7 @@ class LinearSVC @Since("2.2.0") (
   @Since("2.2.0")
   override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
 
-  override protected def train(dataset: Dataset[_]): LinearSVCModel = {
+  override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
     val instances: RDD[Instance] =
       dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
@@ -170,8 +171,9 @@ class LinearSVC @Since("2.2.0") (
           Instance(label, weight, features)
       }
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(regParam, maxIter, fitIntercept, tol, standardization, threshold,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, regParam, maxIter, fitIntercept, tol, standardization, threshold,
       aggregationDepth)
 
     val (summarizer, labelSummarizer) = {
@@ -276,9 +278,7 @@ class LinearSVC @Since("2.2.0") (
       (Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
     }
 
-    val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
-    instr.logSuccess(model)
-    model
+    copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 25fb9c8..af651b0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -503,7 +503,7 @@ class LogisticRegression @Since("1.2.0") (
 
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
-    instr.logParams(regParam, elasticNetParam, standardization, threshold,
+    instr.logParams(this, regParam, elasticNetParam, standardization, threshold,
       maxIter, tol, fitIntercept)
 
     val (summarizer, labelSummarizer) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 57ba47e..65e3b2d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.sql.Dataset
 
 /** Params for Multilayer Perceptron. */
@@ -230,9 +231,11 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
    * @param dataset Training dataset
    * @return Fitted model
    */
-  override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
+  override protected def train(
+      dataset: Dataset[_]): MultilayerPerceptronClassificationModel = instrumented { instr =>
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, predictionCol, layers, maxIter, tol,
       blockSize, solver, stepSize, seed)
 
     val myLayers = $(layers)
@@ -264,10 +267,7 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
     }
     trainer.setStackSize($(blockSize))
     val mlpModel = trainer.train(data)
-    val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
-
-    instr.logSuccess(model)
-    model
+    new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 1dde18d..f65d397 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.functions.{col, lit}
@@ -125,8 +126,9 @@ class NaiveBayes @Since("1.5.0") (
    */
   private[spark] def trainWithLabelCheck(
       dataset: Dataset[_],
-      positiveLabel: Boolean): NaiveBayesModel = {
-    val instr = Instrumentation.create(this, dataset)
+      positiveLabel: Boolean): NaiveBayesModel = instrumented { instr =>
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
     if (positiveLabel && isDefined(thresholds)) {
       val numClasses = getNumClasses(dataset)
       instr.logNumClasses(numClasses)
@@ -148,7 +150,7 @@ class NaiveBayes @Since("1.5.0") (
       }
     }
 
-    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
+    instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
       probabilityCol, modelType, smoothing, thresholds)
 
     val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
@@ -204,9 +206,7 @@ class NaiveBayes @Since("1.5.0") (
 
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
-    val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
-    instr.logSuccess(model)
-    model
+    new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
   }
 
   @Since("1.5.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 3474b61..1835a91 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -36,6 +36,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
 import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -362,11 +363,12 @@ final class OneVsRest @Since("1.4.0") (
   }
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): OneVsRestModel = {
+  override def fit(dataset: Dataset[_]): OneVsRestModel = instrumented { instr =>
     transformSchema(dataset.schema)
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol)
     instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)
 
     // determine number of classes either from metadata if provided, or via computation.
@@ -440,7 +442,6 @@ final class OneVsRest @Since("1.4.0") (
       case attr: Attribute => attr
     }
     val model = new OneVsRestModel(uid, labelAttribute.toMetadata(), models).setParent(this)
-    instr.logSuccess(model)
     copyValues(model)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index 040db3b..94887ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.tree._
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
 import org.apache.spark.rdd.RDD
@@ -115,8 +116,10 @@ class RandomForestClassifier @Since("1.4.0") (
   override def setFeatureSubsetStrategy(value: String): this.type =
     set(featureSubsetStrategy, value)
 
-  override protected def train(dataset: Dataset[_]): RandomForestClassificationModel = {
-    val instr = Instrumentation.create(this, dataset)
+  override protected def train(
+      dataset: Dataset[_]): RandomForestClassificationModel = instrumented { instr =>
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
     val numClasses: Int = getNumClasses(dataset)
@@ -131,7 +134,7 @@ class RandomForestClassifier @Since("1.4.0") (
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
 
-    instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
+    instr.logParams(this, labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
       impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
       minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)
 
@@ -140,11 +143,9 @@ class RandomForestClassifier @Since("1.4.0") (
       .map(_.asInstanceOf[DecisionTreeClassificationModel])
 
     val numFeatures = oldDataset.first().features.size
-    val m = new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)
     instr.logNumClasses(numClasses)
     instr.logNumFeatures(numFeatures)
-    instr.logSuccess(m)
-    m
+    new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)
   }
 
   @Since("1.4.1")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index de56447..48b8c52 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -26,6 +26,7 @@ import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
   BisectingKMeansModel => MLlibBisectingKMeansModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
@@ -257,12 +258,13 @@ class BisectingKMeans @Since("2.0.0") (
   def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): BisectingKMeansModel = {
+  override def fit(dataset: Dataset[_]): BisectingKMeansModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
     val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(featuresCol, predictionCol, k, maxIter, seed,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, featuresCol, predictionCol, k, maxIter, seed,
       minDivisibleClusterSize, distanceMeasure)
 
     val bkm = new MLlibBisectingKMeans()
@@ -275,10 +277,8 @@ class BisectingKMeans @Since("2.0.0") (
     val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
     val summary = new BisectingKMeansSummary(
       model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter))
-    model.setSummary(Some(summary))
     instr.logNamedValue("clusterSizes", summary.clusterSizes)
-    instr.logSuccess(model)
-    model
+    model.setSummary(Some(summary))
   }
 
   @Since("2.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index f0707b3..310b03b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.stat.distribution.MultivariateGaussian
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
   Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.rdd.RDD
@@ -335,7 +336,7 @@ class GaussianMixture @Since("2.0.0") (
   private val numSamples = 5
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
+  override def fit(dataset: Dataset[_]): GaussianMixtureModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
 
     val sc = dataset.sparkSession.sparkContext
@@ -352,8 +353,9 @@ class GaussianMixture @Since("2.0.0") (
       s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
       s" matrix is quadratic in the number of features.")
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
     instr.logNumFeatures(numFeatures)
 
     val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
@@ -425,11 +427,9 @@ class GaussianMixture @Since("2.0.0") (
     val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
     val summary = new GaussianMixtureSummary(model.transform(dataset),
       $(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iter)
-    model.setSummary(Some(summary))
     instr.logNamedValue("logLikelihood", logLikelihood)
     instr.logNamedValue("clusterSizes", summary.clusterSizes)
-    instr.logSuccess(model)
-    model
+    model.setSummary(Some(summary))
   }
 
   @Since("2.0.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 6f4a30d..498310d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.VectorImplicits._
@@ -336,7 +337,7 @@ class KMeans @Since("1.5.0") (
   def setSeed(value: Long): this.type = set(seed, value)
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): KMeansModel = {
+  override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
 
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
@@ -346,8 +347,9 @@ class KMeans @Since("1.5.0") (
       instances.persist(StorageLevel.MEMORY_AND_DISK)
     }
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure,
       maxIter, seed, tol)
     val algo = new MLlibKMeans()
       .setK($(k))
@@ -369,7 +371,6 @@ class KMeans @Since("1.5.0") (
 
     model.setSummary(Some(summary))
     instr.logNamedValue("clusterSizes", summary.clusterSizes)
-    instr.logSuccess(model)
     if (handlePersistence) {
       instances.unpersist()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index fed42c9..50867f7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -32,6 +32,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
   EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
   LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
@@ -896,11 +897,12 @@ class LDA @Since("1.6.0") (
   override def copy(extra: ParamMap): LDA = defaultCopy(extra)
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): LDAModel = {
+  override def fit(dataset: Dataset[_]): LDAModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
       checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration,
       learningDecay, optimizer, learningOffset, seed)
 
@@ -923,9 +925,7 @@ class LDA @Since("1.6.0") (
     }
 
     instr.logNumFeatures(newModel.vocabSize)
-    val model = copyValues(newModel).setParent(this)
-    instr.logSuccess(model)
-    model
+    copyValues(newModel).setParent(this)
   }
 
   @Since("1.6.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 9d664b6..85c483c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -26,6 +26,7 @@ import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.HasPredictionCol
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules,
   FPGrowth => MLlibFPGrowth}
 import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
@@ -158,11 +159,12 @@ class FPGrowth @Since("2.2.0") (
     genericFit(dataset)
   }
 
-  private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
+  private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = instrumented { instr =>
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(params: _*)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, params: _*)
     val data = dataset.select($(itemsCol))
     val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[Any](0).toArray)
     val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport))
@@ -185,9 +187,7 @@ class FPGrowth @Since("2.2.0") (
       items.unpersist()
     }
 
-    val model = copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
-    instr.logSuccess(model)
-    model
+    copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
   }
 
   @Since("2.2.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a23f955..ffe5927 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -39,6 +39,7 @@ import org.apache.spark.ml.linalg.BLAS
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.CholeskyDecomposition
 import org.apache.spark.mllib.optimization.NNLS
 import org.apache.spark.rdd.RDD
@@ -654,7 +655,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   }
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): ALSModel = {
+  override def fit(dataset: Dataset[_]): ALSModel = instrumented { instr =>
     transformSchema(dataset.schema)
     import dataset.sparkSession.implicits._
 
@@ -666,8 +667,9 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
         Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
       }
 
-    val instr = Instrumentation.create(this, ratings)
-    instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
       itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
       seed, intermediateStorageLevel, finalStorageLevel)
 
@@ -681,7 +683,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
     val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
-    instr.logSuccess(model)
     copyValues(model)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index e27a96e..3cd0706 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -32,6 +32,7 @@ import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.mllib.util.MLUtils
@@ -210,7 +211,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
   }
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = {
+  override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
     val instances = extractAFTPoints(dataset)
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
@@ -229,8 +230,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
     val numFeatures = featuresStd.size
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
       fitIntercept, maxIter, tol, aggregationDepth)
     instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
     instr.logNumFeatures(numFeatures)
@@ -284,10 +286,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
     val coefficients = Vectors.dense(rawCoefficients)
     val intercept = parameters(1)
     val scale = math.exp(parameters(0))
-    val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients,
-      intercept, scale).setParent(this))
-    instr.logSuccess(model)
-    model
+    copyValues(new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale).setParent(this))
   }
 
   @Since("1.6.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 8bcf079..018290f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ml.tree._
 import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
 import org.apache.spark.rdd.RDD
@@ -99,37 +100,36 @@ class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
   @Since("2.0.0")
   def setVarianceCol(value: String): this.type = set(varianceCol, value)
 
-  override protected def train(dataset: Dataset[_]): DecisionTreeRegressionModel = {
+  override protected def train(
+      dataset: Dataset[_]): DecisionTreeRegressionModel = instrumented { instr =>
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
     val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
     val strategy = getOldStrategy(categoricalFeatures)
 
-    val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(params: _*)
+    instr.logPipelineStage(this)
+    instr.logDataset(oldDataset)
+    instr.logParams(this, params: _*)
 
     val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = $(seed), instr = Some(instr), parentUID = Some(uid))
 
-    val m = trees.head.asInstanceOf[DecisionTreeRegressionModel]
-    instr.logSuccess(m)
-    m
+    trees.head.asInstanceOf[DecisionTreeRegressionModel]
   }
 
   /** (private[ml]) Train a decision tree on an RDD */
   private[ml] def train(
       data: RDD[LabeledPoint],
       oldStrategy: OldStrategy,
-      featureSubsetStrategy: String): DecisionTreeRegressionModel = {
-    val instr = Instrumentation.create(this, data)
-    instr.logParams(params: _*)
+      featureSubsetStrategy: String): DecisionTreeRegressionModel = instrumented { instr =>
+    instr.logPipelineStage(this)
+    instr.logDataset(data)
+    instr.logParams(this, params: _*)
 
     val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy,
       seed = $(seed), instr = Some(instr), parentUID = Some(uid))
 
-    val m = trees.head.asInstanceOf[DecisionTreeRegressionModel]
-    instr.logSuccess(m)
-    m
+    trees.head.asInstanceOf[DecisionTreeRegressionModel]
   }
 
   /** (private[ml]) Create a Strategy instance to use with the old API. */

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
index eb8b3c0..3305881 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ml.tree._
 import org.apache.spark.ml.tree.impl.GradientBoostedTrees
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
 import org.apache.spark.rdd.RDD
@@ -151,7 +152,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
     set(validationIndicatorCol, value)
   }
 
-  override protected def train(dataset: Dataset[_]): GBTRegressionModel = {
+  override protected def train(dataset: Dataset[_]): GBTRegressionModel = instrumented { instr =>
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
 
@@ -168,8 +169,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
     val numFeatures = trainDataset.first().features.size
     val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType,
       maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
       seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy)
     instr.logNumFeatures(numFeatures)
@@ -181,9 +183,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
       GradientBoostedTrees.run(trainDataset, boostingStrategy,
         $(seed), $(featureSubsetStrategy))
     }
-    val m = new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures)
-    instr.logSuccess(m)
-    m
+    new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures)
   }
 
   @Since("1.4.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 143c8a3..20878b6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -34,6 +34,7 @@ import org.apache.spark.ml.optim._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
@@ -373,13 +374,15 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
   @Since("2.0.0")
   def setLinkPredictionCol(value: String): this.type = set(linkPredictionCol, value)
 
-  override protected def train(dataset: Dataset[_]): GeneralizedLinearRegressionModel = {
+  override protected def train(
+      dataset: Dataset[_]): GeneralizedLinearRegressionModel = instrumented { instr =>
     val familyAndLink = FamilyAndLink(this)
 
     val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, weightCol, offsetCol, predictionCol, linkPredictionCol,
-      family, solver, fitIntercept, link, maxIter, regParam, tol)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, weightCol, offsetCol, predictionCol,
+      linkPredictionCol, family, solver, fitIntercept, link, maxIter, regParam, tol)
     instr.logNumFeatures(numFeatures)
 
     if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) {
@@ -431,7 +434,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
       model.setSummary(Some(trainingSummary))
     }
 
-    instr.logSuccess(model)
     model
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index b046897..8b9233d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -27,6 +27,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.regression.IsotonicRegressionModel.IsotonicRegressionModelWriter
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression}
 import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel}
 import org.apache.spark.rdd.RDD
@@ -161,15 +162,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
   override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): IsotonicRegressionModel = {
+  override def fit(dataset: Dataset[_]): IsotonicRegressionModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
     // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
     val instances = extractWeightedLabeledPoints(dataset)
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
     instr.logNumFeatures(1)
 
     val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
@@ -177,9 +179,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
 
     if (handlePersistence) instances.unpersist()
 
-    val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
-    instr.logSuccess(model)
-    model
+    copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
   }
 
   @Since("1.5.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index c45ade9..ce6c12c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -37,6 +37,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.evaluation.RegressionMetrics
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.regression.{LinearRegressionModel => OldLinearRegressionModel}
@@ -315,7 +316,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
   def setEpsilon(value: Double): this.type = set(epsilon, value)
   setDefault(epsilon -> 1.35)
 
-  override protected def train(dataset: Dataset[_]): LinearRegressionModel = {
+  override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr =>
     // Extract the number of features before deciding optimization solver.
     val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
     val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
@@ -326,9 +327,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         Instance(label, weight, features)
     }
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol, elasticNetParam,
-      fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss, epsilon)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, solver, tol,
+      elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss,
+      epsilon)
     instr.logNumFeatures(numFeatures)
 
     if ($(loss) == SquaredError && (($(solver) == Auto &&
@@ -353,9 +356,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
         model.diagInvAtWA.toArray,
         model.objectiveHistory)
 
-      lrModel.setSummary(Some(trainingSummary))
-      instr.logSuccess(lrModel)
-      return lrModel
+      return lrModel.setSummary(Some(trainingSummary))
     }
 
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
@@ -415,9 +416,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
           Array(0D),
           Array(0D))
 
-        model.setSummary(Some(trainingSummary))
-        instr.logSuccess(model)
-        return model
+        return model.setSummary(Some(trainingSummary))
       } else {
         require($(regParam) == 0.0, "The standard deviation of the label is zero. " +
           "Model cannot be regularized.")
@@ -596,8 +595,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
       objectiveHistory)
 
     model.setSummary(Some(trainingSummary))
-    instr.logSuccess(model)
-    model
   }
 
   @Since("1.4.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 4509f85..3587572 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
 import org.apache.spark.rdd.RDD
@@ -114,15 +115,17 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
   override def setFeatureSubsetStrategy(value: String): this.type =
     set(featureSubsetStrategy, value)
 
-  override protected def train(dataset: Dataset[_]): RandomForestRegressionModel = {
+  override protected def train(
+      dataset: Dataset[_]): RandomForestRegressionModel = instrumented { instr =>
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
     val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
 
-    val instr = Instrumentation.create(this, oldDataset)
-    instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees,
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, numTrees,
       featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
       minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)
 
@@ -131,9 +134,8 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
       .map(_.asInstanceOf[DecisionTreeRegressionModel])
 
     val numFeatures = oldDataset.first().features.size
-    val m = new RandomForestRegressionModel(uid, trees, numFeatures)
-    instr.logSuccess(m)
-    m
+    instr.logNamedValue(Instrumentation.loggerTags.numFeatures, numFeatures)
+    new RandomForestRegressionModel(uid, trees, numFeatures)
   }
 
   @Since("1.4.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index f327f37..e60a14f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.ml.evaluation.Evaluator
 import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism}
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
@@ -118,7 +119,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
   def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value)
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): CrossValidatorModel = {
+  override def fit(dataset: Dataset[_]): CrossValidatorModel = instrumented { instr =>
     val schema = dataset.schema
     transformSchema(schema, logging = true)
     val sparkSession = dataset.sparkSession
@@ -129,8 +130,9 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
     // Create execution context based on $(parallelism)
     val executionContext = getExecutionContext
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(numFolds, seed, parallelism)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, numFolds, seed, parallelism)
     logTuningParams(instr)
 
     val collectSubModelsParam = $(collectSubModels)
@@ -176,7 +178,6 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
     instr.logInfo(s"Best set of parameters:\n${epm(bestIndex)}")
     instr.logInfo(s"Best cross-validation metric: $bestMetric.")
     val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
-    instr.logSuccess(bestModel)
     copyValues(new CrossValidatorModel(uid, bestModel, metrics)
       .setSubModels(subModels).setParent(this))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
index 14d6a69..8b25119 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
@@ -34,6 +34,7 @@ import org.apache.spark.ml.evaluation.Evaluator
 import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism}
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ThreadUtils
@@ -117,7 +118,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
   def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value)
 
   @Since("2.0.0")
-  override def fit(dataset: Dataset[_]): TrainValidationSplitModel = {
+  override def fit(dataset: Dataset[_]): TrainValidationSplitModel = instrumented { instr =>
     val schema = dataset.schema
     transformSchema(schema, logging = true)
     val est = $(estimator)
@@ -127,8 +128,9 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
     // Create execution context based on $(parallelism)
     val executionContext = getExecutionContext
 
-    val instr = Instrumentation.create(this, dataset)
-    instr.logParams(trainRatio, seed, parallelism)
+    instr.logPipelineStage(this)
+    instr.logDataset(dataset)
+    instr.logParams(this, trainRatio, seed, parallelism)
     logTuningParams(instr)
 
     val Array(trainingDataset, validationDataset) =
@@ -172,7 +174,6 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
     instr.logInfo(s"Best set of parameters:\n${epm(bestIndex)}")
     instr.logInfo(s"Best train validation split metric: $bestMetric.")
     val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
-    instr.logSuccess(bestModel)
     copyValues(new TrainValidationSplitModel(uid, bestModel, metrics)
       .setSubModels(subModels).setParent(this))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index 2e43a9e..4965491 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -27,7 +27,7 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.{Estimator, Model, PipelineStage}
+import org.apache.spark.ml.PipelineStage
 import org.apache.spark.ml.param.{Param, Params}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
@@ -37,26 +37,16 @@ import org.apache.spark.util.Utils
  * A small wrapper that defines a training session for an estimator, and some methods to log
  * useful information during this session.
  */
-private[spark] class Instrumentation extends Logging {
+private[spark] class Instrumentation private () extends Logging {
 
   private val id = UUID.randomUUID()
   private val shortId = id.toString.take(8)
-  private val prefix = s"[$shortId] "
-
-  // TODO: remove stage
-  var stage: Params = _
-  // TODO: update spark.ml to use new Instrumentation APIs and remove this constructor
-  private def this(estimator: Estimator[_], dataset: RDD[_]) = {
-    this()
-    logPipelineStage(estimator)
-    logDataset(dataset)
-  }
+  private[util] val prefix = s"[$shortId] "
 
   /**
    * Log some info about the pipeline stage being fit.
    */
   def logPipelineStage(stage: PipelineStage): Unit = {
-    this.stage = stage
     // estimator.getClass.getSimpleName can cause Malformed class name error,
     // call safer `Utils.getSimpleName` instead
     val className = Utils.getSimpleName(stage.getClass)
@@ -119,13 +109,6 @@ private[spark] class Instrumentation extends Logging {
     logInfo(compact(render(map2jvalue(pairs.toMap))))
   }
 
-  // TODO: remove this
-  def logParams(params: Param[_]*): Unit = {
-    require(stage != null, "`logStageParams` must be called before `logParams` (or an instance of" +
-      " Params must be provided explicitly).")
-    logParams(stage, params: _*)
-  }
-
   def logNumFeatures(num: Long): Unit = {
     logNamedValue(Instrumentation.loggerTags.numFeatures, num)
   }
@@ -166,14 +149,9 @@ private[spark] class Instrumentation extends Logging {
   }
 
 
-  // TODO: Remove this (possibly replace with logModel?)
   /**
    * Logs the successful completion of the training session.
    */
-  def logSuccess(model: Model[_]): Unit = {
-    logInfo(s"training finished")
-  }
-
   def logSuccess(): Unit = {
     logInfo("training finished")
   }
@@ -200,22 +178,6 @@ private[spark] object Instrumentation {
     val varianceOfLabels = "varianceOfLabels"
   }
 
-  // TODO: Remove these
-  /**
-   * Creates an instrumentation object for a training session.
-   */
-  def create(estimator: Estimator[_], dataset: Dataset[_]): Instrumentation = {
-    create(estimator, dataset.rdd)
-  }
-
-  /**
-   * Creates an instrumentation object for a training session.
-   */
-  def create(estimator: Estimator[_], dataset: RDD[_]): Instrumentation = {
-    new Instrumentation(estimator, dataset)
-  }
-  // end remove
-
   def instrumented[T](body: (Instrumentation => T)): T = {
     val instr = new Instrumentation()
     Try(body(instr)) match {
@@ -268,8 +230,7 @@ private[spark] object OptionalInstrumentation {
    * Creates an `OptionalInstrumentation` object from an existing `Instrumentation` object.
    */
   def create(instr: Instrumentation): OptionalInstrumentation = {
-    new OptionalInstrumentation(Some(instr),
-      instr.stage.getClass.getName.stripSuffix("$"))
+    new OptionalInstrumentation(Some(instr), instr.prefix)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org