Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2014/11/02 18:51:33 UTC

[jira] [Created] (SPARK-4196) Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles ?

Sean Owen created SPARK-4196:
--------------------------------

             Summary: Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles ?
                 Key: SPARK-4196
                 URL: https://issues.apache.org/jira/browse/SPARK-4196
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 1.1.0
            Reporter: Sean Owen


I am reasonably sure there is a real issue here in Streaming and that I'm not missing something basic, but I'm not 100% certain. I went ahead and filed this JIRA to track it, since the problem has come up a few times before without resolution, and right now I can't get checkpointing to work at all.

When Spark Streaming checkpointing is enabled, I see a NotSerializableException thrown for a Hadoop Configuration object, and it does not appear to be one created in my user code.

Before I get to my particular instance, see http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3C1408135046777-12202.post@n3.nabble.com%3E for another occurrence of the same error.

I was also on a customer site last week debugging an identical issue with checkpointing in a Scala-based program; they could not enable checkpointing without hitting exactly this error either.

The essence of my code is:

{code}
    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

    JavaStreamingContextFactory streamingContextFactory =
        new JavaStreamingContextFactory() {
          @Override
          public JavaStreamingContext create() {
            return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
          }
        };

    streamingContext = JavaStreamingContext.getOrCreate(
        checkpointDirString, sparkContext.hadoopConfiguration(),
        streamingContextFactory, false);
    streamingContext.checkpoint(checkpointDirString);
{code}
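
For completeness, the write stage that produces the failing closure is a call like the following on a pair DStream. (The stream construction is elided, and TextOutputFormat plus the Text/LongWritable key and value classes are illustrative stand-ins for my actual types, not my real code.)

{code}
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// counts is a JavaPairDStream<Text, LongWritable> built from the input stream.
counts.saveAsNewAPIHadoopFiles(
    "hdfs:///tmp/out/part",                 // prefix for each batch's output directory
    "txt",                                  // suffix
    Text.class,
    LongWritable.class,
    TextOutputFormat.class,
    sparkContext.hadoopConfiguration());    // the Configuration that ends up in saveFunc
{code}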

It yields:

{code}
2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
org.apache.hadoop.conf.Configuration
- field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
- object (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", <function2>)
- field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
...
{code}
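
My reading of why this happens (an inference, not a confirmed diagnosis): with checkpointing enabled, the DStream graph itself is serialized with plain Java serialization, including each ForEachDStream's foreachFunc. The saveFunc closure holds the Configuration in its conf$2 field, and Configuration implements Writable but not java.io.Serializable. The same failure can be reproduced outside Spark with any serializable closure that captures a Configuration:

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

public class ConfCaptureDemo {
  interface SerializableRunnable extends Runnable, Serializable {}

  public static void main(String[] args) throws IOException {
    final Configuration conf = new Configuration();
    // The anonymous class captures conf as a synthetic field, just as
    // PairDStreamFunctions$$anonfun$9 holds its conf parameter in conf$2.
    SerializableRunnable f = new SerializableRunnable() {
      @Override
      public void run() {
        System.out.println(conf.get("fs.defaultFS"));
      }
    };
    ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
    // Throws java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
    out.writeObject(f);
  }
}
{code}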


This looks like it comes from PairDStreamFunctions.saveAsNewAPIHadoopFiles, since its saveFunc appears to be the org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9 named in the trace:

{code}
def saveAsNewAPIHadoopFiles(
    prefix: String,
    suffix: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = new Configuration
  ) {
  val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
  }
  self.foreachRDD(saveFunc)
}
{code}

Is that not a problem? If it is, I don't see how this method could ever work in Spark. But then again, I don't see why the captured Configuration is only an issue when checkpointing is enabled.

Long shot, but I wonder whether it is related to closure issues like https://issues.apache.org/jira/browse/SPARK-1866.
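
If the capture of conf is indeed the problem, one user-level workaround (a sketch I have not verified; counts and the types are the same illustrative names as above) would be to bypass saveAsNewAPIHadoopFiles and write via foreachRDD, constructing the Configuration inside the function so the checkpointed closure carries no Configuration field:

{code}
// Function2 is org.apache.spark.api.java.function.Function2;
// Time is org.apache.spark.streaming.Time.
counts.foreachRDD(new Function2<JavaPairRDD<Text, LongWritable>, Time, Void>() {
  @Override
  public Void call(JavaPairRDD<Text, LongWritable> rdd, Time time) throws IOException {
    // Created inside the closure, so it is never serialized with the DStream graph.
    Configuration conf = new Configuration();
    rdd.saveAsNewAPIHadoopFile(
        "hdfs:///tmp/out/part-" + time.milliseconds(),  // one directory per batch
        Text.class, LongWritable.class, TextOutputFormat.class, conf);
    return null;
  }
});
{code}

The checkpoint still serializes the anonymous Function2 itself, but that object no longer holds a Configuration, which is the specific field the trace above complains about.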


