Posted to user@spark.apache.org by Aakash Basu <aa...@gmail.com> on 2018/09/12 08:40:03 UTC
[Help] Set nThread in Spark cluster
Hi,
API: ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

    val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)
I'm running a job that will not proceed unless the number of threads available to this API matches the parallelism configured for Spark.

So in local mode, when I pass --master local[n] and also set num_workers for the API to the same value n, it works.

But on a cluster I do not know which parameter actually controls the number of threads per task. I tried:
1) spark.task.cpus
2) spark.default.parallelism
3) executor cores
None of them works, and the peculiar part of this issue is that the job simply hangs while distributing the XGBoost model if the condition above is not met.

My code is below. It works in local mode but not on the cluster. Any help?
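What I suspect (and would like confirmed) is that the per-task thread count has to be pinned explicitly in the parameter map rather than inherited from Spark. A minimal sketch of what I mean, assuming "nthread" is the XGBoost4J-Spark parameter that plays the role local[n] plays locally, and that it should be kept equal to spark.task.cpus:

```scala
// Sketch only: "nthread" is assumed to be the XGBoost4J-Spark knob that
// pins the per-task thread count; taskCpus mirrors spark.task.cpus.
val taskCpus = 4  // the value I would pass as --conf spark.task.cpus=4

val xgbParamWithThreads = Map(
  "eta" -> 0.1f,
  "max_depth" -> 2,
  "objective" -> "multi:softprob",
  "num_class" -> 3,
  "num_round" -> 100,
  "num_workers" -> 2,
  "nthread" -> taskCpus  // one XGBoost thread per CPU reserved per Spark task
)
```

If that pairing is right, the hang would be explained by XGBoost waiting for more threads than Spark ever grants a task.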
Code:
    import org.apache.spark.sql.types.{StringType, DoubleType, StructField, StructType}

    val schema = new StructType(Array(
      StructField("sepal length", DoubleType, true),
      StructField("sepal width", DoubleType, true),
      StructField("petal length", DoubleType, true),
      StructField("petal width", DoubleType, true),
      StructField("class", StringType, true)))
    val rawInput = spark.read.schema(schema).csv("file:///appdata/bblite-data/iris.csv")

    import org.apache.spark.ml.feature.StringIndexer

    val stringIndexer = new StringIndexer().
      setInputCol("class").
      setOutputCol("classIndex").
      fit(rawInput)
    val labelTransformed = stringIndexer.transform(rawInput).drop("class")

    import org.apache.spark.ml.feature.VectorAssembler

    val vectorAssembler = new VectorAssembler().
      setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
      setOutputCol("features")
    val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")

    import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

    val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)
    val xgbClassifier = new XGBoostClassifier(xgbParam).
      setFeaturesCol("features").
      setLabelCol("classIndex")
    val xgbClassificationModel = xgbClassifier.fit(xgbInput)
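For completeness, this is the sort of submit command I have been experimenting with on the cluster (all values are illustrative, and TrainIris / trainer.jar are placeholder names for my own class and jar):

```shell
# Illustrative only: trying to reserve enough CPUs per task for XGBoost,
# on the assumption that spark.task.cpus is the knob that matters.
spark-submit \
  --master yarn \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=4 \
  --class com.example.TrainIris \
  trainer.jar
```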
Link to the same question I posted on Stack Overflow:
https://stackoverflow.com/questions/52290938/set-nthread-in-spark-cluster-for-xgboost
Thanks,
Aakash.