Posted to issues@spark.apache.org by "Mingchao Wu (Jira)" <ji...@apache.org> on 2022/08/28 12:54:00 UTC

[jira] [Created] (SPARK-40249) Some cache miss cases in MLlib

Mingchao Wu created SPARK-40249:
-----------------------------------

             Summary: Some cache miss cases in MLlib
                 Key: SPARK-40249
                 URL: https://issues.apache.org/jira/browse/SPARK-40249
             Project: Spark
          Issue Type: Improvement
          Components: MLlib
    Affects Versions: 3.2.0
         Environment: Spark core/sql/mllib 3.2.0

xgboost4j-spark_2.12 1.6.2

synapseml_2.12 0.10.0
            Reporter: Mingchao Wu


We developed a tool named _AutoCache_ that detects cache-miss cases in Spark applications. After extensive testing we found several cache-miss cases in MLlib; could you please consider fixing them? The affected locations are listed below, and a minimal sketch of the pattern follows the list.
 * mllib/src/main/scala/org/apache/spark/ml/Predictor.scala:81
 * mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:246
 * mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:105
 * mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:255
 * mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:188
 * mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala:189
 * mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala:161
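
The pattern behind each report is the same: a dataset is consumed by more than one Spark action without being persisted first, so its whole lineage is recomputed for every action. A minimal sketch of the pattern and of the fix (our illustration, not the MLlib source):
{code:scala}
// Illustration only (not the MLlib source): the shape of a cache miss.
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheMissSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("CacheMissSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for an expensive intermediate result, e.g. scored predictions
    val scored = spark.range(0L, 1000000L)
      .map(i => (i % 2 == 0, i.toDouble / 1000000))
      .toDF("label", "score")

    // Cache miss: each action below replays the full lineage of `scored`
    println(scored.filter($"label").count())
    println(scored.filter(!$"label").count())

    // Fix: persist before the first action, release afterwards
    scored.persist(StorageLevel.MEMORY_AND_DISK)
    println(scored.filter($"label").count())
    println(scored.filter(!$"label").count())
    scored.unpersist()

    spark.stop()
  }
}
{code}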

The relevant test programs are:
{code:scala}
import com.microsoft.azure.synapse.ml.train.{ComputeModelStatistics, TrainClassifier}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession


object TrainClassifierTest {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder
      .appName("MLtest")
      .master("local[*]")
      .getOrCreate()

    val dataFile = "data/AdultCensusIncome.csv"

    val dataFrame = spark.read.format("csv")
      .option("sep", ", ")
      .option("header", "true")
      .option("inferSchema", "true") // infer types so "hours-per-week" parses as numeric
      .load(dataFile)

    val data = dataFrame.select("education", "marital-status", "hours-per-week", "income")
    val splits = data.randomSplit(Array(0.75, 0.25), 123)
    val train = splits(0)
    val test = splits(1)

    val startTime = System.currentTimeMillis()

    // TrainClassifier featurizes the selected columns and fits the wrapped model
    val model = new TrainClassifier()
      .setModel(new LogisticRegression())
      .setLabelCol("income")
      .fit(train)

    val pred = model.transform(test)
    // Compute evaluation metrics over the predictions; show() forces execution
    val metrics = new ComputeModelStatistics().transform(pred)
    metrics.show()


    println(s"time: ${System.currentTimeMillis() - startTime}")
    System.in.read()
  }
}
{code}
{code:scala}
import com.microsoft.azure.synapse.ml.automl.FindBestModel
import com.microsoft.azure.synapse.ml.featurize.text.TextFeaturizer
import com.microsoft.azure.synapse.ml.train.TrainClassifier
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

object FindBestModelTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .appName("MLtest")
      .master("local[*]")
      .getOrCreate()

    val data = spark.read.parquet(
      "data/BookReviewsFromAmazon10K.parquet"
    ).cache()
    val textFeaturizer = new TextFeaturizer()
      .setInputCol("text")
      .setOutputCol("features")
      .setUseStopWordsRemover(true)
      .setUseIDF(true)
      .setMinDocFreq(5)
      .setNumFeatures(1 << 16)
      .fit(data)

    val processedData = textFeaturizer.transform(data)
    val processed = processedData
      .withColumn("label", processedData.col("rating") > 3)
      .select("features", "label")

    val splits = processed.randomSplit(Array(0.80, 0.20), seed = 42)

    val train = splits(0)
    val test = splits(1)

    val lrHyperParams = Array(0.05, 0.1)

    val lrs = lrHyperParams.map(p => new LogisticRegression().setRegParam(p))

    // Train one classifier per hyper-parameter value
    val lrModels: Array[Transformer] = lrs.map(lr =>
      new TrainClassifier().setModel(lr).setLabelCol("label").fit(train))

    // FindBestModel evaluates every candidate on the held-out test set
    new FindBestModel()
      .setModels(lrModels)
      .setEvaluationMetric("AUC")
      .fit(test)

    System.in.read()
  }
}
{code}
{code:scala}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassifier, XGBoostClassificationModel}

// this example works with Iris dataset (https://archive.ics.uci.edu/ml/datasets/iris)

object SparkMLlibPipelineTest {

  def main(args: Array[String]): Unit = {

    val inputPath = "data/iris.csv"
    val nativeModelPath = "data/native_model"
    val pipelineModelPath = "data/pipeline_model"

    // Use the GPU tree method when "gpu" is passed as the only argument
    val (treeMethod, numWorkers) = if (args.length == 1 && args(0) == "gpu") {
      ("gpu_hist", 1)
    } else ("auto", 2)

    val spark = SparkSession
      .builder()
      .appName("XGBoost4J-Spark Pipeline Example")
      .master("local[*]")
      .getOrCreate()

    // Load dataset
    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))

    val rawInput = spark.read.schema(schema).option("header", false).csv(inputPath)

    // Split training and test dataset
    val Array(training, test) = rawInput.randomSplit(Array(0.8, 0.2), 123)

    // Build ML pipeline, it includes 4 stages:
    // 1, Assemble all features into a single vector column.
    // 2, From string label to indexed double label.
    // 3, Use XGBoostClassifier to train classification model.
    // 4, Convert indexed double label back to original string label.
    val assembler = new VectorAssembler()
      .setInputCols(Array("sepal length", "sepal width", "petal length", "petal width"))
      .setOutputCol("features")
    val labelIndexer = new StringIndexer()
      .setInputCol("class")
      .setOutputCol("classIndex")
      .fit(training)
    val booster = new XGBoostClassifier(
      Map("eta" -> 0.1f,
        "max_depth" -> 2,
        "objective" -> "multi:softprob",
        "num_class" -> 3,
        "num_round" -> 100,
        "num_workers" -> numWorkers,
        "tree_method" -> treeMethod
      )
    )
    booster.setFeaturesCol("features")
    booster.setLabelCol("classIndex")
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("realLabel")
      .setLabels(labelIndexer.labelsArray(0))

    val pipeline = new Pipeline()
      .setStages(Array(assembler, labelIndexer, booster, labelConverter))
    val model = pipeline.fit(training)

    // Batch prediction
    val prediction = model.transform(test)

    // Model evaluation
    val evaluator = new MulticlassClassificationEvaluator()
    evaluator.setLabelCol("classIndex")
    evaluator.setPredictionCol("prediction")
    val accuracy = evaluator.evaluate(prediction)
    println("The model accuracy is : " + accuracy)

    // Tune model using cross validation
    val paramGrid = new ParamGridBuilder()
      .addGrid(booster.maxDepth, Array(3, 8))
      .addGrid(booster.eta, Array(0.2, 0.6))
      .build()
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(3)

    val cvModel = cv.fit(training)

    val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel].stages(2)
      .asInstanceOf[XGBoostClassificationModel]
    println("The params of best XGBoostClassification model : " +
      bestModel.extractParamMap())
    println("The training summary of best XGBoostClassificationModel : " +
      bestModel.summary)

    // Export the XGBoostClassificationModel as local XGBoost model,
    // then you can load it back in local Python environment.
    bestModel.nativeBooster.saveModel(nativeModelPath)

    // ML pipeline persistence
    model.write.overwrite().save(pipelineModelPath)

    // Load the saved pipeline model back and score the test set with it
    val model2 = PipelineModel.load(pipelineModelPath)
    model2.transform(test).show(5)

    System.in.read()
  }
}
{code}
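
Until these spots are fixed inside MLlib, callers can work around them by persisting the inputs that an estimator traverses repeatedly. A caller-side sketch (our workaround, not an MLlib change), reusing the `training` DataFrame and `cv` from the pipeline example above:
{code:scala}
import org.apache.spark.storage.StorageLevel

// CrossValidator re-reads `training` once per fold and parameter combination,
// so persist it before fitting and release it afterwards.
training.persist(StorageLevel.MEMORY_AND_DISK)
val cvModel2 = cv.fit(training)
training.unpersist()
{code}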


