You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Aakash Basu <aa...@gmail.com> on 2018/09/12 08:40:03 UTC

[Help] Set nThread in Spark cluster

Hi,

API = ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbParam = Map("eta" -> 0.1f,

     |       "max_depth" -> 2,

     |       "objective" -> "multi:softprob",

     |       "num_class" -> 3,

     |       "num_round" -> 100,

     |       "num_workers" -> 2)

I'm running a job which will not work until the number of threads of the
API is equivalent to the num_worker set for Spark.

So, in master = local mode, when I do --master local[n] and also set
num_worker for that API as the same value as n, it works.

But, in cluster I do not know which parameter to control which precisely
takes the call of handling the number of threads. I tried with -

1) spark.task.cpus
2) spark.default.parallelism
3) executor cores

But, none of them works, and the speciality of this issue is, it goes into
a halt while distributing the XGBoost model if the above condition is not
met.


My code is as follows, it works in local mode, but not in cluster, any help?

Code:


>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *import org.apache.spark.sql.types.StringTypeimport
> org.apache.spark.sql.types.DoubleTypeimport
> org.apache.spark.sql.types.StructFieldimport
> org.apache.spark.sql.types.StructTypeval schema = new StructType(Array(
> StructField("sepal length", DoubleType, true),  StructField("sepal width",
> DoubleType, true),  StructField("petal length", DoubleType, true),
> StructField("petal width", DoubleType, true),  StructField("class",
> StringType, true)))val rawInput =
> spark.read.schema(schema).csv("file:///appdata/bblite-data/iris.csv")import
> org.apache.spark.ml.feature.StringIndexerval stringIndexer = new
> StringIndexer().  setInputCol("class").  setOutputCol("classIndex").
> fit(rawInput)val labelTransformed =
> stringIndexer.transform(rawInput).drop("class")import
> org.apache.spark.ml.feature.VectorAssemblerval vectorAssembler = new
> VectorAssembler().  setInputCols(Array("sepal length", "sepal width",
> "petal length", "petal width")).  setOutputCol("features")val xgbInput =
> vectorAssembler.transform(labelTransformed).select("features",
> "classIndex")import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifierval
> xgbParam = Map("eta" -> 0.1f,      "max_depth" -> 2,      "objective" ->
> "multi:softprob",      "num_class" -> 3,      "num_round" -> 100,
> "num_workers" -> 2)val xgbClassifier = new XGBoostClassifier(xgbParam).
>   setFeaturesCol("features").      setLabelCol("classIndex")val
> xgbClassificationModel = xgbClassifier.fit(xgbInput)*



Link of the same question I posted in stackoverflow:
https://stackoverflow.com/questions/52290938/set-nthread-in-spark-cluster-for-xgboost

Thanks,
Aakash.