Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2014/11/11 13:54:33 UTC

[jira] [Updated] (SPARK-4196) Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration

     [ https://issues.apache.org/jira/browse/SPARK-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-4196:
-----------------------------
    Summary: Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration  (was: Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles ?)

More info. The problem is that {{CheckpointWriter}} serializes the {{DStreamGraph}} when checkpointing is enabled. In the case of, for example, {{saveAsNewAPIHadoopFiles}}, this includes a {{ForEachDStream}} with a reference to a Hadoop {{Configuration}}.

Without checkpointing this isn't a problem, because Spark generally does not need to serialize the {{ForEachDStream}} closure in order to execute it. It does need to serialize it in order to checkpoint it.
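
To illustrate the distinction, here is a minimal, self-contained Java sketch. It does not use Spark or Hadoop: {{FakeConfiguration}} and {{SaveFunc}} are hypothetical stand-ins for the Hadoop {{Configuration}} and for the function held by {{ForEachDStream}}. Calling the function directly works, while Java-serializing it fails on the captured field, which is presumably what happens when the checkpoint is written.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class ClosureCaptureDemo {

    // Hypothetical stand-in for org.apache.hadoop.conf.Configuration:
    // deliberately NOT Serializable.
    static class FakeConfiguration {
        final String fsName = "file:///";
    }

    // Hypothetical stand-in for the save function: Serializable itself,
    // but it holds a reference to the non-serializable configuration.
    static class SaveFunc implements Serializable {
        private final FakeConfiguration conf;

        SaveFunc(FakeConfiguration conf) {
            this.conf = conf;
        }

        String apply(String rddName) {
            return rddName + " -> " + conf.fsName;
        }
    }

    // Returns the name of the class that blocked serialization,
    // or null if the object serialized without error.
    static String trySerialize(Object o) {
        try {
            ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream());
            oos.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SaveFunc f = new SaveFunc(new FakeConfiguration());
        // Executing the function in-process needs no serialization.
        System.out.println(f.apply("rdd-1"));
        // Serializing it (as a checkpoint would) trips over the captured field.
        System.out.println("blocked by: " + trySerialize(f));
    }
}
```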

Does that make sense? I'm not sure what to do, but this is a significant problem for me: I can't see a sly workaround that makes streaming, saving Hadoop files, and checkpointing work together.
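
For illustration only, one direction that might be worth exploring is to hold the configuration in a small serializable wrapper that writes the contents itself. The sketch below is a minimal Java example under stated assumptions: {{FakeConfiguration}} is a hypothetical stand-in exposing Writable-style {{write}}/{{readFields}} methods (Hadoop's {{Configuration}} implements {{Writable}}), and {{SerializableConf}} is a made-up name. It is not a tested fix for this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

class SerializableConfDemo {

    // Hypothetical stand-in for a Hadoop Configuration: not Serializable,
    // but able to write itself to a DataOutput and read itself back.
    static class FakeConfiguration {
        final Map<String, String> props = new TreeMap<>();

        void write(DataOutput out) throws IOException {
            out.writeInt(props.size());
            for (Map.Entry<String, String> e : props.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        void readFields(DataInput in) throws IOException {
            props.clear();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                props.put(key, value);
            }
        }
    }

    // Hypothetical wrapper: the field is transient, so default serialization
    // skips it, and writeObject/readObject carry the contents instead.
    static class SerializableConf implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeConfiguration conf;

        SerializableConf(FakeConfiguration conf) {
            this.conf = conf;
        }

        FakeConfiguration value() {
            return conf;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            conf.write(out); // ObjectOutputStream is a DataOutput
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            conf = new FakeConfiguration();
            conf.readFields(in); // ObjectInputStream is a DataInput
        }
    }

    // Serializes the wrapper to bytes and reads it back, as a checkpoint
    // write followed by a recovery would.
    static SerializableConf roundTrip(SerializableConf c) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(c);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableConf) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeConfiguration conf = new FakeConfiguration();
        conf.props.put("fs.defaultFS", "file:///");
        SerializableConf copy = roundTrip(new SerializableConf(conf));
        System.out.println(copy.value().props);
    }
}
```

A closure that captures the wrapper rather than the raw object would then survive serialization. Whether that is practical from user code, given that the capture happens inside {{saveAsNewAPIHadoopFiles}}, is an open question.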


Here's a cobbled-together test that shows the problem:

{code}
  test("recovery with save to HDFS stream") {
    // Set up the streaming context and input streams
    val testDir = Utils.createTempDir()
    val outDir = Utils.createTempDir()
    var ssc = new StreamingContext(master, framework, Seconds(1))
    ssc.checkpoint(checkpointDir)
    val fileStream = ssc.textFileStream(testDir.toString)
    for (i <- Seq(1, 2, 3)) {
      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
      // wait to make sure that the file is written such that it gets shown in the file listings
    }

    // saveAsNewAPIHadoopFiles returns Unit, so there is nothing useful to bind here
    fileStream.map(x => (x, x)).saveAsNewAPIHadoopFiles(
      outDir.toURI.toString,
      "saveAsNewAPIHadoopFilesTest",
      classOf[Text],
      classOf[Text],
      classOf[TextOutputFormat[Text,Text]],
      ssc.sparkContext.hadoopConfiguration)

    ssc.start()
    ssc.awaitTermination(5000)
    ssc.stop()

    val checkpointDirFile = new File(checkpointDir)
    assert(outDir.listFiles().length > 0)
    assert(checkpointDirFile.listFiles().length == 1)
    assert(checkpointDirFile.listFiles()(0).listFiles().length > 0)

    Utils.deleteRecursively(testDir)
    Utils.deleteRecursively(outDir)
  }
{code}

You'll see the {{NotSerializableException}} clearly if you hack {{CheckpointWriter.write()}} to print the exception:

{code}
  def write(checkpoint: Checkpoint) {
    val bos = new ByteArrayOutputStream()
    val zos = compressionCodec.compressedOutputStream(bos)
    val oos = new ObjectOutputStream(zos)
    try {
      oos.writeObject(checkpoint)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw e
    }
    ...
{code}

> Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4196
>                 URL: https://issues.apache.org/jira/browse/SPARK-4196
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Sean Owen
>
> I am reasonably sure there is some issue here in Streaming and that I'm not missing something basic, but not 100%. I went ahead and posted it as a JIRA to track, since it's come up a few times before without resolution, and right now I can't get checkpointing to work at all.
> When Spark Streaming checkpointing is enabled, I see a NotSerializableException thrown for a Hadoop Configuration object, and it seems like it is not one from my user code.
> Before I post my particular instance, see http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3C1408135046777-12202.post@n3.nabble.com%3E for another occurrence.
> I was also on a customer site last week debugging an identical issue with checkpointing in a Scala-based program, and they also could not enable checkpointing without hitting exactly this error.
> The essence of my code is:
> {code}
>     final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
>     JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
>       @Override
>       public JavaStreamingContext create() {
>         return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
>       }
>     };
>     streamingContext = JavaStreamingContext.getOrCreate(
>         checkpointDirString, sparkContext.hadoopConfiguration(), streamingContextFactory, false);
>     streamingContext.checkpoint(checkpointDirString);
> {code}
> It yields:
> {code}
> 2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
> org.apache.hadoop.conf.Configuration
> - field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
> - object (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9", <function2>)
> - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
> - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
> ...
> {code}
> This looks like it's due to PairDStreamFunctions, as this saveFunc seems to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:
> {code}
> def saveAsNewAPIHadoopFiles(
>     prefix: String,
>     suffix: String,
>     keyClass: Class[_],
>     valueClass: Class[_],
>     outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>     conf: Configuration = new Configuration
>   ) {
>   val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
>     val file = rddToFileName(prefix, suffix, time)
>     rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
>   }
>   self.foreachRDD(saveFunc)
> }
> {code}
> Is that not a problem? If it is, I don't know how this would ever work in Spark. But then again, I don't see why it is an issue only when checkpointing is enabled.
> Long-shot, but I wonder if it is related to closure issues like https://issues.apache.org/jira/browse/SPARK-1866



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
