Posted to user@spark.apache.org by Kanwaldeep <ka...@gmail.com> on 2014/01/09 21:46:35 UTC

Error Handling on calling saveAsHadoopDataset

I'm using Spark Streaming 0.8, which reads data from Kafka, computes certain
aggregates, and persists the results in HBase.

I'm trying to understand the best way to handle failure conditions when we
have errors connecting to HBase. Currently the code reads the messages from
Kafka, computes the aggregates, and then fails on writing the DStream to
HBase. The data received from Kafka is not persisted and is dropped. Once
HBase is available again, new messages are written to HBase, but the
messages received during the outage are lost.

What is the best way to handle this scenario when our target database is
unavailable?
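
One common approach (a minimal sketch, not code from the original post) is to wrap the write in a bounded retry with exponential backoff inside `foreachRDD`, so a transient HBase outage stalls the batch instead of dropping it. The helper below is self-contained; the streaming usage in the trailing comment is hypothetical, and names like the retry limits are assumptions, not Spark or HBase API.

```scala
object RetryingWrite {
  // Evaluate `attempt` until it succeeds, retrying up to `maxRetries`
  // times with exponentially growing sleeps between tries. If all
  // retries are exhausted, the last exception is rethrown so the batch
  // fails loudly instead of silently dropping data.
  def withRetries[T](maxRetries: Int, initialBackoffMs: Long)(attempt: => T): T = {
    var backoff = initialBackoffMs
    var tries = 0
    while (true) {
      try return attempt
      catch {
        case e: Exception =>
          tries += 1
          if (tries > maxRetries) throw e
          Thread.sleep(backoff)
          backoff *= 2 // exponential backoff
      }
    }
    throw new IllegalStateException("unreachable")
  }
}

// Hypothetical usage in the streaming job (assuming `stream` and `jobConf`
// exist in your code):
// stream.foreachRDD { rdd =>
//   RetryingWrite.withRetries(maxRetries = 5, initialBackoffMs = 1000L) {
//     rdd.saveAsHadoopDataset(jobConf)
//   }
// }
```

Note this only covers outages shorter than the total backoff window; for longer outages you would still need the Kafka offsets (or another replayable source) to reprocess the lost batches.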

Also, I'm running into an issue with setting up checkpointing on the
context, as it is unable to serialize the org.apache.hadoop.mapred.JobConf
object:

Exception in thread "pool-6-thread-1" java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1359)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1155)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
	at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:422)
	at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:152)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:950)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1482)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329)
	at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:112)
	at org.apache.spark.streaming.Scheduler.doCheckpoint(Scheduler.scala:127)
	at org.apache.spark.streaming.Scheduler.clearOldMetadata(Scheduler.scala:119)
	at org.apache.spark.streaming.JobManager.org$apache$spark$streaming$JobManager$$clearJob(JobManager.scala:79)
	at org.apache.spark.streaming.JobManager$JobHandler.run(JobManager.scala:41)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:695)
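
The trace shows Java serialization walking the DStream graph during checkpointing and hitting the non-serializable JobConf captured somewhere in it. A common workaround (a sketch under the assumption that the JobConf is held in a field of a serialized object, not a fix confirmed by this thread) is to keep the non-serializable object out of the serialized state by marking it `@transient lazy val`, so it is rebuilt on access after deserialization. The demo below uses a stand-in `Config` class instead of the real Hadoop JobConf to stay self-contained:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for org.apache.hadoop.mapred.JobConf: it does NOT extend
// Serializable, so Java serialization rejects it.
class Config {
  val settings = new java.util.Properties()
}

class Writer extends Serializable {
  // @transient excludes the field from serialization; lazy val rebuilds
  // it on first access after deserialization instead of shipping it.
  @transient lazy val conf: Config = new Config
}

object TransientDemo {
  // Returns true if `obj` survives Java serialization.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: java.io.NotSerializableException => false
    }
}
```

With this pattern, serializing `Writer` succeeds even though `Config` alone does not, which is the same shape of fix people apply when a JobConf leaks into a checkpointed closure; whether it applies here depends on where your JobConf is actually referenced.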



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-Handling-on-calling-saveAsHadoopDataset-tp418.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Error Handling on calling saveAsHadoopDataset

Posted by Kanwaldeep <ka...@gmail.com>.
Any help on this would be great.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-Handling-on-calling-saveAsHadoopDataset-tp418p518.html