You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Mingchao Wu (Jira)" <ji...@apache.org> on 2022/08/28 12:54:00 UTC
[jira] [Created] (SPARK-40249) Some cache miss cases in MLLib
Mingchao Wu created SPARK-40249:
-----------------------------------
Summary: Some cache miss cases in MLLib
Key: SPARK-40249
URL: https://issues.apache.org/jira/browse/SPARK-40249
Project: Spark
Issue Type: Improvement
Components: MLlib
Affects Versions: 3.2.0
Environment: Spark core/sql/mllib 3.2.0
xgboost4j-spark_2.12 1.6.2
synapseml_2.12 0.10.0
Reporter: Mingchao Wu
We developed a tool named _AutoCache_ that detects cache-miss cases in Spark applications. After running many tests, we found a number of cache-miss cases in MLlib. Could you please consider fixing them?
* mllib/src/main/scala/org/apache/spark/ml/Predictor.scala:81
* mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:246
* mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:105
* mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:255
* mllib/src/main/scala/org/apache/spark/ml/Predictor.scala:81
* mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:188
* mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:189
* mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala:161
The relevant test programs are:
{code:scala}
import com.microsoft.azure.synapse.ml.train.{ComputeModelStatistics, TrainClassifier}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession
/**
 * Reproduction program: fits a `TrainClassifier` wrapping a `LogisticRegression` on
 * `data/AdultCensusIncome.csv`, scores the held-out split and runs
 * `ComputeModelStatistics` over the predictions.
 */
object TrainClassifierTest {

  /**
   * Runs the train / score / evaluate sequence on a local Spark session and prints the
   * elapsed wall-clock time in milliseconds.
   *
   * @param args command-line arguments; not read
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession.builder.appName("MLtest").master("local[*]").getOrCreate()

    val csvPath = "data/AdultCensusIncome.csv"
    val csvReader = spark.read
      .format("csv")
      .option("sep", ", ")
      .option("header", "true")
      // NOTE(review): "schema" does not look like a documented CSV reader option, so this
      // setting is probably ignored and every column stays a string. Confirm before relying on it.
      .option("schema", "hours-per-week: float")
    val rawFrame = csvReader.load(csvPath)
    val selected = rawFrame.select("education", "marital-status", "hours-per-week", "income")

    // Fixed seed (123) keeps the 75/25 split reproducible between runs.
    val Array(trainSet, testSet) = selected.randomSplit(Array(0.75, 0.25), 123)

    // The timer covers fitting, scoring and the statistics transform, not data loading setup.
    val startedAt = System.currentTimeMillis()
    val classifier = new TrainClassifier()
      .setModel(new LogisticRegression())
      .setLabelCol("income")
    val fitted = classifier.fit(trainSet)
    val scored = fitted.transform(testSet)
    new ComputeModelStatistics().transform(scored)
    val elapsedMillis = System.currentTimeMillis() - startedAt
    println(s"time: $elapsedMillis")

    // Blocks until a byte arrives on stdin, which keeps the application alive
    // (presumably so the Spark UI can still be inspected).
    System.in.read()
  }
}
{code}
{code:scala}
import com.microsoft.azure.synapse.ml.automl.FindBestModel
import com.microsoft.azure.synapse.ml.featurize.text.TextFeaturizer
import com.microsoft.azure.synapse.ml.train.TrainClassifier
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession
/**
 * Reproduction program: featurizes the book-review text, trains one `TrainClassifier`
 * per logistic-regression regularization value, and hands the fitted models to
 * `FindBestModel` for AUC-based selection on the held-out split.
 */
object FindBestModelTest {

  /**
   * Builds the features, trains the candidate models and runs `FindBestModel` on a
   * local Spark session.
   *
   * @param args command-line arguments; not read
   */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession =
      SparkSession.builder.appName("MLtest").master("local[*]").getOrCreate()

    val reviewsPath = "data/BookReviewsFromAmazon10K.parquet"
    val data = spark.read.parquet(reviewsPath).cache()
    // data.repartition(1).write.format("parquet").save("data/BookReviewsFromAmazon10K")

    val featurizerModel = new TextFeaturizer()
      .setInputCol("text")
      .setOutputCol("features")
      .setUseStopWordsRemover(true)
      .setUseIDF(true)
      .setMinDocFreq(5)
      .setNumFeatures(1 << 16)
      .fit(data)

    // Binary label: true when the review's rating is above 3.
    val featurized = featurizerModel.transform(data)
    val isPositive = featurized.col("rating") > 3
    val labelled = featurized.withColumn("label", isPositive).select("features", "label")

    // Fixed seed (42) keeps the 80/20 split reproducible between runs.
    val Array(trainSet, testSet) = labelled.randomSplit(Array(0.80, 0.20), seed = 42)

    // One fitted TrainClassifier per regularization parameter.
    val candidates: Array[Transformer] = Array(0.05, 0.1).map { regParam =>
      val lr = new LogisticRegression().setRegParam(regParam)
      new TrainClassifier().setModel(lr).setLabelCol("label").fit(trainSet)
    }

    val selector = new FindBestModel()
      .setModels(candidates)
      .setEvaluationMetric("AUC")
    selector.fit(testSet)

    // Blocks until a byte arrives on stdin, which keeps the application alive
    // (presumably so the Spark UI can still be inspected).
    System.in.read()
  }
}
{code}
{code:scala}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}
// this example works with Iris dataset (https://archive.ics.uci.edu/ml/datasets/iris)
/**
 * Reproduction program: trains an XGBoost classification pipeline on the Iris data,
 * evaluates it, tunes it with 3-fold cross validation, and persists both the best
 * native booster and the fitted pipeline.
 */
object SparkMLlibPipelineTest {

  /**
   * Runs the whole train / evaluate / tune / persist / reload sequence on a local
   * Spark session.
   *
   * @param args command-line arguments; GPU training (`gpu_hist`, one worker) is selected
   *             only when exactly four arguments are given and the fourth is `"gpu"`,
   *             otherwise `auto` with two workers is used
   */
  def main(args: Array[String]): Unit = {
    val inputPath = "data/iris.csv"
    // Keep these two locations distinct: the pipeline is saved with overwrite(), which
    // would replace a native booster file written to the same path, and a later run
    // would then try to write the native booster onto the pipeline's directory.
    val nativeModelPath = "data/nativeModel"
    val pipelineModelPath = "data/pipelineModel"

    val (treeMethod, numWorkers) =
      if (args.length == 4 && args(3) == "gpu") ("gpu_hist", 1)
      else ("auto", 2)

    val spark = SparkSession
      .builder()
      .appName("XGBoost4J-Spark Pipeline Example")
      .master("local[*]")
      .getOrCreate()

    // Load dataset
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    val rawInput = spark.read.schema(schema).option("header", false).csv(inputPath)

    // Split training and test dataset
    val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)

    // Build ML pipeline, it includes 4 stages:
    // 1, Assemble all features into a single vector column.
    // 2, From string label to indexed double label.
    // 3, Use XGBoostClassifier to train classification model.
    // 4, Convert indexed double label back to original string label.
    val assembler = new VectorAssembler()
      .setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
      .setOutputCol("features")
    val labelIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(training)
    val booster = new XGBoostClassifier(
      Map("eta" -> 0.1f,
        "max_depth" -> 2,
        "objective" -> "multi:softprob",
        "num_class" -> 3,
        "num_round" -> 100,
        "num_workers" -> numWorkers,
        "tree_method" -> treeMethod
      )
    )
    booster.setFeaturesCol("features")
    booster.setLabelCol("classIndex")
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("realLabel")
      .setLabels(labelIndexer.labels)

    val pipeline = new Pipeline()
      .setStages(Array(assembler, labelIndexer, booster, labelConverter))
    val model = pipeline.fit(training)

    // Batch prediction
    val prediction = model.transform(test)

    // Model evaluation
    val evaluator = new MulticlassClassificationEvaluator()
    evaluator.setLabelCol("classIndex")
    evaluator.setPredictionCol("prediction")
    val accuracy = evaluator.evaluate(prediction)
    println(s"The model accuracy is : $accuracy")

    // Tune model using cross validation
    val paramGrid = new ParamGridBuilder()
      .addGrid(booster.maxDepth, Array(3, 8))
      .addGrid(booster.eta, Array(0.2, 0.6))
      .build()
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)

    val cvModel = cv.fit(training)

    // Stage index 2 is the booster, matching its position in setStages above.
    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
      .asInstanceOf[XGBoostClassificationModel]
    println(s"The params of best XGBoostClassification model : ${bestModel.extractParamMap()}")
    println(s"The training summary of best XGBoostClassificationModel : ${bestModel.summary}")

    // Export the XGBoostClassificationModel as local XGBoost model,
    // then you can load it back in local Python environment.
    bestModel.nativeBooster.saveModel(nativeModelPath)

    // ML pipeline persistence
    model.write.overwrite().save(pipelineModelPath)

    // Load a saved model and serving
    val model2 = PipelineModel.load(pipelineModelPath)
    model2.transform(test)

    // Blocks until a byte arrives on stdin, which keeps the application alive
    // (presumably so the Spark UI can still be inspected).
    System.in.read()
  }
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org