Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2014/11/11 13:54:33 UTC
[jira] [Updated] (SPARK-4196) Streaming + checkpointing +
saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration
[ https://issues.apache.org/jira/browse/SPARK-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-4196:
-----------------------------
Summary: Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration (was: Streaming + checkpointing yields NotSerializableException for Hadoop Configuration from saveAsNewAPIHadoopFiles ?)
More info. The problem is that {{CheckpointWriter}} serializes the {{DStreamGraph}} when checkpointing is enabled. In the case of, for example, {{saveAsNewAPIHadoopFiles}}, this includes a {{ForEachDStream}} with a reference to a Hadoop {{Configuration}}.
Without checkpointing this is not a problem, because Spark does not generally need to serialize the {{ForEachDStream}} closure in order to execute it. It does need to serialize it in order to checkpoint it, though.
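The difference between executing such a closure and serializing it can be reproduced without Spark. The following is a minimal Java sketch, not Spark code: {{FakeConfiguration}} is a hypothetical stand-in for Hadoop's {{Configuration}}, and {{SaveFunc}} is a hypothetical stand-in for the function held by {{ForEachDStream}}. Calling the function works, but writing it through an {{ObjectOutputStream}}, roughly as a checkpoint would, fails.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ClosureCheckpointSketch {

    // Hypothetical stand-in for org.apache.hadoop.conf.Configuration:
    // deliberately NOT Serializable.
    static final class FakeConfiguration {
        final String outputDir;

        FakeConfiguration(String outputDir) {
            this.outputDir = outputDir;
        }
    }

    // Hypothetical stand-in for the save function: the function object is
    // Serializable, but it holds a reference to the non-serializable config.
    static final class SaveFunc implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeConfiguration conf;

        SaveFunc(FakeConfiguration conf) {
            this.conf = conf;
        }

        String apply(String record) {
            return conf.outputDir + "/" + record;
        }
    }

    // Java-serializes an object into a byte array, as a checkpoint writer might.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SaveFunc saveFunc = new SaveFunc(new FakeConfiguration("/tmp/out"));

        // Executing the function locally needs no serialization at all.
        System.out.println("direct call: " + saveFunc.apply("part-0"));

        // Serializing it walks the object graph and hits the config field.
        try {
            serialize(saveFunc);
            System.out.println("serialized without error");
        } catch (NotSerializableException e) {
            System.out.println("not serializable: " + e.getMessage());
        }
    }
}
```

The function object itself is fine; the exception comes from the field it carries along, which matches the {{conf$2}} field named in the reported trace.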
Does that make sense? I'm not sure what to do, but this is a significant problem for me, as I can't see a sly workaround that makes streaming, saving Hadoop files, and checkpointing all work together.
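One direction that might be worth exploring is to hold the configuration through a small serializable wrapper instead of capturing it directly. The sketch below is an illustration in plain Java under stated assumptions, not a tested Spark patch: {{FakeConfiguration}}, {{SerializableConf}} and the {{output.dir}} key are hypothetical names. The stand-in's {{write}}/{{readFields}} methods mimic the {{Writable}} interface that Hadoop's {{Configuration}} implements. The wrapper marks its field transient and writes the contents itself in {{writeObject}}.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class SerializableConfSketch {

    // Hypothetical stand-in for a Hadoop Configuration: not Serializable,
    // but able to write and read its own contents (Writable-style).
    static final class FakeConfiguration {
        private final Map<String, String> props = new HashMap<>();

        void set(String key, String value) {
            props.put(key, value);
        }

        String get(String key) {
            return props.get(key);
        }

        void write(DataOutput out) throws IOException {
            out.writeInt(props.size());
            for (Map.Entry<String, String> e : props.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        void readFields(DataInput in) throws IOException {
            props.clear();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                props.put(key, value);
            }
        }
    }

    // Hypothetical wrapper: Serializable itself, delegating the payload to
    // the wrapped object's own write/readFields methods.
    static final class SerializableConf implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeConfiguration conf;

        SerializableConf(FakeConfiguration conf) {
            this.conf = conf;
        }

        FakeConfiguration value() {
            return conf;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            conf.write(out);
        }

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            conf = new FakeConfiguration();
            conf.readFields(in);
        }
    }

    // Serializes and deserializes the wrapper, as a checkpoint round trip would.
    static SerializableConf roundTrip(SerializableConf original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(original);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (SerializableConf) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeConfiguration conf = new FakeConfiguration();
        conf.set("output.dir", "/tmp/out"); // made-up key, for illustration only
        SerializableConf restored = roundTrip(new SerializableConf(conf));
        System.out.println(restored.value().get("output.dir"));
    }
}
```

A closure that captures the wrapper and calls {{value()}} at execution time would then carry only serializable state. Whether this is appropriate inside {{saveAsNewAPIHadoopFiles}} itself is a separate question.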
Here's a cobbled-together test that shows the problem:
{code}
test("recovery with save to HDFS stream") {
  // Set up the streaming context and input streams
  val testDir = Utils.createTempDir()
  val outDir = Utils.createTempDir()
  var ssc = new StreamingContext(master, framework, Seconds(1))
  ssc.checkpoint(checkpointDir)
  val fileStream = ssc.textFileStream(testDir.toString)
  for (i <- Seq(1, 2, 3)) {
    Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    // wait to make sure that the file is written such that it gets shown in the file listings
  }
  val reducedStream = fileStream.map(x => (x, x)).saveAsNewAPIHadoopFiles(
    outDir.toURI.toString,
    "saveAsNewAPIHadoopFilesTest",
    classOf[Text],
    classOf[Text],
    classOf[TextOutputFormat[Text,Text]],
    ssc.sparkContext.hadoopConfiguration)
  ssc.start()
  ssc.awaitTermination(5000)
  ssc.stop()
  val checkpointDirFile = new File(checkpointDir)
  assert(outDir.listFiles().length > 0)
  assert(checkpointDirFile.listFiles().length == 1)
  assert(checkpointDirFile.listFiles()(0).listFiles().length > 0)
  Utils.deleteRecursively(testDir)
  Utils.deleteRecursively(outDir)
}
{code}
You'll see the {{NotSerializableException}} clearly if you hack {{CheckpointWriter.write()}} to print the exception before rethrowing it:
{code}
def write(checkpoint: Checkpoint) {
  val bos = new ByteArrayOutputStream()
  val zos = compressionCodec.compressedOutputStream(bos)
  val oos = new ObjectOutputStream(zos)
  try {
    oos.writeObject(checkpoint)
  } catch {
    case e: Exception =>
      e.printStackTrace()
      throw e
  }
  ...
{code}
> Streaming + checkpointing + saveAsNewAPIHadoopFiles = NotSerializableException for Hadoop Configuration
> -------------------------------------------------------------------------------------------------------
>
> Key: SPARK-4196
> URL: https://issues.apache.org/jira/browse/SPARK-4196
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Sean Owen
>
> I am reasonably sure, though not 100% certain, that there is an issue here in Streaming and that I'm not missing something basic. I went ahead and posted it as a JIRA to track, since it has come up a few times before without resolution, and right now I can't get checkpointing to work at all.
> When Spark Streaming checkpointing is enabled, I see a NotSerializableException thrown for a Hadoop Configuration object, and it does not seem to be one from my user code.
> Before I post my particular instance, see http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3C1408135046777-12202.post@n3.nabble.com%3E for another occurrence.
> I was also at a customer site last week debugging an identical issue with checkpointing in a Scala-based program, and they also could not enable checkpointing without hitting exactly this error.
> The essence of my code is:
> {code}
> final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
> JavaStreamingContextFactory streamingContextFactory = new JavaStreamingContextFactory() {
>   @Override
>   public JavaStreamingContext create() {
>     return new JavaStreamingContext(sparkContext, new Duration(batchDurationMS));
>   }
> };
> streamingContext = JavaStreamingContext.getOrCreate(
>     checkpointDirString, sparkContext.hadoopConfiguration(),
>     streamingContextFactory, false);
> streamingContext.checkpoint(checkpointDirString);
> {code}
> It yields:
> {code}
> 2014-10-31 14:29:00,211 ERROR OneForOneStrategy:66
> org.apache.hadoop.conf.Configuration
> - field (class "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
> name: "conf$2", type: "class org.apache.hadoop.conf.Configuration")
> - object (class
> "org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9",
> <function2>)
> - field (class "org.apache.spark.streaming.dstream.ForEachDStream",
> name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc",
> type: "interface scala.Function2")
> - object (class "org.apache.spark.streaming.dstream.ForEachDStream",
> org.apache.spark.streaming.dstream.ForEachDStream@cb8016a)
> ...
> {code}
> This looks like it's due to PairDStreamFunctions, as this saveFunc seems to be org.apache.spark.streaming.dstream.PairDStreamFunctions$$anonfun$9:
> {code}
> def saveAsNewAPIHadoopFiles(
>     prefix: String,
>     suffix: String,
>     keyClass: Class[_],
>     valueClass: Class[_],
>     outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
>     conf: Configuration = new Configuration
>   ) {
>   val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
>     val file = rddToFileName(prefix, suffix, time)
>     rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass,
>       outputFormatClass, conf)
>   }
>   self.foreachRDD(saveFunc)
> }
> {code}
> Is capturing the Configuration in this closure not a problem? If it is, I don't know how this would ever work in Spark. Then again, I don't see why it would be an issue only when checkpointing is enabled.
> Long-shot, but I wonder if it is related to closure issues like https://issues.apache.org/jira/browse/SPARK-1866