Posted to user@spark.apache.org by Tyson <ty...@gmail.com> on 2014/12/04 04:11:56 UTC

Reading DynamoDB with Spark

Hi,

I am trying to read data from a DynamoDB table with Spark, but when I run the 
code below I get the error message shown after it.
I am using Spark 1.1.1 with emr-core-1.1.jar, emr-ddb-hive-1.0.jar, and 
emr-ddb-hadoop-1.0.jar.
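
(In case the packaging matters: one way to put those jars on the classpath is 
SparkConf.setJars; the paths below are illustrative placeholders, not my real 
values.)

// Illustrative only: shipping the EMR jars with the application.
// The local paths are placeholders for wherever the jars actually live.
val confWithJars = SparkConf()
    .setAppName("DynamoReader")
    .setMaster("local[4]")
    .setJars(arrayOf(
        "/path/to/emr-core-1.1.jar",
        "/path/to/emr-ddb-hive-1.0.jar",
        "/path/to/emr-ddb-hadoop-1.0.jar"))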

// The DynamoDB classes come from the EMR jars listed above;
// package names may differ depending on the jar version.
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val sparkConf = SparkConf().setAppName("DynamoReader").setMaster("local[4]")
val ctx = JavaSparkContext(sparkConf)

// Hadoop job configuration for the DynamoDB input format
val jobConf = JobConf(ctx.hadoopConfiguration())
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "<...>")
jobConf.set("dynamodb.endpoint", "<...>")
jobConf.set("dynamodb.regionid", "<...>")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1.5")
jobConf.set("dynamodb.max.map.tasks", "2")
jobConf.set("dynamodb.awsAccessKeyId", "<...>")
jobConf.set("dynamodb.awsSecretAccessKey", "<...>")
jobConf.set("mapred.input.format.class", javaClass<DynamoDBInputFormat>().getName())

// Read the table as an RDD of (Text, DynamoDBItemWritable) pairs and print every item
val users = ctx.hadoopRDD(jobConf, javaClass<DynamoDBInputFormat>(), javaClass<Text>(), javaClass<DynamoDBItemWritable>())

users.collect().forEach {
    println(it)
}
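
(For context, once the read works the plan is to map each DynamoDBItemWritable 
to plain values before collecting; a minimal sketch of what I have in mind, 
where the "id" attribute is just a placeholder from my schema:)

// Sketch only: pull one string attribute out of each item so that plain
// strings, not Hadoop Writables, are collected back to the driver.
// "id" is an illustrative attribute name, not from the failing job above.
val ids = users.map { pair ->
    pair._2().getItem().get("id")?.getS()
}
ids.take(10).forEach { println(it) }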




Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: java.lang.NullPointerException
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:930)
org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:34)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:867)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
akka.actor.ActorCell.invoke(ActorCell.scala:456)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
akka.dispatch.Mailbox.run(Mailbox.scala:219)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Could you give me any suggestions on how to fix this?

Thank you,
Istvan