Posted to user@spark.apache.org by Jenny Zhao <li...@gmail.com> on 2014/04/10 00:05:21 UTC

Problem with running LogisticRegression in spark cluster mode

Hi all,

I have been able to run LR in local mode, but I am running into a problem in
cluster mode. Below are the source script and the stack trace from running it
in cluster mode. I used sbt package to build the project, and I am not sure
what it is complaining about.

I also have a couple of questions about LogisticRegression itself:

1) I noticed that LogisticRegressionWithSGD doesn't ask for any information
about the input features, for instance whether a feature is scale
(continuous), nominal, or ordinal. Does MLlib only support scale features?
(A sketch of what I mean by encoding is right after question 2.)

2) The training error is pretty high even when the number of iterations is
set very high. Do we have any numbers on the accuracy of the LR model?
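
To make question 1) concrete, here is a minimal sketch of how a nominal
feature could be one-hot encoded into the all-numeric form LabeledPoint
expects (the row layout and category values below are made up, not from my
actual data):

// Hypothetical row format: label,color,x2 where "color" is nominal.
val colors = Array("red", "green", "blue")

def toPoint(line: String): LabeledPoint = {
  val parts = line.split(',')
  // Expand the nominal column into one 0.0/1.0 indicator per category.
  val oneHot = colors.map(c => if (parts(1) == c) 1.0 else 0.0)
  LabeledPoint(parts(0).toDouble, oneHot ++ Array(parts(2).toDouble))
}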

Thank you for your help!

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

/**
 * Logistic regression over a CSV file of label,feature1,...,featureN rows.
 */
object SparkLogisticRegression {

  def main(args: Array[String]) {
    if (args.length != 3) {
      System.err.println(
        "Usage: SparkLogisticRegression <master> <input file path> <number of iterations>")
      System.exit(1)
    }

    val numIterations = args(2).toInt

    val sc = new SparkContext(args(0), "SparkLogisticRegression",
                              System.getenv("SPARK_HOME"),
                              SparkContext.jarOfClass(this.getClass))

    // Parse the input data: the first column is the label, the rest are features.
    val data = sc.textFile(args(1))
    val lpoints = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, parts.tail.map(_.toDouble).toArray)
    }

    // Train the logistic regression model.
    val model = LogisticRegressionWithSGD.train(lpoints, numIterations)

    // Pair each point's true label with the model's prediction.
    val labelPred = lpoints.map { p =>
      val pred = model.predict(p.features)
      (p.label, pred)
    }

    // Training error: fraction of points where the prediction differs from the label.
    val predErr = labelPred.filter(r => r._1 != r._2).count
    println("Training Error: " + predErr.toDouble / lpoints.count + " " +
            predErr + "/" + lpoints.count)
  }
}

14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/04/09 14:50:48 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2
        at java.lang.Class.forName(Class.java:211)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1768)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1988)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1834)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1793)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:906)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:929)
        at java.lang.Thread.run(Thread.java:796)
14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 1 (task 0.0:1)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2 [duplicate 1]
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 2 on executor 1: hdtest022.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 1696 bytes in 0 ms
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 3 on executor 0: hdtest023.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1696 bytes in 0 ms
14/04/09 14:50:48 WARN scheduler.TaskSetManager: Lost TID 3 (task 0.0:0)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2 [duplicate 2]
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 4 on executor 1: hdtest022.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:48 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1696 bytes in 1 ms
14/04/09 14:50:49 WARN scheduler.TaskSetManager: Lost TID 4 (task 0.0:0)
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2 [duplicate 3]
14/04/09 14:50:49 WARN scheduler.TaskSetManager: Lost TID 2 (task 0.0:1)
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2 [duplicate 4]
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 5 on executor 1: hdtest022.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 1696 bytes in 1 ms
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 6 on executor 0: hdtest023.svl.ibm.com (NODE_LOCAL)
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 1696 bytes in 1 ms
14/04/09 14:50:49 WARN scheduler.TaskSetManager: Lost TID 5 (task 0.0:1)
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2 [duplicate 5]
14/04/09 14:50:49 WARN scheduler.TaskSetManager: Lost TID 6 (task 0.0:0)
14/04/09 14:50:49 INFO scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2 [duplicate 6]
14/04/09 14:50:49 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 4 times; aborting job
14/04/09 14:50:49 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/04/09 14:50:49 INFO scheduler.DAGScheduler: Failed to run collect at SparkLinearRegression.scala:34
[error] (run-main) org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2)
org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 4 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: SparkLinearRegression$$anonfun$2)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run-main for the full output.
14/04/09 14:50:49 INFO network.ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)

Re: Problem with running LogisticRegression in spark cluster mode

Posted by Jenny Zhao <li...@gmail.com>.
Hi Jagat,

Yes, I did specify MLlib in build.sbt:

name := "Spark LogisticRegression"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" %
"0.9.0-incubating"

libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" %
"0.9.0-incubating"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.2.1"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"



On Wed, Apr 9, 2014 at 3:23 PM, Jagat Singh <ja...@gmail.com> wrote:

> Hi Jenny,
>
> How are you packaging your jar?
>
> Can you please confirm that you have included the MLlib jar inside the fat
> jar you have created for your code?
>
> libraryDependencies += "org.apache.spark" % "spark-mllib_2.9.3" % "0.8.1-incubating"
>
> Thanks,
>
> Jagat Singh

Re: Problem with running LogisticRegression in spark cluster mode

Posted by Jagat Singh <ja...@gmail.com>.
Hi Jenny,

How are you packaging your jar?

Can you please confirm that you have included the MLlib jar inside the fat
jar you have created for your code?

libraryDependencies += "org.apache.spark" % "spark-mllib_2.9.3" % "0.8.1-incubating"

Thanks,

Jagat Singh

