Posted to commits@spark.apache.org by jk...@apache.org on 2018/07/20 19:13:37 UTC
spark git commit: [SPARK-24852][ML] Update spark.ml to use Instrumentation.instrumented.
Repository: spark
Updated Branches:
refs/heads/master 244bcff19 -> 3cb1b5780
[SPARK-24852][ML] Update spark.ml to use Instrumentation.instrumented.
## What changes were proposed in this pull request?
Follow-up to #21719.
Update spark.ml training code to fully wrap instrumented methods and remove old instrumentation APIs.
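For reviewers, a minimal before/after sketch of the pattern this change applies across the touched estimators (`MyModel` and `doTrain` are illustrative placeholders, not names from this diff):

```scala
import org.apache.spark.ml.util.Instrumentation.instrumented

// Before: each train() created an Instrumentation by hand and had to
// remember to call logSuccess on every exit path.
override protected def train(dataset: Dataset[_]): MyModel = {
  val instr = Instrumentation.create(this, dataset)
  instr.logParams(maxIter, tol)
  val model = doTrain(dataset)   // placeholder for the actual training logic
  instr.logSuccess(model)
  model
}

// After: instrumented wraps the whole body, so completion logging happens
// once in the wrapper; the stage, dataset, and params are logged explicitly
// at the top of the body, and the model is simply the block's result.
override protected def train(dataset: Dataset[_]): MyModel = instrumented { instr =>
  instr.logPipelineStage(this)
  instr.logDataset(dataset)
  instr.logParams(this, maxIter, tol)
  doTrain(dataset)
}
```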
## How was this patch tested?
Existing tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Bago Amirbekian <ba...@databricks.com>
Closes #21799 from MrBago/new-instrumentation-apis2.
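The wrapper itself appears in the Instrumentation.scala hunk at the end of this diff; since that hunk is truncated after the `Try(body(instr)) match` line, the branches below are a sketch, not verbatim from this commit (the Failure handling is an assumption based on the retained no-arg `logSuccess()`):

```scala
import scala.util.{Failure, Success, Try}

def instrumented[T](body: (Instrumentation => T)): T = {
  val instr = new Instrumentation()
  Try(body(instr)) match {
    case Success(result) =>
      instr.logSuccess()   // shown in this diff: the no-arg logSuccess remains
      result
    case Failure(e) =>
      // assumption: failures are logged through the instrumentation, then rethrown
      instr.logFailure(e)
      throw e
  }
}
```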
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cb1b578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cb1b578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cb1b578
Branch: refs/heads/master
Commit: 3cb1b57809d0b4a93223669f5c10cea8fc53eff6
Parents: 244bcff
Author: Bago Amirbekian <ba...@databricks.com>
Authored: Fri Jul 20 12:13:15 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Fri Jul 20 12:13:15 2018 -0700
----------------------------------------------------------------------
.../classification/DecisionTreeClassifier.scala | 24 +++++-----
.../spark/ml/classification/GBTClassifier.scala | 14 +++---
.../spark/ml/classification/LinearSVC.scala | 12 ++---
.../ml/classification/LogisticRegression.scala | 2 +-
.../MultilayerPerceptronClassifier.scala | 14 +++---
.../spark/ml/classification/NaiveBayes.scala | 12 ++---
.../spark/ml/classification/OneVsRest.scala | 9 ++--
.../classification/RandomForestClassifier.scala | 13 +++---
.../spark/ml/clustering/BisectingKMeans.scala | 12 ++---
.../spark/ml/clustering/GaussianMixture.scala | 12 ++---
.../org/apache/spark/ml/clustering/KMeans.scala | 9 ++--
.../org/apache/spark/ml/clustering/LDA.scala | 12 ++---
.../org/apache/spark/ml/fpm/FPGrowth.scala | 12 ++---
.../apache/spark/ml/recommendation/ALS.scala | 9 ++--
.../ml/regression/AFTSurvivalRegression.scala | 13 +++---
.../ml/regression/DecisionTreeRegressor.scala | 24 +++++-----
.../spark/ml/regression/GBTRegressor.scala | 12 ++---
.../GeneralizedLinearRegression.scala | 12 ++---
.../ml/regression/IsotonicRegression.scala | 12 ++---
.../spark/ml/regression/LinearRegression.scala | 21 ++++-----
.../ml/regression/RandomForestRegressor.scala | 14 +++---
.../apache/spark/ml/tuning/CrossValidator.scala | 9 ++--
.../spark/ml/tuning/TrainValidationSplit.scala | 9 ++--
.../apache/spark/ml/util/Instrumentation.scala | 47 ++------------------
24 files changed, 153 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index c9786f1..8a57bfc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
@@ -96,8 +97,10 @@ class DecisionTreeClassifier @Since("1.4.0") (
@Since("1.6.0")
override def setSeed(value: Long): this.type = set(seed, value)
- override protected def train(dataset: Dataset[_]): DecisionTreeClassificationModel = {
- val instr = Instrumentation.create(this, dataset)
+ override protected def train(
+ dataset: Dataset[_]): DecisionTreeClassificationModel = instrumented { instr =>
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val numClasses: Int = getNumClasses(dataset)
@@ -112,30 +115,27 @@ class DecisionTreeClassifier @Since("1.4.0") (
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset, numClasses)
val strategy = getOldStrategy(categoricalFeatures, numClasses)
- instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
+ instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
cacheNodeIds, checkpointInterval, impurity, seed)
val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
seed = $(seed), instr = Some(instr), parentUID = Some(uid))
- val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]
- instr.logSuccess(m)
- m
+ trees.head.asInstanceOf[DecisionTreeClassificationModel]
}
/** (private[ml]) Train a decision tree on an RDD */
private[ml] def train(data: RDD[LabeledPoint],
- oldStrategy: OldStrategy): DecisionTreeClassificationModel = {
- val instr = Instrumentation.create(this, data)
- instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
+ oldStrategy: OldStrategy): DecisionTreeClassificationModel = instrumented { instr =>
+ instr.logPipelineStage(this)
+ instr.logDataset(data)
+ instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
cacheNodeIds, checkpointInterval, impurity, seed)
val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy = "all",
seed = 0L, instr = Some(instr), parentUID = Some(uid))
- val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]
- instr.logSuccess(m)
- m
+ trees.head.asInstanceOf[DecisionTreeClassificationModel]
}
/** (private[ml]) Create a Strategy instance to use with the old API. */
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index 337133a..33acd99 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
@@ -31,9 +31,9 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.GradientBoostedTrees
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
@@ -152,7 +152,8 @@ class GBTClassifier @Since("1.4.0") (
set(validationIndicatorCol, value)
}
- override protected def train(dataset: Dataset[_]): GBTClassificationModel = {
+ override protected def train(
+ dataset: Dataset[_]): GBTClassificationModel = instrumented { instr =>
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
@@ -189,8 +190,9 @@ class GBTClassifier @Since("1.4.0") (
s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
}
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType,
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy,
validationIndicatorCol)
@@ -204,9 +206,7 @@ class GBTClassifier @Since("1.4.0") (
GradientBoostedTrees.run(trainDataset, boostingStrategy, $(seed), $(featureSubsetStrategy))
}
- val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
- instr.logSuccess(m)
- m
+ new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
}
@Since("1.4.1")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 38eb045..20f9366 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -33,6 +33,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
@@ -162,7 +163,7 @@ class LinearSVC @Since("2.2.0") (
@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
- override protected def train(dataset: Dataset[_]): LinearSVCModel = {
+ override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
val instances: RDD[Instance] =
dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
@@ -170,8 +171,9 @@ class LinearSVC @Since("2.2.0") (
Instance(label, weight, features)
}
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(regParam, maxIter, fitIntercept, tol, standardization, threshold,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, regParam, maxIter, fitIntercept, tol, standardization, threshold,
aggregationDepth)
val (summarizer, labelSummarizer) = {
@@ -276,9 +278,7 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}
- val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
- instr.logSuccess(model)
- model
+ copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 25fb9c8..af651b0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -503,7 +503,7 @@ class LogisticRegression @Since("1.2.0") (
instr.logPipelineStage(this)
instr.logDataset(dataset)
- instr.logParams(regParam, elasticNetParam, standardization, threshold,
+ instr.logParams(this, regParam, elasticNetParam, standardization, threshold,
maxIter, tol, fitIntercept)
val (summarizer, labelSummarizer) = {
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 57ba47e..65e3b2d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.Dataset
/** Params for Multilayer Perceptron. */
@@ -230,9 +231,11 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
* @param dataset Training dataset
* @return Fitted model
*/
- override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
+ override protected def train(
+ dataset: Dataset[_]): MultilayerPerceptronClassificationModel = instrumented { instr =>
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, predictionCol, layers, maxIter, tol,
blockSize, solver, stepSize, seed)
val myLayers = $(layers)
@@ -264,10 +267,7 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
}
trainer.setStackSize($(blockSize))
val mlpModel = trainer.train(data)
- val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
-
- instr.logSuccess(model)
- model
+ new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 1dde18d..f65d397 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -25,6 +25,7 @@ import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.HasWeightCol
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, lit}
@@ -125,8 +126,9 @@ class NaiveBayes @Since("1.5.0") (
*/
private[spark] def trainWithLabelCheck(
dataset: Dataset[_],
- positiveLabel: Boolean): NaiveBayesModel = {
- val instr = Instrumentation.create(this, dataset)
+ positiveLabel: Boolean): NaiveBayesModel = instrumented { instr =>
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
if (positiveLabel && isDefined(thresholds)) {
val numClasses = getNumClasses(dataset)
instr.logNumClasses(numClasses)
@@ -148,7 +150,7 @@ class NaiveBayes @Since("1.5.0") (
}
}
- instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
+ instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
probabilityCol, modelType, smoothing, thresholds)
val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
@@ -204,9 +206,7 @@ class NaiveBayes @Since("1.5.0") (
val pi = Vectors.dense(piArray)
val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
- val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
- instr.logSuccess(model)
- model
+ new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
}
@Since("1.5.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 3474b61..1835a91 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -36,6 +36,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -362,11 +363,12 @@ final class OneVsRest @Since("1.4.0") (
}
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): OneVsRestModel = {
+ override def fit(dataset: Dataset[_]): OneVsRestModel = instrumented { instr =>
transformSchema(dataset.schema)
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol)
instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)
// determine number of classes either from metadata if provided, or via computation.
@@ -440,7 +442,6 @@ final class OneVsRest @Since("1.4.0") (
case attr: Attribute => attr
}
val model = new OneVsRestModel(uid, labelAttribute.toMetadata(), models).setParent(this)
- instr.logSuccess(model)
copyValues(model)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index 040db3b..94887ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
import org.apache.spark.rdd.RDD
@@ -115,8 +116,10 @@ class RandomForestClassifier @Since("1.4.0") (
override def setFeatureSubsetStrategy(value: String): this.type =
set(featureSubsetStrategy, value)
- override protected def train(dataset: Dataset[_]): RandomForestClassificationModel = {
- val instr = Instrumentation.create(this, dataset)
+ override protected def train(
+ dataset: Dataset[_]): RandomForestClassificationModel = instrumented { instr =>
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val numClasses: Int = getNumClasses(dataset)
@@ -131,7 +134,7 @@ class RandomForestClassifier @Since("1.4.0") (
val strategy =
super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
- instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
+ instr.logParams(this, labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)
@@ -140,11 +143,9 @@ class RandomForestClassifier @Since("1.4.0") (
.map(_.asInstanceOf[DecisionTreeClassificationModel])
val numFeatures = oldDataset.first().features.size
- val m = new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)
instr.logNumClasses(numClasses)
instr.logNumFeatures(numFeatures)
- instr.logSuccess(m)
- m
+ new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)
}
@Since("1.4.1")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index de56447..48b8c52 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -26,6 +26,7 @@ import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
BisectingKMeansModel => MLlibBisectingKMeansModel}
import org.apache.spark.mllib.linalg.VectorImplicits._
@@ -257,12 +258,13 @@ class BisectingKMeans @Since("2.0.0") (
def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): BisectingKMeansModel = {
+ override def fit(dataset: Dataset[_]): BisectingKMeansModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(featuresCol, predictionCol, k, maxIter, seed,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, featuresCol, predictionCol, k, maxIter, seed,
minDivisibleClusterSize, distanceMeasure)
val bkm = new MLlibBisectingKMeans()
@@ -275,10 +277,8 @@ class BisectingKMeans @Since("2.0.0") (
val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
val summary = new BisectingKMeansSummary(
model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter))
- model.setSummary(Some(summary))
instr.logNamedValue("clusterSizes", summary.clusterSizes)
- instr.logSuccess(model)
- model
+ model.setSummary(Some(summary))
}
@Since("2.0.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index f0707b3..310b03b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
@@ -335,7 +336,7 @@ class GaussianMixture @Since("2.0.0") (
private val numSamples = 5
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
+ override def fit(dataset: Dataset[_]): GaussianMixtureModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
val sc = dataset.sparkSession.sparkContext
@@ -352,8 +353,9 @@ class GaussianMixture @Since("2.0.0") (
s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
s" matrix is quadratic in the number of features.")
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
instr.logNumFeatures(numFeatures)
val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
@@ -425,11 +427,9 @@ class GaussianMixture @Since("2.0.0") (
val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
val summary = new GaussianMixtureSummary(model.transform(dataset),
$(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iter)
- model.setSummary(Some(summary))
instr.logNamedValue("logLikelihood", logLikelihood)
instr.logNamedValue("clusterSizes", summary.clusterSizes)
- instr.logSuccess(model)
- model
+ model.setSummary(Some(summary))
}
@Since("2.0.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 6f4a30d..498310d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.linalg.VectorImplicits._
@@ -336,7 +337,7 @@ class KMeans @Since("1.5.0") (
def setSeed(value: Long): this.type = set(seed, value)
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): KMeansModel = {
+ override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
@@ -346,8 +347,9 @@ class KMeans @Since("1.5.0") (
instances.persist(StorageLevel.MEMORY_AND_DISK)
}
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure,
maxIter, seed, tol)
val algo = new MLlibKMeans()
.setK($(k))
@@ -369,7 +371,6 @@ class KMeans @Since("1.5.0") (
model.setSummary(Some(summary))
instr.logNamedValue("clusterSizes", summary.clusterSizes)
- instr.logSuccess(model)
if (handlePersistence) {
instances.unpersist()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index fed42c9..50867f7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -32,6 +32,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
@@ -896,11 +897,12 @@ class LDA @Since("1.6.0") (
override def copy(extra: ParamMap): LDA = defaultCopy(extra)
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): LDAModel = {
+ override def fit(dataset: Dataset[_]): LDAModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, featuresCol, topicDistributionCol, k, maxIter, subsamplingRate,
checkpointInterval, keepLastCheckpoint, optimizeDocConcentration, topicConcentration,
learningDecay, optimizer, learningOffset, seed)
@@ -923,9 +925,7 @@ class LDA @Since("1.6.0") (
}
instr.logNumFeatures(newModel.vocabSize)
- val model = copyValues(newModel).setParent(this)
- instr.logSuccess(model)
- model
+ copyValues(newModel).setParent(this)
}
@Since("1.6.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 9d664b6..85c483c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -26,6 +26,7 @@ import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.HasPredictionCol
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.fpm.{AssociationRules => MLlibAssociationRules,
FPGrowth => MLlibFPGrowth}
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
@@ -158,11 +159,12 @@ class FPGrowth @Since("2.2.0") (
genericFit(dataset)
}
- private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = {
+ private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = instrumented { instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(params: _*)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, params: _*)
val data = dataset.select($(itemsCol))
val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[Any](0).toArray)
val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport))
@@ -185,9 +187,7 @@ class FPGrowth @Since("2.2.0") (
items.unpersist()
}
- val model = copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
- instr.logSuccess(model)
- model
+ copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
}
@Since("2.2.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a23f955..ffe5927 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -39,6 +39,7 @@ import org.apache.spark.ml.linalg.BLAS
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.CholeskyDecomposition
import org.apache.spark.mllib.optimization.NNLS
import org.apache.spark.rdd.RDD
@@ -654,7 +655,7 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
}
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): ALSModel = {
+ override def fit(dataset: Dataset[_]): ALSModel = instrumented { instr =>
transformSchema(dataset.schema)
import dataset.sparkSession.implicits._
@@ -666,8 +667,9 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}
- val instr = Instrumentation.create(this, ratings)
- instr.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha, userCol,
itemCol, ratingCol, predictionCol, maxIter, regParam, nonnegative, checkpointInterval,
seed, intermediateStorageLevel, finalStorageLevel)
@@ -681,7 +683,6 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
val userDF = userFactors.toDF("id", "features")
val itemDF = itemFactors.toDF("id", "features")
val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
- instr.logSuccess(model)
copyValues(model)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index e27a96e..3cd0706 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -32,6 +32,7 @@ import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.util.MLUtils
@@ -210,7 +211,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
}
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = {
+ override def fit(dataset: Dataset[_]): AFTSurvivalRegressionModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
val instances = extractAFTPoints(dataset)
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
@@ -229,8 +230,9 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
val numFeatures = featuresStd.size
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, censorCol, predictionCol, quantilesCol,
fitIntercept, maxIter, tol, aggregationDepth)
instr.logNamedValue("quantileProbabilities.size", $(quantileProbabilities).length)
instr.logNumFeatures(numFeatures)
@@ -284,10 +286,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
val coefficients = Vectors.dense(rawCoefficients)
val intercept = parameters(1)
val scale = math.exp(parameters(0))
- val model = copyValues(new AFTSurvivalRegressionModel(uid, coefficients,
- intercept, scale).setParent(this))
- instr.logSuccess(model)
- model
+ copyValues(new AFTSurvivalRegressionModel(uid, coefficients, intercept, scale).setParent(this))
}
@Since("1.6.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 8bcf079..018290f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -30,6 +30,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
@@ -99,37 +100,36 @@ class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
@Since("2.0.0")
def setVarianceCol(value: String): this.type = set(varianceCol, value)
- override protected def train(dataset: Dataset[_]): DecisionTreeRegressionModel = {
+ override protected def train(
+ dataset: Dataset[_]): DecisionTreeRegressionModel = instrumented { instr =>
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
val strategy = getOldStrategy(categoricalFeatures)
- val instr = Instrumentation.create(this, oldDataset)
- instr.logParams(params: _*)
+ instr.logPipelineStage(this)
+ instr.logDataset(oldDataset)
+ instr.logParams(this, params: _*)
val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
seed = $(seed), instr = Some(instr), parentUID = Some(uid))
- val m = trees.head.asInstanceOf[DecisionTreeRegressionModel]
- instr.logSuccess(m)
- m
+ trees.head.asInstanceOf[DecisionTreeRegressionModel]
}
/** (private[ml]) Train a decision tree on an RDD */
private[ml] def train(
data: RDD[LabeledPoint],
oldStrategy: OldStrategy,
- featureSubsetStrategy: String): DecisionTreeRegressionModel = {
- val instr = Instrumentation.create(this, data)
- instr.logParams(params: _*)
+ featureSubsetStrategy: String): DecisionTreeRegressionModel = instrumented { instr =>
+ instr.logPipelineStage(this)
+ instr.logDataset(data)
+ instr.logParams(this, params: _*)
val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy,
seed = $(seed), instr = Some(instr), parentUID = Some(uid))
- val m = trees.head.asInstanceOf[DecisionTreeRegressionModel]
- instr.logSuccess(m)
- m
+ trees.head.asInstanceOf[DecisionTreeRegressionModel]
}
/** (private[ml]) Create a Strategy instance to use with the old API. */
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
index eb8b3c0..3305881 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
@@ -31,6 +31,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.GradientBoostedTrees
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
import org.apache.spark.rdd.RDD
@@ -151,7 +152,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
set(validationIndicatorCol, value)
}
- override protected def train(dataset: Dataset[_]): GBTRegressionModel = {
+ override protected def train(dataset: Dataset[_]): GBTRegressionModel = instrumented { instr =>
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
@@ -168,8 +169,9 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
val numFeatures = trainDataset.first().features.size
val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression)
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType,
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy)
instr.logNumFeatures(numFeatures)
@@ -181,9 +183,7 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
GradientBoostedTrees.run(trainDataset, boostingStrategy,
$(seed), $(featureSubsetStrategy))
}
- val m = new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures)
- instr.logSuccess(m)
- m
+ new GBTRegressionModel(uid, baseLearners, learnerWeights, numFeatures)
}
@Since("1.4.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 143c8a3..20878b6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -34,6 +34,7 @@ import org.apache.spark.ml.optim._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
@@ -373,13 +374,15 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
@Since("2.0.0")
def setLinkPredictionCol(value: String): this.type = set(linkPredictionCol, value)
- override protected def train(dataset: Dataset[_]): GeneralizedLinearRegressionModel = {
+ override protected def train(
+ dataset: Dataset[_]): GeneralizedLinearRegressionModel = instrumented { instr =>
val familyAndLink = FamilyAndLink(this)
val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, weightCol, offsetCol, predictionCol, linkPredictionCol,
- family, solver, fitIntercept, link, maxIter, regParam, tol)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, weightCol, offsetCol, predictionCol,
+ linkPredictionCol, family, solver, fitIntercept, link, maxIter, regParam, tol)
instr.logNumFeatures(numFeatures)
if (numFeatures > WeightedLeastSquares.MAX_NUM_FEATURES) {
@@ -431,7 +434,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
model.setSummary(Some(trainingSummary))
}
- instr.logSuccess(model)
model
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index b046897..8b9233d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -27,6 +27,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.regression.IsotonicRegressionModel.IsotonicRegressionModelWriter
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression}
import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel}
import org.apache.spark.rdd.RDD
@@ -161,15 +162,16 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): IsotonicRegressionModel = {
+ override def fit(dataset: Dataset[_]): IsotonicRegressionModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
// Extract columns from data. If dataset is persisted, do not persist oldDataset.
val instances = extractWeightedLabeledPoints(dataset)
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, featureIndex, isotonic)
instr.logNumFeatures(1)
val isotonicRegression = new MLlibIsotonicRegression().setIsotonic($(isotonic))
@@ -177,9 +179,7 @@ class IsotonicRegression @Since("1.5.0") (@Since("1.5.0") override val uid: Stri
if (handlePersistence) instances.unpersist()
- val model = copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
- instr.logSuccess(model)
- model
+ copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
}
@Since("1.5.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index c45ade9..ce6c12c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -37,6 +37,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.regression.{LinearRegressionModel => OldLinearRegressionModel}
@@ -315,7 +316,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
def setEpsilon(value: Double): this.type = set(epsilon, value)
setDefault(epsilon -> 1.35)
- override protected def train(dataset: Dataset[_]): LinearRegressionModel = {
+ override protected def train(dataset: Dataset[_]): LinearRegressionModel = instrumented { instr =>
// Extract the number of features before deciding optimization solver.
val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
@@ -326,9 +327,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
Instance(label, weight, features)
}
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol, elasticNetParam,
- fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss, epsilon)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, solver, tol,
+ elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth, loss,
+ epsilon)
instr.logNumFeatures(numFeatures)
if ($(loss) == SquaredError && (($(solver) == Auto &&
@@ -353,9 +356,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
model.diagInvAtWA.toArray,
model.objectiveHistory)
- lrModel.setSummary(Some(trainingSummary))
- instr.logSuccess(lrModel)
- return lrModel
+ return lrModel.setSummary(Some(trainingSummary))
}
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
@@ -415,9 +416,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
Array(0D),
Array(0D))
- model.setSummary(Some(trainingSummary))
- instr.logSuccess(model)
- return model
+ return model.setSummary(Some(trainingSummary))
} else {
require($(regParam) == 0.0, "The standard deviation of the label is zero. " +
"Model cannot be regularized.")
@@ -596,8 +595,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
objectiveHistory)
model.setSummary(Some(trainingSummary))
- instr.logSuccess(model)
- model
}
@Since("1.4.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 4509f85..3587572 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
import org.apache.spark.rdd.RDD
@@ -114,15 +115,17 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
override def setFeatureSubsetStrategy(value: String): this.type =
set(featureSubsetStrategy, value)
- override protected def train(dataset: Dataset[_]): RandomForestRegressionModel = {
+ override protected def train(
+ dataset: Dataset[_]): RandomForestRegressionModel = instrumented { instr =>
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
val strategy =
super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
- val instr = Instrumentation.create(this, oldDataset)
- instr.logParams(labelCol, featuresCol, predictionCol, impurity, numTrees,
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, numTrees,
featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)
@@ -131,9 +134,8 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
.map(_.asInstanceOf[DecisionTreeRegressionModel])
val numFeatures = oldDataset.first().features.size
- val m = new RandomForestRegressionModel(uid, trees, numFeatures)
- instr.logSuccess(m)
- m
+ instr.logNamedValue(Instrumentation.loggerTags.numFeatures, numFeatures)
+ new RandomForestRegressionModel(uid, trees, numFeatures)
}
@Since("1.4.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index f327f37..e60a14f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -33,6 +33,7 @@ import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism}
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
@@ -118,7 +119,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value)
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): CrossValidatorModel = {
+ override def fit(dataset: Dataset[_]): CrossValidatorModel = instrumented { instr =>
val schema = dataset.schema
transformSchema(schema, logging = true)
val sparkSession = dataset.sparkSession
@@ -129,8 +130,9 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
// Create execution context based on $(parallelism)
val executionContext = getExecutionContext
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(numFolds, seed, parallelism)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, numFolds, seed, parallelism)
logTuningParams(instr)
val collectSubModelsParam = $(collectSubModels)
@@ -176,7 +178,6 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
instr.logInfo(s"Best set of parameters:\n${epm(bestIndex)}")
instr.logInfo(s"Best cross-validation metric: $bestMetric.")
val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
- instr.logSuccess(bestModel)
copyValues(new CrossValidatorModel(uid, bestModel, metrics)
.setSubModels(subModels).setParent(this))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
index 14d6a69..8b25119 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
@@ -34,6 +34,7 @@ import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasCollectSubModels, HasParallelism}
import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils
@@ -117,7 +118,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
def setCollectSubModels(value: Boolean): this.type = set(collectSubModels, value)
@Since("2.0.0")
- override def fit(dataset: Dataset[_]): TrainValidationSplitModel = {
+ override def fit(dataset: Dataset[_]): TrainValidationSplitModel = instrumented { instr =>
val schema = dataset.schema
transformSchema(schema, logging = true)
val est = $(estimator)
@@ -127,8 +128,9 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
// Create execution context based on $(parallelism)
val executionContext = getExecutionContext
- val instr = Instrumentation.create(this, dataset)
- instr.logParams(trainRatio, seed, parallelism)
+ instr.logPipelineStage(this)
+ instr.logDataset(dataset)
+ instr.logParams(this, trainRatio, seed, parallelism)
logTuningParams(instr)
val Array(trainingDataset, validationDataset) =
@@ -172,7 +174,6 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
instr.logInfo(s"Best set of parameters:\n${epm(bestIndex)}")
instr.logInfo(s"Best train validation split metric: $bestMetric.")
val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]]
- instr.logSuccess(bestModel)
copyValues(new TrainValidationSplitModel(uid, bestModel, metrics)
.setSubModels(subModels).setParent(this))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3cb1b578/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index 2e43a9e..4965491 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -27,7 +27,7 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.internal.Logging
-import org.apache.spark.ml.{Estimator, Model, PipelineStage}
+import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.param.{Param, Params}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
@@ -37,26 +37,16 @@ import org.apache.spark.util.Utils
* A small wrapper that defines a training session for an estimator, and some methods to log
* useful information during this session.
*/
-private[spark] class Instrumentation extends Logging {
+private[spark] class Instrumentation private () extends Logging {
private val id = UUID.randomUUID()
private val shortId = id.toString.take(8)
- private val prefix = s"[$shortId] "
-
- // TODO: remove stage
- var stage: Params = _
- // TODO: update spark.ml to use new Instrumentation APIs and remove this constructor
- private def this(estimator: Estimator[_], dataset: RDD[_]) = {
- this()
- logPipelineStage(estimator)
- logDataset(dataset)
- }
+ private[util] val prefix = s"[$shortId] "
/**
* Log some info about the pipeline stage being fit.
*/
def logPipelineStage(stage: PipelineStage): Unit = {
- this.stage = stage
// estimator.getClass.getSimpleName can cause Malformed class name error,
// call safer `Utils.getSimpleName` instead
val className = Utils.getSimpleName(stage.getClass)
@@ -119,13 +109,6 @@ private[spark] class Instrumentation extends Logging {
logInfo(compact(render(map2jvalue(pairs.toMap))))
}
- // TODO: remove this
- def logParams(params: Param[_]*): Unit = {
- require(stage != null, "`logStageParams` must be called before `logParams` (or an instance of" +
- " Params must be provided explicitly).")
- logParams(stage, params: _*)
- }
-
def logNumFeatures(num: Long): Unit = {
logNamedValue(Instrumentation.loggerTags.numFeatures, num)
}
@@ -166,14 +149,9 @@ private[spark] class Instrumentation extends Logging {
}
- // TODO: Remove this (possibly replace with logModel?)
/**
* Logs the successful completion of the training session.
*/
- def logSuccess(model: Model[_]): Unit = {
- logInfo(s"training finished")
- }
-
def logSuccess(): Unit = {
logInfo("training finished")
}
@@ -200,22 +178,6 @@ private[spark] object Instrumentation {
val varianceOfLabels = "varianceOfLabels"
}
- // TODO: Remove these
- /**
- * Creates an instrumentation object for a training session.
- */
- def create(estimator: Estimator[_], dataset: Dataset[_]): Instrumentation = {
- create(estimator, dataset.rdd)
- }
-
- /**
- * Creates an instrumentation object for a training session.
- */
- def create(estimator: Estimator[_], dataset: RDD[_]): Instrumentation = {
- new Instrumentation(estimator, dataset)
- }
- // end remove
-
def instrumented[T](body: (Instrumentation => T)): T = {
val instr = new Instrumentation()
Try(body(instr)) match {
@@ -268,8 +230,7 @@ private[spark] object OptionalInstrumentation {
* Creates an `OptionalInstrumentation` object from an existing `Instrumentation` object.
*/
def create(instr: Instrumentation): OptionalInstrumentation = {
- new OptionalInstrumentation(Some(instr),
- instr.stage.getClass.getName.stripSuffix("$"))
+ new OptionalInstrumentation(Some(instr), instr.prefix)
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org