You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@mahout.apache.org by Simanchal <si...@gmail.com> on 2019/07/22 17:16:22 UTC

Re: Error spark-mahout when spark-submit mode cluster

unsubscribe


On Wed, Aug 1, 2018 at 6:54 AM Jaume Galí <jg...@konodrac.com> wrote:

> Hi everybody, I'm trying to build a basic recommender with Spark and Mahout
> on Scala. I use the following Mahout repo to compile Mahout with Scala 2.11
> and Spark 2.1.2: mahout_fork <
> https://github.com/actionml/mahout/tree/sparse-speedup-13.0>
> To execute my code I use spark-submit, and it runs fine when I put --master
>  local, but when I try to run on a cluster with --master
>  spark://vagrant-ubuntu-trusty-64:7077
> <spark://vagrant-ubuntu-trusty-64:7077> it always fails with the same error.
>
> Command (Run Fine):
>
> /opt/spark/bin/spark-submit \
> --class 'com.reco.GenerateIndicator' \
> --name recomender \
> --master local \
> target/scala-2.11/recomender-0.0.1.jar
> Command (ERROR):
>
> /opt/spark/bin/spark-submit \
> --class 'com.reco.GenerateIndicator' \
> --name recomender \
> --master spark://vagrant-ubuntu-trusty-64:7077 \
> target/scala-2.11/recomender-0.0.1.jar
> Dependencies on Build.sbt :
>
> name := "recomender"
> version := "0.0.1"
> scalaVersion := "2.11.11"
> val mahoutVersion = "0.13.0"
> val sparkVersion = "2.1.2"
>
> libraryDependencies ++= {
>   Seq(
>     "org.apache.spark"        %% "spark-core" % sparkVersion % "provided" ,
>     "org.apache.spark"        %% "spark-sql" % sparkVersion % "provided" ,
>     "org.apache.spark"        %% "spark-mllib" % sparkVersion % "provided",
>     /* Mahout */
>     "org.apache.mahout" %% "mahout-spark" % mahoutVersion
>       exclude("org.apache.spark", "spark-core_2.11")
>       exclude("org.apache.spark", "spark-sql_2.11"),
>     "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
>     "org.apache.mahout" % "mahout-math" % mahoutVersion,
>     "org.apache.mahout" % "mahout-hdfs" % mahoutVersion
>       exclude("com.thoughtworks.xstream", "xstream")
>       exclude("org.apache.hadoop", "hadoop-client")
>   )
> }
>
> resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
> resolvers += Resolver.mavenLocal
>
> …
> Main class:
>
> package com.reco
>
> import org.apache.mahout.sparkbindings.SparkDistributedContext
> import org.apache.mahout.sparkbindings._
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
>
> object GenerateIndicator {
>
>   def main(args: Array[String]) {
>     try {
>
>       // Create spark-conf
>       val sparkConf = new SparkConf().setAppName("recomender")
>
>       implicit val mahoutCtx: SparkDistributedContext = mahoutSparkContext(
>         masterUrl = sparkConf.get("spark.master"),
>         appName = "recomender",
>         sparkConf = sparkConf,
>         // addMahoutJars = true,
>         addMahoutJars = false
>       )
>
>       implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)
>
>       val sparkSession = SparkSession
>         .builder()
>         .appName("recomender")
>         .config(sparkConf)
>         .getOrCreate()
>
>       val lines = returnData()
>
>       val linesRdd = sdc.sc.parallelize(lines)
>
>       println("...Collecting...")
>
>       linesRdd.collect().foreach( item => {  // ERROR HERE! on collect()
>         println(item)
>       })
>
>       // Destroy Spark Session
>       sparkSession.stop()
>       sparkSession.close()
>
>     } catch {
>       case e: Exception =>
>         println(e)
>         throw new Exception(e)
>
>     }
>
>   }
>
>   def returnData() : Array[String] = {
>     val lines = Array(
>       "17,Action",
>       "17,Comedy",
>       "17,Crime",
>       "17,Horror",
>       "17,Thriller",
>       "12,Crime",
>       "12,Thriller",
>       "16,Comedy",
>       "16,Romance",
>       "20,Drama",
>       "20,Romance",
>       "7,Drama",
>       "7,Sci-Fi",
>       // ... more lines in array ...
>       "1680,Drama",
>       "1680,Romance",
>       "1681,Comedy"
>     )
>     lines
>   }
>
> }
> Error::
>
> 18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at
> GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to
> stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
> Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0):
> java.lang.IllegalStateException: unread block data
>     at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> 18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7)
> on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block
> data) [duplicate 7]
> 18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at
> GenerateIndicator.scala:38, took 5.265593 s
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException: unread
> block data
>     at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>     at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
>     at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Thanks a lot for your time.
> Cheers.

Re: Error spark-mahout when spark-submit mode cluster

Posted by Andrew Musselman <an...@gmail.com>.
To unsubscribe, send mail to user-unsubscribe@mahout.apache.org.

You can leave the subject and body empty, just send anything to that
address.

Thanks, take care!

On Mon, Jul 22, 2019 at 10:16 AM Simanchal <si...@gmail.com> wrote:

> unsubscribe
>
>
> On Wed, Aug 1, 2018 at 6:54 AM Jaume Galí <jg...@konodrac.com> wrote:
>
> > Hi everybody, I'm trying to build a basic recommender with Spark and Mahout
> > on Scala. I use the following Mahout repo to compile Mahout with Scala 2.11
> > and Spark 2.1.2: mahout_fork <
> > https://github.com/actionml/mahout/tree/sparse-speedup-13.0>
> > To execute my code I use spark-submit, and it runs fine when I put --master
> >  local, but when I try to run on a cluster with --master
> >  spark://vagrant-ubuntu-trusty-64:7077
> > <spark://vagrant-ubuntu-trusty-64:7077> it always fails with the same error.
> >
> > Command (Run Fine):
> >
> > /opt/spark/bin/spark-submit \
> > --class 'com.reco.GenerateIndicator' \
> > --name recomender \
> > --master local \
> > target/scala-2.11/recomender-0.0.1.jar
> > Command (ERROR):
> >
> > /opt/spark/bin/spark-submit \
> > --class 'com.reco.GenerateIndicator' \
> > --name recomender \
> > --master spark://vagrant-ubuntu-trusty-64:7077 \
> > target/scala-2.11/recomender-0.0.1.jar
> > Dependencies on Build.sbt :
> >
> > name := "recomender"
> > version := "0.0.1"
> > scalaVersion := "2.11.11"
> > val mahoutVersion = "0.13.0"
> > val sparkVersion = "2.1.2"
> >
> > libraryDependencies ++= {
> >   Seq(
> >     "org.apache.spark"        %% "spark-core" % sparkVersion %
> "provided" ,
> >     "org.apache.spark"        %% "spark-sql" % sparkVersion % "provided"
> ,
> >     "org.apache.spark"        %% "spark-mllib" % sparkVersion %
> "provided",
> >     /* Mahout */
> >     "org.apache.mahout" %% "mahout-spark" % mahoutVersion
> >       exclude("org.apache.spark", "spark-core_2.11")
> >       exclude("org.apache.spark", "spark-sql_2.11"),
> >     "org.apache.mahout" %% "mahout-math-scala" % mahoutVersion,
> >     "org.apache.mahout" % "mahout-math" % mahoutVersion,
> >     "org.apache.mahout" % "mahout-hdfs" % mahoutVersion
> >       exclude("com.thoughtworks.xstream", "xstream")
> >       exclude("org.apache.hadoop", "hadoop-client")
> >   )
> > }
> >
> > resolvers += "Local Repository" at "file://"+baseDirectory.value / "repo"
> > resolvers += Resolver.mavenLocal
> >
> > …
> > Main class:
> >
> > package com.reco
> >
> > import org.apache.mahout.sparkbindings.SparkDistributedContext
> > import org.apache.mahout.sparkbindings._
> > import org.apache.spark.SparkConf
> > import org.apache.spark.sql.SparkSession
> >
> > object GenerateIndicator {
> >
> >   def main(args: Array[String]) {
> >     try {
> >
> >       // Create spark-conf
> >       val sparkConf = new SparkConf().setAppName("recomender")
> >
> >       implicit val mahoutCtx: SparkDistributedContext =
> mahoutSparkContext(
> >         masterUrl = sparkConf.get("spark.master"),
> >         appName = "recomender",
> >         sparkConf = sparkConf,
> >         // addMahoutJars = true,
> >         addMahoutJars = false
> >       )
> >
> >       implicit val sdc: SparkDistributedContext = sc2sdc(mahoutCtx)
> >
> >       val sparkSession = SparkSession
> >         .builder()
> >         .appName("recomender")
> >         .config(sparkConf)
> >         .getOrCreate()
> >
> >       val lines = returnData()
> >
> >       val linesRdd = sdc.sc.parallelize(lines)
> >
> >       println("...Collecting...")
> >
> >       linesRdd.collect().foreach( item => {  // ERROR HERE! on collect()
> >         println(item)
> >       })
> >
> >       // Destroy Spark Session
> >       sparkSession.stop()
> >       sparkSession.close()
> >
> >     } catch {
> >       case e: Exception =>
> >         println(e)
> >         throw new Exception(e)
> >
> >     }
> >
> >   }
> >
> >   def returnData() : Array[String] = {
> >     val lines = Array(
> >       "17,Action",
> >       "17,Comedy",
> >       "17,Crime",
> >       "17,Horror",
> >       "17,Thriller",
> >       "12,Crime",
> >       "12,Thriller",
> >       "16,Comedy",
> >       "16,Romance",
> >       "20,Drama",
> >       "20,Romance",
> >       "7,Drama",
> >       "7,Sci-Fi",
> >       // ... more lines in array ...
> >       "1680,Drama",
> >       "1680,Romance",
> >       "1681,Comedy"
> >     )
> >     lines
> >   }
> >
> > }
> > Error::
> >
> > 18/08/01 14:18:53 INFO DAGScheduler: ResultStage 0 (collect at
> > GenerateIndicator.scala:38) failed in 3.551 s due to Job aborted due to
> > stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
> > Lost task 1.3 in stage 0.0 (TID 6, 10.0.2.15, executor 0):
> > java.lang.IllegalStateException: unread block data
> >     at
> >
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
> >     at
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> >     at
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
> >     at
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> >     at
> >
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> >     at
> >
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> >     at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
> >     at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > Driver stacktrace:
> > 18/08/01 14:18:53 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 7)
> > on 10.0.2.15, executor 0: java.lang.IllegalStateException (unread block
> > data) [duplicate 7]
> > 18/08/01 14:18:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
> tasks
> > have all completed, from pool
> > 18/08/01 14:18:53 INFO DAGScheduler: Job 0 failed: collect at
> > GenerateIndicator.scala:38, took 5.265593 s
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> > in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> > 0.0 (TID 6, 10.0.2.15, executor 0): java.lang.IllegalStateException:
> unread
> > block data
> >     at
> >
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2773)
> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1599)
> >     at
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
> >     at
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
> >     at
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
> >     at
> >
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
> >     at
> >
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
> >     at
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:301)
> >     at
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     at java.lang.Thread.run(Thread.java:748)
> > Thanks a lot for your time.
> > Cheers.
>