Posted to user@spark.apache.org by Kürşat Kurt <ku...@kursatkurt.com> on 2016/11/07 05:32:28 UTC

Out of memory at 60GB free memory.

Hi;

I am trying to use Naive Bayes for multi-class classification.

I am getting an OOM at the "pipeline.fit(train)" line. When I submit the code,
everything runs fine until the stage "collect at NaiveBayes.scala:400".

At this stage, the first ~375 tasks start very quickly, then progress slows down.
The task count never reaches 500; the OOM hits around the 380th-390th task.

 

Spark-submit parameters: 

./spark-submit --class main.scala.Test1 --master local[8] --driver-memory 60g /projects/simple-project_2.11-1.0.jar

 

System properties:

Mode: client

Free Mem: 60 GB (Total: 64 GB)

OS: Ubuntu 14.04

Cores: 8

Java: 1.8

 

Code:

 

    object Test {

      var num = 50;
      var savePath = "hdfs://localhost:54310/SparkWork/SparkModel/";
      var stemmer = Resha.Instance

      var STOP_WORDS: Set[String] = Set();

      def cropSentence(s: String) = {
        s.replaceAll("\\([^\\)]*\\)", "")
          .replaceAll(" - ", " ")
          .replaceAll("-", " ")
          .replaceAll(" tr. ", " ")
          .replaceAll("  +", " ")
          .replaceAll(",", " ").trim();
      }

      def main(args: Array[String]): Unit = {

        val start1 = System.currentTimeMillis();

        val sc = new SparkConf().setAppName("Test")
          .set("spark.hadoop.validateOutputSpecs", "false")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryoserializer.buffer.max", "1g")
          .set("spark.driver.maxResultSize", "20g")
          .set("spark.executor.memory", "30g")
          .set("spark.executor.cores", "6")
          .set("spark.speculation", "true")
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.files.overwrite", "true")
          .set("spark.executor.heartbeatInterval", "25s")
          .set("spark.sql.shuffle.partitions", "2")
          .set("spark.sql.warehouse.dir", savePath + "wh")

        val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate();
        import spark.implicits._

        val mainDataset = spark.sparkContext.textFile(savePath + "classifications.csv")
          .map(_.split("ß"))
          .map(tokens => {
            var list = new ListBuffer[String]();
            var token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")));
            token0.split("\\s+").map { list += stemmer.stem(_) }
            (tokens(1), list.toList.mkString(" "))
          }).toDF("className", "productName");

        val classIndexer = new StringIndexer()
          .setInputCol("className")
          .setOutputCol("label");

        val classIndexerModel = classIndexer.fit(mainDataset);
        var mainDS = classIndexerModel.transform(mainDataset);
        classIndexerModel.write.overwrite.save(savePath + "ClassIndexer");

        mainDS.write.mode(SaveMode.Overwrite).parquet(savePath + "processed");
        mainDS = spark.sqlContext.read.load(savePath + "processed")

        // Tokenizer
        val tokenizer = new Tokenizer()
          .setInputCol("productName")
          .setOutputCol("words_nonfiltered");

        // StopWords
        val remover = new StopWordsRemover()
          .setInputCol("words_nonfiltered")
          .setOutputCol("words")
          .setStopWords(Array[String]("word1", "word2", "-", "//"));

        // CountVectorizer
        val countVectorizer = new CountVectorizer()
          .setInputCol("words")
          .setOutputCol("features")

        val nb = new NaiveBayes()
          .setSmoothing(0.1)
          .setModelType("multinomial")

        val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, nb));

        val train = mainDS.repartition(500);
        val model = pipeline.fit(train);
        model.write.overwrite.save(savePath + "RandomForestClassifier");
      }
    }

 

 

 

 

 

 

 



 

Log:

16/11/07 02:07:28 INFO Executor: Finished task 116.0 in stage 15.0 (TID 2433). 1025857381 bytes result sent via BlockManager)
16/11/07 02:07:28 INFO TaskSetManager: Finished task 44.0 in stage 15.0 (TID 2415) in 175757 ms on localhost (384/500)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 140.0 in stage 15.0 (TID 2439, localhost, partition 140, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 140.0 in stage 15.0 (TID 2439)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 144.0 in stage 15.0 (TID 2440, localhost, partition 144, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 144.0 in stage 15.0 (TID 2440)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 148.0 in stage 15.0 (TID 2441, localhost, partition 148, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 148.0 in stage 15.0 (TID 2441)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 152.0 in stage 15.0 (TID 2442, localhost, partition 152, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 152.0 in stage 15.0 (TID 2442)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 156.0 in stage 15.0 (TID 2443, localhost, partition 156, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 156.0 in stage 15.0 (TID 2443)
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 494 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 490 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 497 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 496 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 495 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/07 02:07:43 INFO BlockManagerInfo: Removed taskresult_2412 on 89.163.242.124:49610 in memory (size: 979.2 MB, free: 8.1 GB)
16/11/07 02:08:32 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@544bf77,BlockManagerId(driver, 89.163.242.124, 49610))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [25 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [25 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more
16/11/07 02:09:04 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@544bf77,BlockManagerId(driver, 89.163.242.124, 49610))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [25 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [25 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more
16/11/07 02:09:18 WARN TaskMemoryManager: leak 1097.3 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@53cf605d
16/11/07 02:09:31 ERROR Executor: Exception in task 140.0 in stage 15.0 (TID 2439)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at net.jpountz.xxhash.StreamingXXHash32.asChecksum(StreamingXXHash32.java:81)
        at org.apache.spark.io.LZ4BlockInputStream.<init>(LZ4BlockInputStream.java:94)
        at org.apache.spark.io.LZ4BlockInputStream.<init>(LZ4BlockInputStream.java:104)
        at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:118)
        at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:116)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:56)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:55)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 02:09:38 ERROR Executor: Exception in task 128.0 in stage 15.0 (TID 2436)
java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.io.Output.require(Output.java:168)
        at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:519)
        at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:648)
        at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:216)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:205)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:297)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


RE: Out of memory at 60GB free memory.

Posted by Kürşat Kurt <ku...@kursatkurt.com>.
Spark is not running on Mesos; it is running only in client mode.

 

From: Rodrick Brown [mailto:rodrick@orchardplatform.com] 
Sent: Monday, November 7, 2016 8:15 PM
To: Kürşat Kurt <ku...@kursatkurt.com>
Cc: Sean Owen <so...@cloudera.com>; User <us...@spark.apache.org>
Subject: Re: Out of memory at 60GB free memory.

 

You should also set memory overhead i.e. --conf spark.mesos.executor.memoryOverhead=${EXECUTOR_MEM} * .10

 

On Mon, Nov 7, 2016 at 6:51 AM, Kürşat Kurt <kursat@kursatkurt.com> wrote:

I understand that I should set the executor memory. I tried with the parameters below, but the OOM still occurs...

./spark-submit --class main.scala.Test1 --master local[8]  --driver-memory 20g --executor-memory 20g

 

From: Sean Owen [mailto:sowen@cloudera.com]
Sent: Monday, November 7, 2016 12:21 PM
To: Kürşat Kurt <kursat@kursatkurt.com>; user@spark.apache.org
Subject: Re: Out of memory at 60GB free memory.

 

You say "out of memory", and you allocate a huge amount of driver memory, but, it's your executor that's running out of memory. You want --executor-memory. You can't set it after the driver has run.

On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt <kursat@kursatkurt.com> wrote:

Hi;

I am trying to use Naive Bayes for multi-class classification.

I am getting an OOM at the “pipeline.fit(train)” line. When I submit the code, everything runs fine until the stage “collect at NaiveBayes.scala:400”.

At this stage, the first ~375 tasks start very quickly, then progress slows down. The task count never reaches 500; the OOM hits around the 380th-390th task.

 





 

-- 


Rodrick Brown / DevOPs

9174456839 / rodrick@orchardplatform.com

Orchard Platform 
101 5th Avenue, 4th Floor, New York, NY

 



Re: Out of memory at 60GB free memory.

Posted by Rodrick Brown <ro...@orchardplatform.com>.
You should also set memory overhead, i.e. --conf spark.mesos.executor.memoryOverhead=${EXECUTOR_MEM} * .10
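
For example (an illustrative sketch, not part of the original message: the overhead value is in MB, roughly 10% of a 20g executor, and this particular property only takes effect when running on Mesos):

./spark-submit --class main.scala.Test1 --master local[8] \
  --executor-memory 20g \
  --conf spark.mesos.executor.memoryOverhead=2048 \
  /projects/simple-project_2.11-1.0.jar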

On Mon, Nov 7, 2016 at 6:51 AM, Kürşat Kurt <ku...@kursatkurt.com> wrote:

> I understand that I should set the executor memory. I tried with the
> parameters below, but the OOM still occurs...
>
> ./spark-submit --class main.scala.Test1 --master local[8] --driver-memory 20g --executor-memory 20g
>
>
>
> *From:* Sean Owen [mailto:sowen@cloudera.com]
> *Sent:* Monday, November 7, 2016 12:21 PM
> *To:* Kürşat Kurt <ku...@kursatkurt.com>; user@spark.apache.org
> *Subject:* Re: Out of memory at 60GB free memory.
>
>
>
> You say "out of memory", and you allocate a huge amount of driver memory,
> but, it's your executor that's running out of memory. You want
> --executor-memory. You can't set it after the driver has run.
>
> On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt <ku...@kursatkurt.com> wrote:
>
> Hi;
>
> I am trying to use Naive Bayes for multi-class classification.
>
> I am getting an OOM at the “pipeline.fit(train)” line. When I submit the code,
> everything runs fine until the stage “collect at NaiveBayes.scala:400”.
>
> At this stage, the first ~375 tasks start very quickly, then progress slows down.
> The task count never reaches 500; the OOM hits around the 380th-390th task.
>
>
>
>


-- 


Rodrick Brown / DevOps

9174456839 / rodrick@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY


RE: Out of memory at 60GB free memory.

Posted by Kürşat Kurt <ku...@kursatkurt.com>.
I understand that I should set the executor memory. I tried with the parameters below, but the OOM still occurs...

./spark-submit --class main.scala.Test1 --master local[8]  --driver-memory 20g --executor-memory 20g

 

From: Sean Owen [mailto:sowen@cloudera.com] 
Sent: Monday, November 7, 2016 12:21 PM
To: Kürşat Kurt <ku...@kursatkurt.com>; user@spark.apache.org
Subject: Re: Out of memory at 60GB free memory.

 

You say "out of memory", and you allocate a huge amount of driver memory, but, it's your executor that's running out of memory. You want --executor-memory. You can't set it after the driver has run.

On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt <kursat@kursatkurt.com> wrote:

Hi;

I am trying to use Naive Bayes for multi-class classification.

I am getting an OOM at the “pipeline.fit(train)” line. When I submit the code, everything runs fine until the stage “collect at NaiveBayes.scala:400”.

At this stage, the first ~375 tasks start very quickly, then progress slows down. The task count never reaches 500; the OOM hits around the 380th-390th task.

 


Re: Out of memory at 60GB free memory.

Posted by Sean Owen <so...@cloudera.com>.
You say "out of memory", and you allocate a huge amount of driver memory,
but, it's your executor that's running out of memory. You want
--executor-memory. You can't set it after the driver has run.
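
For example, the setting moves onto the submit command itself instead of being set in SparkConf after startup (a sketch; the driver size here is a placeholder, the rest comes from the original post):

./spark-submit --class main.scala.Test1 --master local[8] \
  --driver-memory 8g --executor-memory 30g \
  /projects/simple-project_2.11-1.0.jar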

On Mon, Nov 7, 2016 at 5:35 AM Kürşat Kurt <ku...@kursatkurt.com> wrote:

> Hi;
>
> I am trying to use Naive Bayes for multi-class classification.
>
> I am getting an OOM at the “pipeline.fit(train)” line. When I submit the code,
> everything runs fine until the stage “collect at NaiveBayes.scala:400”.
>
> At this stage, the first ~375 tasks start very quickly, then progress slows down.
> The task count never reaches 500; the OOM hits around the 380th-390th task.
>
>
>

RE: Out of memory at 60GB free memory.

Posted by Kürşat Kurt <ku...@kursatkurt.com>.
Hi Daniel,

 

I started with 10g/10g; when the OOM occurred, I increased to these sizes.

I think I don't need that much (the CSV file has 300.00 lines), but I don't know what causes this...

 

From: Daniel van der Ende [mailto:daniel.vanderende@gmail.com] 
Sent: Monday, November 7, 2016 10:03 AM
To: Kürşat Kurt <ku...@kursatkurt.com>
Cc: user@spark.apache.org
Subject: Re: Out of memory at 60GB free memory.

 

Looks to me like you are actually running out of memory. You're setting driver memory to 60G and executor memory to 30G. With only 64G available in total, and running in local mode, you won't have any memory left. Do you really need such a large amount of driver memory for this job? 

Cheers, 
Daniel

 


Re: Out of memory at 60GB free memory.

Posted by Daniel van der Ende <da...@gmail.com>.
Looks to me like you are actually running out of memory. You're setting
driver memory to 60G and executor memory to 30G. With only 64G available in
total, and running in local mode, you won't have any memory left. Do you
really need such a large amount of driver memory for this job?
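
For illustration (a sketch, not from the original message): in local mode the executors run inside the driver JVM, so only the driver size matters and it should leave headroom for the OS. Something like the following, where 48g is an arbitrary figure chosen only to leave memory for the system:

./spark-submit --class main.scala.Test1 --master local[8] \
  --driver-memory 48g \
  /projects/simple-project_2.11-1.0.jar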

Cheers,
Daniel
