You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jk...@apache.org on 2018/04/24 18:02:31 UTC
spark git commit: [SPARK-23990][ML] Instruments logging improvements
- ML regression package
Repository: spark
Updated Branches:
refs/heads/master 83013752e -> 379bffa05
[SPARK-23990][ML] Instruments logging improvements - ML regression package
## What changes were proposed in this pull request?
Instruments logging improvements - ML regression package
This adds an `OptionalInstrumentation` class, which is used in `WeightedLeastSquares` and `IterativelyReweightedLeastSquares`.
## How was this patch tested?
N/A
Author: WeichenXu <we...@databricks.com>
Closes #21078 from WeichenXu123/inst_reg.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/379bffa0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/379bffa0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/379bffa0
Branch: refs/heads/master
Commit: 379bffa0525a4343f8c10e51ed192031922f9874
Parents: 8301375
Author: WeichenXu <we...@databricks.com>
Authored: Tue Apr 24 11:02:22 2018 -0700
Committer: Joseph K. Bradley <jo...@databricks.com>
Committed: Tue Apr 24 11:02:22 2018 -0700
----------------------------------------------------------------------
.../ml/classification/LogisticRegression.scala | 4 +-
.../IterativelyReweightedLeastSquares.scala | 18 ++++--
.../spark/ml/optim/WeightedLeastSquares.scala | 32 +++++----
.../ml/regression/AFTSurvivalRegression.scala | 2 +-
.../GeneralizedLinearRegression.scala | 14 ++--
.../spark/ml/regression/LinearRegression.scala | 22 ++++---
.../spark/ml/tree/impl/RandomForest.scala | 2 +
.../apache/spark/ml/util/Instrumentation.scala | 68 +++++++++++++++++++-
8 files changed, 125 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index e426263..06ca37b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -500,7 +500,7 @@ class LogisticRegression @Since("1.2.0") (
if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
- val instr = Instrumentation.create(this, instances)
+ val instr = Instrumentation.create(this, dataset)
instr.logParams(regParam, elasticNetParam, standardization, threshold,
maxIter, tol, fitIntercept)
@@ -816,7 +816,7 @@ class LogisticRegression @Since("1.2.0") (
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
- logError(msg)
+ instr.logError(msg)
throw new SparkException(msg)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
index 6961b45..572b8cf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
@@ -17,9 +17,9 @@
package org.apache.spark.ml.optim
-import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.{Instance, OffsetInstance}
import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.util.OptionalInstrumentation
import org.apache.spark.rdd.RDD
/**
@@ -61,9 +61,12 @@ private[ml] class IterativelyReweightedLeastSquares(
val fitIntercept: Boolean,
val regParam: Double,
val maxIter: Int,
- val tol: Double) extends Logging with Serializable {
+ val tol: Double) extends Serializable {
- def fit(instances: RDD[OffsetInstance]): IterativelyReweightedLeastSquaresModel = {
+ def fit(
+ instances: RDD[OffsetInstance],
+ instr: OptionalInstrumentation = OptionalInstrumentation.create(
+ classOf[IterativelyReweightedLeastSquares])): IterativelyReweightedLeastSquaresModel = {
var converged = false
var iter = 0
@@ -83,7 +86,8 @@ private[ml] class IterativelyReweightedLeastSquares(
// Estimate new model
model = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0,
- standardizeFeatures = false, standardizeLabel = false).fit(newInstances)
+ standardizeFeatures = false, standardizeLabel = false)
+ .fit(newInstances, instr = instr)
// Check convergence
val oldCoefficients = oldModel.coefficients
@@ -96,14 +100,14 @@ private[ml] class IterativelyReweightedLeastSquares(
if (maxTol < tol) {
converged = true
- logInfo(s"IRLS converged in $iter iterations.")
+ instr.logInfo(s"IRLS converged in $iter iterations.")
}
- logInfo(s"Iteration $iter : relative tolerance = $maxTol")
+ instr.logInfo(s"Iteration $iter : relative tolerance = $maxTol")
iter = iter + 1
if (iter == maxIter) {
- logInfo(s"IRLS reached the max number of iterations: $maxIter.")
+ instr.logInfo(s"IRLS reached the max number of iterations: $maxIter.")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index c5c9c8e..1b7c15f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -17,9 +17,9 @@
package org.apache.spark.ml.optim
-import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.util.OptionalInstrumentation
import org.apache.spark.rdd.RDD
/**
@@ -81,13 +81,11 @@ private[ml] class WeightedLeastSquares(
val standardizeLabel: Boolean,
val solverType: WeightedLeastSquares.Solver = WeightedLeastSquares.Auto,
val maxIter: Int = 100,
- val tol: Double = 1e-6) extends Logging with Serializable {
+ val tol: Double = 1e-6
+ ) extends Serializable {
import WeightedLeastSquares._
require(regParam >= 0.0, s"regParam cannot be negative: $regParam")
- if (regParam == 0.0) {
- logWarning("regParam is zero, which might cause numerical instability and overfitting.")
- }
require(elasticNetParam >= 0.0 && elasticNetParam <= 1.0,
s"elasticNetParam must be in [0, 1]: $elasticNetParam")
require(maxIter >= 0, s"maxIter must be a positive integer: $maxIter")
@@ -96,10 +94,17 @@ private[ml] class WeightedLeastSquares(
/**
* Creates a [[WeightedLeastSquaresModel]] from an RDD of [[Instance]]s.
*/
- def fit(instances: RDD[Instance]): WeightedLeastSquaresModel = {
+ def fit(
+ instances: RDD[Instance],
+ instr: OptionalInstrumentation = OptionalInstrumentation.create(classOf[WeightedLeastSquares])
+ ): WeightedLeastSquaresModel = {
+ if (regParam == 0.0) {
+ instr.logWarning("regParam is zero, which might cause numerical instability and overfitting.")
+ }
+
val summary = instances.treeAggregate(new Aggregator)(_.add(_), _.merge(_))
summary.validate()
- logInfo(s"Number of instances: ${summary.count}.")
+ instr.logInfo(s"Number of instances: ${summary.count}.")
val k = if (fitIntercept) summary.k + 1 else summary.k
val numFeatures = summary.k
val triK = summary.triK
@@ -114,11 +119,12 @@ private[ml] class WeightedLeastSquares(
if (rawBStd == 0) {
if (fitIntercept || rawBBar == 0.0) {
if (rawBBar == 0.0) {
- logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " +
- s"and the intercept will all be zero; as a result, training is not needed.")
+ instr.logWarning(s"Mean and standard deviation of the label are zero, so the " +
+ s"coefficients and the intercept will all be zero; as a result, training is not " +
+ s"needed.")
} else {
- logWarning(s"The standard deviation of the label is zero, so the coefficients will be " +
- s"zeros and the intercept will be the mean of the label; as a result, " +
+ instr.logWarning(s"The standard deviation of the label is zero, so the coefficients " +
+ s"will be zeros and the intercept will be the mean of the label; as a result, " +
s"training is not needed.")
}
val coefficients = new DenseVector(Array.ofDim(numFeatures))
@@ -128,7 +134,7 @@ private[ml] class WeightedLeastSquares(
} else {
require(!(regParam > 0.0 && standardizeLabel), "The standard deviation of the label is " +
"zero. Model cannot be regularized with standardization=true")
- logWarning(s"The standard deviation of the label is zero. Consider setting " +
+ instr.logWarning(s"The standard deviation of the label is zero. Consider setting " +
s"fitIntercept=true.")
}
}
@@ -256,7 +262,7 @@ private[ml] class WeightedLeastSquares(
// if Auto solver is used and Cholesky fails due to singular AtA, then fall back to
// Quasi-Newton solver.
case _: SingularMatrixException if solverType == WeightedLeastSquares.Auto =>
- logWarning("Cholesky solver failed due to singular covariance matrix. " +
+ instr.logWarning("Cholesky solver failed due to singular covariance matrix. " +
"Retrying with Quasi-Newton solver.")
// ab and aa were modified in place, so reconstruct them
val _aa = getAtA(aaBarValues, aBarValues)
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 7c6ec2a..e27a96e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -237,7 +237,7 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresSummarizer.mean(i) != 0.0 }) {
- logWarning("Fitting AFTSurvivalRegressionModel without intercept on dataset with " +
+ instr.logWarning("Fitting AFTSurvivalRegressionModel without intercept on dataset with " +
"constant nonzero column, Spark MLlib outputs zero coefficients for constant nonzero " +
"columns. This behavior is different from R survival::survreg.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index e030a40..143c8a3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -404,7 +404,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
}
val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0,
standardizeFeatures = true, standardizeLabel = true)
- val wlsModel = optimizer.fit(instances)
+ val wlsModel = optimizer.fit(instances, instr = OptionalInstrumentation.create(instr))
val model = copyValues(
new GeneralizedLinearRegressionModel(uid, wlsModel.coefficients, wlsModel.intercept)
.setParent(this))
@@ -418,10 +418,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val
OffsetInstance(label, weight, offset, features)
}
// Fit Generalized Linear Model by iteratively reweighted least squares (IRLS).
- val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam))
+ val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam),
+ instr = OptionalInstrumentation.create(instr))
val optimizer = new IterativelyReweightedLeastSquares(initialModel,
familyAndLink.reweightFunc, $(fitIntercept), $(regParam), $(maxIter), $(tol))
- val irlsModel = optimizer.fit(instances)
+ val irlsModel = optimizer.fit(instances, instr = OptionalInstrumentation.create(instr))
val model = copyValues(
new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept)
.setParent(this))
@@ -492,7 +493,10 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine
def initialize(
instances: RDD[OffsetInstance],
fitIntercept: Boolean,
- regParam: Double): WeightedLeastSquaresModel = {
+ regParam: Double,
+ instr: OptionalInstrumentation = OptionalInstrumentation.create(
+ classOf[GeneralizedLinearRegression])
+ ): WeightedLeastSquaresModel = {
val newInstances = instances.map { instance =>
val mu = family.initialize(instance.label, instance.weight)
val eta = predict(mu) - instance.offset
@@ -501,7 +505,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine
// TODO: Make standardizeFeatures and standardizeLabel configurable.
val initialModel = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0,
standardizeFeatures = true, standardizeLabel = true)
- .fit(newInstances)
+ .fit(newInstances, instr)
initialModel
}
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index f1d9a44..c45ade9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -339,7 +339,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam),
elasticNetParam = $(elasticNetParam), $(standardization), true,
solverType = WeightedLeastSquares.Auto, maxIter = $(maxIter), tol = $(tol))
- val model = optimizer.fit(instances)
+ val model = optimizer.fit(instances, instr = OptionalInstrumentation.create(instr))
// When it is trained by WeightedLeastSquares, training summary does not
// attach returned model.
val lrModel = copyValues(new LinearRegressionModel(uid, model.coefficients, model.intercept))
@@ -378,6 +378,11 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val yMean = ySummarizer.mean(0)
val rawYStd = math.sqrt(ySummarizer.variance(0))
+
+ instr.logNumExamples(ySummarizer.count)
+ instr.logNamedValue(Instrumentation.loggerTags.meanOfLabels, yMean)
+ instr.logNamedValue(Instrumentation.loggerTags.varianceOfLabels, rawYStd)
+
if (rawYStd == 0.0) {
if ($(fitIntercept) || yMean == 0.0) {
// If the rawYStd==0 and fitIntercept==true, then the intercept is yMean with
@@ -385,11 +390,12 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
// Also, if yMean==0 and rawYStd==0, all the coefficients are zero regardless of
// the fitIntercept.
if (yMean == 0.0) {
- logWarning(s"Mean and standard deviation of the label are zero, so the coefficients " +
- s"and the intercept will all be zero; as a result, training is not needed.")
+ instr.logWarning(s"Mean and standard deviation of the label are zero, so the " +
+ s"coefficients and the intercept will all be zero; as a result, training is not " +
+ s"needed.")
} else {
- logWarning(s"The standard deviation of the label is zero, so the coefficients will be " +
- s"zeros and the intercept will be the mean of the label; as a result, " +
+ instr.logWarning(s"The standard deviation of the label is zero, so the coefficients " +
+ s"will be zeros and the intercept will be the mean of the label; as a result, " +
s"training is not needed.")
}
if (handlePersistence) instances.unpersist()
@@ -415,7 +421,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
} else {
require($(regParam) == 0.0, "The standard deviation of the label is zero. " +
"Model cannot be regularized.")
- logWarning(s"The standard deviation of the label is zero. " +
+ instr.logWarning(s"The standard deviation of the label is zero. " +
"Consider setting fitIntercept=true.")
}
}
@@ -430,7 +436,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
- logWarning("Fitting LinearRegressionModel without intercept on dataset with " +
+ instr.logWarning("Fitting LinearRegressionModel without intercept on dataset with " +
"constant nonzero column, Spark MLlib outputs zero coefficients for constant nonzero " +
"columns. This behavior is the same as R glmnet but different from LIBSVM.")
}
@@ -522,7 +528,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
}
if (state == null) {
val msg = s"${optimizer.getClass.getName} failed."
- logError(msg)
+ instr.logError(msg)
throw new SparkException(msg)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 056a94b..9058701 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -108,9 +108,11 @@ private[spark] object RandomForest extends Logging {
case Some(instrumentation) =>
instrumentation.logNumFeatures(metadata.numFeatures)
instrumentation.logNumClasses(metadata.numClasses)
+ instrumentation.logNumExamples(metadata.numExamples)
case None =>
logInfo("numFeatures: " + metadata.numFeatures)
logInfo("numClasses: " + metadata.numClasses)
+ logInfo("numExamples: " + metadata.numExamples)
}
// Find the splits and the corresponding bins (interval between the splits) using a sample
http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index e694bc2..3247c39 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -19,6 +19,8 @@ package org.apache.spark.ml.util
import java.util.UUID
+import scala.reflect.ClassTag
+
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
@@ -40,7 +42,8 @@ import org.apache.spark.sql.Dataset
* @tparam E the type of the estimator
*/
private[spark] class Instrumentation[E <: Estimator[_]] private (
- estimator: E, dataset: RDD[_]) extends Logging {
+ val estimator: E,
+ val dataset: RDD[_]) extends Logging {
private val id = UUID.randomUUID()
private val prefix = {
@@ -103,6 +106,10 @@ private[spark] class Instrumentation[E <: Estimator[_]] private (
logNamedValue(Instrumentation.loggerTags.numClasses, num)
}
+ def logNumExamples(num: Long): Unit = {
+ logNamedValue(Instrumentation.loggerTags.numExamples, num)
+ }
+
/**
* Logs the value with customized name field.
*/
@@ -114,6 +121,10 @@ private[spark] class Instrumentation[E <: Estimator[_]] private (
log(compact(render(name -> value)))
}
+ def logNamedValue(name: String, value: Double): Unit = {
+ log(compact(render(name -> value)))
+ }
+
/**
* Logs the successful completion of the training session.
*/
@@ -131,6 +142,8 @@ private[spark] object Instrumentation {
val numFeatures = "numFeatures"
val numClasses = "numClasses"
val numExamples = "numExamples"
+ val meanOfLabels = "meanOfLabels"
+ val varianceOfLabels = "varianceOfLabels"
}
/**
@@ -150,3 +163,56 @@ private[spark] object Instrumentation {
}
}
+
+/**
+ * A small wrapper that contains an optional `Instrumentation` object.
+ * Provides log methods that delegate to the wrapped `Instrumentation` object when it is
+ * defined, and otherwise fall back to the common logger.
+ */
+private[spark] class OptionalInstrumentation private(
+ val instrumentation: Option[Instrumentation[_ <: Estimator[_]]],
+ val className: String) extends Logging {
+
+ protected override def logName: String = className
+
+ override def logInfo(msg: => String) {
+ instrumentation match {
+ case Some(instr) => instr.logInfo(msg)
+ case None => super.logInfo(msg)
+ }
+ }
+
+ override def logWarning(msg: => String) {
+ instrumentation match {
+ case Some(instr) => instr.logWarning(msg)
+ case None => super.logWarning(msg)
+ }
+ }
+
+ override def logError(msg: => String) {
+ instrumentation match {
+ case Some(instr) => instr.logError(msg)
+ case None => super.logError(msg)
+ }
+ }
+}
+
+private[spark] object OptionalInstrumentation {
+
+ /**
+ * Creates an `OptionalInstrumentation` object from an existing `Instrumentation` object.
+ */
+ def create[E <: Estimator[_]](instr: Instrumentation[E]): OptionalInstrumentation = {
+ new OptionalInstrumentation(Some(instr),
+ instr.estimator.getClass.getName.stripSuffix("$"))
+ }
+
+ /**
+ * Creates an `OptionalInstrumentation` object from a `Class` object.
+ * The created `OptionalInstrumentation` object will log messages via the common logger and
+ * use the specified class name as the logger name.
+ */
+ def create(clazz: Class[_]): OptionalInstrumentation = {
+ new OptionalInstrumentation(None, clazz.getName.stripSuffix("$"))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org