You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Sameer Tilak <ss...@live.com> on 2014/06/26 18:30:31 UTC

Serialization of objects

Hi everyone,
Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd party library namely instances of com.wcohen.ss.Jaccard, and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to) using Kryo for serialization. I  am still facing the serialization issue. I get "org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper" Any help with this will be great.  Scala code:
package approxstrmatch
import com.wcohen.ss.BasicStringWrapper;import com.wcohen.ss.Jaccard;
import java.util.Iterator;
import org.apache.spark.SparkContextimport org.apache.spark.SparkContext._import org.apache.spark.SparkConf
import org.apache.spark.rdd;import org.apache.spark.rdd.RDD;
import com.esotericsoftware.kryo.Kryoimport org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {  override def registerClasses(kryo: Kryo) {    kryo.register(classOf[approxstrmatch.JaccardScore])    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])    kryo.register(classOf[com.wcohen.ss.Jaccard])
  }}
class JaccardScore  {
  val mjc = new Jaccard()  with Serializable  val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
  val sc = new SparkContext(conf)
  def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String])  {  val jc_ = this.mjc
  var i: Int = 0  for (sentence <- sourcerdd.toLocalIterator)   {    val str1 = new BasicStringWrapper (sentence)        var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))        val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)        scorevector.saveAsTextFile(fileName)        i += 1   }
  }
Here is the script: val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt"); val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt"); val score = new approxstrmatch.JaccardScore() score.calculateScoreSecond(srcFile, distFile) 
O/P:
14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at <console>:12), which has no missing parents14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at <console>:12)14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms14/06/25 12:32:05 INFO Executor: Running task ID 014/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 140372470156414/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 140372470156214/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+14014/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 71714/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver14/06/25 12:32:06 INFO Executor: Finished task ID 014/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 114/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at JaccardScore.scala:5214/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at JaccardScore.scala:52) with 2 output partitions (allowLocal=false)14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile at JaccardScore.scala:52)14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()14/06/25 12:32:06 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at saveAsTextFile at JaccardScore.scala:52), which has no missing parents14/06/25 12:32:06 INFO DAGScheduler: Failed to run saveAsTextFile at JaccardScore.scala:52org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)	at akka.actor.ActorCell.invoke(ActorCell.scala:456)	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)	at akka.dispatch.Mailbox.run(Mailbox.scala:219)	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 		 	   		  

Re: Serialization of objects

Posted by Aaron Davidson <il...@gmail.com>.
If you want to stick with Java serialization and need to serialize a
non-Serializable object, your best choices are probably to either subclass
it with a Serializable one or wrap it in a class of your own which
implements its own writeObject/readObject methods (see here:
http://stackoverflow.com/questions/6163872/how-to-serialize-a-non-serializable-in-java
)

Otherwise you can use Kryo to register custom serializers for other
people's objects.


On Mon, Jun 30, 2014 at 1:52 PM, Sameer Tilak <ss...@live.com> wrote:

> Hi everyone,
> I was able to solve this issue. For now I changed the library code and
> added the following to the class com.wcohen.ss.BasicStringWrapper:
>
> public class BasicStringWrapper implements  Serializable
>
> However, I am still curious to know ho to get around the issue when you
> don't have access to the code and you are using a 3rd party jar.
>
>
> ------------------------------
> From: sstilak@live.com
> To: user@spark.incubator.apache.org
> Subject: Serialization of objects
> Date: Thu, 26 Jun 2014 09:30:31 -0700
>
>
> Hi everyone,
>
> Aaron, thanks for your help so far. I am trying to serialize objects that
> I instantiate from a 3rd party library namely instances of com.wcohen.ss.Jaccard,
> and com.wcohen.ss.BasicStringWrapper. However, I am having problems with
> serialization. I am (at least trying to) using Kryo for serialization. I
>  am still facing the serialization issue. I get "org.apache.spark.SparkException:
> Job aborted due to stage failure: Task not serializable:
> java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper" Any
> help with this will be great.
> Scala code:
>
> package approxstrmatch
>
> import com.wcohen.ss.BasicStringWrapper;
> import com.wcohen.ss.Jaccard;
>
> import java.util.Iterator;
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
>
> import org.apache.spark.rdd;
> import org.apache.spark.rdd.RDD;
>
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[approxstrmatch.JaccardScore])
>     kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
>     kryo.register(classOf[com.wcohen.ss.Jaccard])
>
>   }
> }
>
>  class JaccardScore  {
>
>   val mjc = new Jaccard()  with Serializable
>   val conf = new
> SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
>   conf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>   conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
>
>   val sc = new SparkContext(conf)
>
>   def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String])
>  {
>   val jc_ = this.mjc
>
>   var i: Int = 0
>   for (sentence <- sourcerdd.toLocalIterator)
>    {    val str1 = new BasicStringWrapper (sentence)
>         var scorevector = destrdd.map(x => jc_.score(str1, new
> BasicStringWrapper(x)))
>         val fileName = new
> String("/apps/software/scala-approsstrmatch-sentence" + i)
>         scorevector.saveAsTextFile(fileName)
>         i += 1
>    }
>
>   }
>
> Here is the script:
>  val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt");
>  val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt");
>  val score = new approxstrmatch.JaccardScore()
>  score.calculateScoreSecond(srcFile, distFile)
>
> O/P:
>
> 14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
> textFile at <console>:12), which has no missing parents
> 14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage
> 0 (MappedRDD[3] at textFile at <console>:12)
> 14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
> executor localhost: localhost (PROCESS_LOCAL)
> 14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes
> in 4 ms
> 14/06/25 12:32:05 INFO Executor: Running task ID 0
> 14/06/25 12:32:05 INFO Executor: Fetching
> http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
> 14/06/25 12:32:05 INFO Utils: Fetching
> http://serverip:47417/jars/approxstrmatch.jar to
> /tmp/fetchFileTemp8194323811657370518.tmp
> 14/06/25 12:32:05 INFO Executor: Adding
> file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to
> class loader
> 14/06/25 12:32:05 INFO Executor: Fetching
> http://serverip:47417/jars/secondstring-20140618.jar with timestamp
> 1403724701562
> 14/06/25 12:32:05 INFO Utils: Fetching
> http://serverip:47417/jars/secondstring-20140618.jar to
> /tmp/fetchFileTemp8711755318201511766.tmp
> 14/06/25 12:32:06 INFO Executor: Adding
> file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar
> to class loader
> 14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
> 14/06/25 12:32:06 INFO HadoopRDD: Input split:
> hdfs://serverip:54310/data/dummy/test.txt:0+140
> 14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
> 14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
> 14/06/25 12:32:06 INFO Executor: Finished task ID 0
> 14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on
> localhost (progress: 1/1)
> 14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
> 14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> 14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371)
> finished in 0.242 s
> 14/06/25 12:32:06 INFO SparkContext: Job finished: apply at
> Iterator.scala:371, took 0.34204941 s
> 14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
> 14/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at
> JaccardScore.scala:52
> 14/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at
> JaccardScore.scala:52) with 2 output partitions (allowLocal=false)
>  14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile
> at JaccardScore.scala:52)
> 14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()
> 14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()
> 14/06/25 12:32:06 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at
> saveAsTextFile at JaccardScore.scala:52), which has no missing parents
> 14/06/25 12:32:06 INFO DAGScheduler: Failed to run saveAsTextFile at
> JaccardScore.scala:52
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> not serializable: java.io.NotSerializableException:
> com.wcohen.ss.BasicStringWrapper
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>  at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>  at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>  at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>

RE: Serialization of objects

Posted by Sameer Tilak <ss...@live.com>.
Hi everyone,I was able to solve this issue. For now I changed the library code and added the following to the class com.wcohen.ss.BasicStringWrapper: 
public class BasicStringWrapper implements  Serializable

However, I am still curious to know ho to get around the issue when you don't have access to the code and you are using a 3rd party jar.

From: sstilak@live.com
To: user@spark.incubator.apache.org
Subject: Serialization of objects
Date: Thu, 26 Jun 2014 09:30:31 -0700




Hi everyone,
Aaron, thanks for your help so far. I am trying to serialize objects that I instantiate from a 3rd party library namely instances of com.wcohen.ss.Jaccard, and com.wcohen.ss.BasicStringWrapper. However, I am having problems with serialization. I am (at least trying to) using Kryo for serialization. I  am still facing the serialization issue. I get "org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper" Any help with this will be great.  Scala code:
package approxstrmatch
import com.wcohen.ss.BasicStringWrapper;import com.wcohen.ss.Jaccard;
import java.util.Iterator;
import org.apache.spark.SparkContextimport org.apache.spark.SparkContext._import org.apache.spark.SparkConf
import org.apache.spark.rdd;import org.apache.spark.rdd.RDD;
import com.esotericsoftware.kryo.Kryoimport org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {  override def registerClasses(kryo: Kryo) {    kryo.register(classOf[approxstrmatch.JaccardScore])    kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])    kryo.register(classOf[com.wcohen.ss.Jaccard])
  }}
class JaccardScore  {
  val mjc = new Jaccard()  with Serializable  val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
  val sc = new SparkContext(conf)
  def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String])  {  val jc_ = this.mjc
  var i: Int = 0  for (sentence <- sourcerdd.toLocalIterator)   {    val str1 = new BasicStringWrapper (sentence)        var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))        val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)        scorevector.saveAsTextFile(fileName)        i += 1   }
  }
Here is the script: val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt"); val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt"); val score = new approxstrmatch.JaccardScore() score.calculateScoreSecond(srcFile, distFile) 
O/P:
14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at <console>:12), which has no missing parents14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at <console>:12)14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms14/06/25 12:32:05 INFO Executor: Running task ID 014/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 140372470156414/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 140372470156214/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+14014/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 71714/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver14/06/25 12:32:06 INFO Executor: Finished task ID 014/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 114/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at JaccardScore.scala:5214/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at JaccardScore.scala:52) with 2 output partitions (allowLocal=false)14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile at JaccardScore.scala:52)14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()14/06/25 12:32:06 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at saveAsTextFile at JaccardScore.scala:52), which has no missing parents14/06/25 12:32:06 INFO DAGScheduler: Failed to run saveAsTextFile at JaccardScore.scala:52org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)	at akka.actor.ActorCell.invoke(ActorCell.scala:456)	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)	at akka.dispatch.Mailbox.run(Mailbox.scala:219)	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)