Posted to user@spark.apache.org by Robert Fink <rf...@robertfink.de> on 2013/11/11 04:45:30 UTC

Spark & Avro in Scala

Hi,

I am trying to get the following minimal Scala example to work: using Spark to
process Avro records. Here's my dummy Avro definition:

{
  "namespace": "com.avrotest",
  "type": "record",
  "name": "AvroTest",
  "fields": [
    {"name": "field1", "type": ["string", "null"]}
  ]
}
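
(For context: com.avrotest.AvroTest, imported below, is the SpecificRecord class that the Avro compiler generates from this schema. If it helps, I believe the same record could also be constructed through the generated builder API, roughly like this untested snippet:)

// ---------------------------------------------------------------------------------------------------------
import com.avrotest.AvroTest

// Builder-style construction of the same record that the code below
// creates via the no-arg constructor and setField1.
val record = AvroTest.newBuilder().setField1("value1").build()
// ---------------------------------------------------------------------------------------------------------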

I am experimenting with a simple job that creates three AvroTest objects, writes
them out to an Avro file, and then reads the generated file back in through a
SparkContext and performs a simple grouping operation:

// ---------------------------------------------------------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.file.DataFileWriter
import org.apache.avro._
import org.apache.avro.generic._
import org.apache.hadoop.mapreduce.Job
import com.avrotest.AvroTest
import java.io.File

object SparkTest{
  def main(args: Array[String]) {

    val avrofile = "output.avro"
    val sc = new SparkContext("local", "Simple App")
    val job = new Job()

    // Two records share field1 == "value1"; the third has field1 == "value2".
    val record1 = new AvroTest()
    record1.setField1("value1")
    val record2 = new AvroTest()
    record2.setField1("value1")
    val record3 = new AvroTest()
    record3.setField1("value2")

    val userDatumWriter = new SpecificDatumWriter[AvroTest]()
    val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
    val file = new File(avrofile)
    dataFileWriter.create(record1.getSchema(), file)
    dataFileWriter.append(record1)
    dataFileWriter.append(record2)
    dataFileWriter.append(record3)
    dataFileWriter.close()

    // Read the Avro file back in as (AvroKey[AvroTest], NullWritable) pairs.
    val rdd = sc.newAPIHadoopFile(
      avrofile,
      classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
      classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
      classOf[org.apache.hadoop.io.NullWritable],
      job.getConfiguration)
    // rdd.foreach( x => println(x._1.datum.getField1) ) // Prints value1, value1, value2
    val numGroups = rdd.groupBy(x => x._1.datum.getField1).count()
  }
}
// ---------------------------------------------------------------------------------------------------------

I would expect numGroups == 2 in the last step, because record1 and record2
share getField1() == "value1", while record3 has getField1() == "value2".
However, the job fails with the error below. Can anyone give me a hint as to
what could be wrong in the above code, or post an example of reading from an
Avro file and performing some simple computations on the retrieved objects?
Thank you so much! Robert.

11650 [pool-109-thread-1] WARN org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
11661 [pool-109-thread-1] INFO org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal to the writer schema.
12293 [spark-akka.actor.default-dispatcher-5] INFO org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to java.io.NotSerializableException
java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
        at org.apache.spark.storage.DiskStore$DiskBlockObjectWriter.write(DiskStore.scala:109)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:152)
        at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$run$1.apply(ShuffleMapTask.scala:149)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at scala.collection.Iterator$$anon$19.foreach(Iterator.scala:399)
        at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:149)
        at org.apache.spark.scheduler.ShuffleMapTask.run(ShuffleMapTask.scala:88)
        at org.apache.spark.scheduler.local.LocalScheduler.runTask(LocalScheduler.scala:198)
        at org.apache.spark.scheduler.local.LocalActor$$anonfun$launchTask$1$$anon$1.run(LocalScheduler.scala:68)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
12304 [spark-akka.actor.default-dispatcher-5] INFO org.apache.spark.scheduler.local.LocalScheduler - Remove TaskSet 1.0 from pool
12311 [run-main] INFO org.apache.spark.scheduler.DAGScheduler - Failed to run count at SparkTest.scala:41
[error] (run-main) org.apache.spark.SparkException: Job failed: Task 1.0:0 failed more than 4 times; aborting job java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
org.apache.spark.SparkException: Job failed: Task 1.0:0 failed more than 4 times; aborting job java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:379)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
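
PS: If I read the stack trace correctly, the shuffle triggered by groupBy tries to Java-serialize the org.apache.avro.mapred.AvroKey wrappers, and AvroKey does not implement java.io.Serializable. Would extracting the plain field value before grouping, along the lines of the untested sketch below, be the right way around this? (I am guessing that calling toString on the value returned by getField1 is the right way to turn the Avro string into a java.lang.String.)

// ---------------------------------------------------------------------------------------------------------
// Untested sketch: pull a plain String out of each record before any shuffle,
// so that only Serializable values cross stage boundaries.
val fieldValues = rdd.map(pair => pair._1.datum.getField1.toString)
val numGroups = fieldValues.distinct().count()  // I would expect 2 here
// (equivalently: fieldValues.groupBy(identity).count())
// ---------------------------------------------------------------------------------------------------------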