Posted to user@spark.apache.org by Kürşat Kurt <ku...@kursatkurt.com> on 2016/10/23 19:38:57 UTC

Random forest classifier error: Size exceeds Integer.MAX_VALUE

Hi,


I am trying to train a random forest classifier on a predefined classification dataset (classifications.csv, ~300,000 lines).

While fitting the pipeline, I get a "Size exceeds Integer.MAX_VALUE" error.


Here is the code:
import java.util.Locale

import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}
import org.apache.spark.sql.SparkSession

object Test1 {

  val savePath = "c:/Temp/SparkModel/"
  val stemmer = Resha.Instance // Resha: third-party Turkish stemmer; its import is omitted here

  var STOP_WORDS: Set[String] = Set()

  // Strip parenthesised fragments, dashes and commas, then collapse whitespace.
  def cropSentence(s: String) = {
    s.replaceAll("\\([^\\)]*\\)", "")
      .replaceAll(" - ", " ")
      .replaceAll("-", " ")
      .replaceAll("  +", " ")
      .replaceAll(",", " ").trim()
  }

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("Test").setMaster("local[*]")
      .set("spark.sql.warehouse.dir", "D:/Temp/wh")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "4g")
      .set("spark.hadoop.validateOutputSpecs", "false")

    val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
    import spark.implicits._

    // classifications.csv: semicolon-separated lines where tokens(0) is the product
    // name and tokens(1) the class name. The product name is lower-cased, cleaned and
    // stemmed, and the stemmed text becomes the productName column (the original tuple
    // also carried the raw tokens(0), but toDF was given only two column names).
    val mainDataset = spark.sparkContext.textFile("file:///C:/Temp/classifications.csv")
      .map(_.split(";"))
      .map { tokens =>
        val list = new ListBuffer[String]()
        val token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
        token0.split("\\s+").foreach(w => list += stemmer.stem(w))
        (tokens(1), list.toList.mkString(" "))
      }.toDF("className", "productName")

    // Index the class labels.
    val classIndexer = new StringIndexer()
      .setInputCol("className")
      .setOutputCol("label")

    val classIndexerModel = classIndexer.fit(mainDataset)
    val mainDS = classIndexerModel.transform(mainDataset)
    classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

    // Tokenizer
    val tokenizer = new Tokenizer()
      .setInputCol("productName")
      .setOutputCol("words_nonfiltered")

    // StopWordsRemover
    val remover = new StopWordsRemover()
      .setInputCol("words_nonfiltered")
      .setOutputCol("words")
      .setStopWords(Array[String]("stop1", "stop2", "stop3"))

    // CountVectorizer
    val countVectorizer = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")

    val rfc = new RandomForestClassifier()
      .setLabelCol("label")
      .setNumTrees(50)
      .setMaxDepth(15)
      .setFeatureSubsetStrategy("auto")
      .setFeaturesCol("features")
      .setImpurity("gini")
      .setMaxBins(32)

    val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, rfc))
    val train = mainDS
    val model = pipeline.fit(train) // <============= EXCEPTION
    model.write.overwrite.save(savePath + "RandomForestClassifier")
  }
}
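Not part of the original post, but for context: the per-partition training data and the split statistics that the random forest aggregates both grow with the number of features, and an uncapped CountVectorizer vocabulary (one feature per distinct stemmed word) combined with maxDepth 15 can push a single cached or shuffled block past the 2 GB limit reported below. A minimal sketch of settings that would keep those blocks smaller, reusing tokenizer, remover and mainDS from the code above; the vocabulary cap, tree depth, memory bound, partition count and variable names are illustrative assumptions, not values from the post:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.CountVectorizer

// Hypothetical mitigation sketch: cap the feature space and spread the data
// over more partitions so no single cached or shuffled block approaches 2 GB.
val cappedVectorizer = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(20000)                 // assumption: keep only the 20k most frequent terms

val smallerRfc = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(50)
  .setMaxDepth(10)                     // assumption: shallower trees shrink per-node statistics
  .setMaxMemoryInMB(512)               // bounds the size of each statistics aggregation pass

val pipeline2 = new Pipeline().setStages(Array(tokenizer, remover, cappedVectorizer, smallerRfc))
val model2 = pipeline2.fit(mainDS.repartition(200))   // assumption: more, smaller partitions

Reducing maxBins or sampling the training set before fitting would push in the same direction.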

 

 

16/10/23 19:10:37 INFO scheduler.DAGScheduler: Job 8 failed: collectAsMap at RandomForest.scala:550, took 848.552917 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 13.0 failed 1 times, most recent failure: Lost task 1.0 in stage 13.0 (TID 36, localhost): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:674)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

 

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:745)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:744)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:744)
        at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:550)
        at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:187)
        at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:118)
        at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:45)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
        at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
        at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
        at main.Test1$.main(Test1.scala:162)
        at main.Test1.main(Test1.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)