You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2014/11/02 18:51:33 UTC
[jira] [Created] (SPARK-4196) Streaming + checkpointing yields
NotSerializableException for Hadoop Configuration from
saveAsNewAPIHadoopFiles ?
Sean Owen created SPARK-4196:
--------------------------------
Summary: Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles ?
Key: SPARK-4196
URL: https://issues.apache.org/jira/browse/SPARK-4196
Project: Spark
Issue Type: Bug
Components: Streaming
Affects Versions: 1.1.0
Reporter: Sean Owen
I am reasonably sure there is some issue here in Streaming and that I'm not missing something basic, but not 100%. I went ahead and posted it as a JIRA to track, since it's come up a few times before without resolution, and right now I can't get checkpointing to work at all.
When Spark Streaming checkpointing is enabled, I see a NotSerializableException thrown for a Hadoop Configuration object, and it seems like it is not one from my user code.
Before I post my particular instance see http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3C1408135046777-12202.post@n3.nabble.com%3E for another occurrence.
I was also on customer site last week debugging an identical issue with checkpointing in a Scala-based program and they also could not enable checkpointing without hitting exactly this error.
The essence of my code is:
{code}
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaStreamingContextFactory streamingContextFactory = new
JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return new JavaStreamingContext(sparkContext, new
Duration(batchDurationMS));
}
};
streamingContext = JavaStreamingContext.getOrCreate(
checkpointDirString, sparkContext.hadoopConfiguration(),
streamingContextFactory, false);
streamingContext.checkpoint(checkpointDirString);
{code}
It yields:
{code}
2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
org.apache.hadoop.conf.Configuration
- field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
- object (class
"org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
<function2>)
- field (class "org.apache.spark.streaming.dstream.ForEachDStream",
name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream",
org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
...
{code}
This looks like it's due to PairRDDFunctions, as this saveFunc seems
to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9
:
{code}
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = new Configuration
) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass,
outputFormatClass, conf)
}
self.foreachRDD(saveFunc)
}
{code}
Is that not a problem? but then I don't know how it would ever work in Spark. But then again I don't see why this is an issue and only when checkpointing is enabled.
Long-shot, but I wonder if it is related to closure issues like https://issues.apache.org/jira/browse/SPARK-1866
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org