Posted to commits@spark.apache.org by td...@apache.org on 2014/11/25 23:19:59 UTC

spark git commit: [SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles

Repository: spark
Updated Branches:
  refs/heads/master bf1a6aaac -> 8838ad7c1


[SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles

Solves two JIRAs in one shot (a usage sketch follows this list):
- Makes the ForeachDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints
- Makes the default configuration object used by saveAsNewAPIHadoopFiles be Spark's Hadoop configuration
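For context, here is a sketch of the user-facing pattern these changes are meant to support: a checkpointed streaming job calling saveAsNewAPIHadoopFiles without passing an explicit Configuration. It is illustrative only; the socket source, master setting, checkpoint directory, and output path are assumptions, not part of the patch.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val sparkConf = new SparkConf().setAppName("SaveFilesExample").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Checkpointing forces the DStream graph, including the ForeachDStream created by
// saveAsNewAPIHadoopFiles below, to be serialized; this is where the
// NotSerializableException fixed by this commit used to surface.
ssc.checkpoint("/tmp/streaming-checkpoint")

val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// No Configuration argument is passed, so the default now comes from
// ssc.sparkContext.hadoopConfiguration instead of a fresh, empty Configuration.
counts.saveAsNewAPIHadoopFiles(
  "hdfs:///tmp/wordcounts/prefix", "suffix",
  classOf[Text], classOf[IntWritable],
  classOf[NewTextOutputFormat[Text, IntWritable]])

ssc.start()
ssc.awaitTermination()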

Author: Tathagata Das <ta...@gmail.com>

Closes #3457 from tdas/savefiles-fix and squashes the following commits:

bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles
b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8838ad7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8838ad7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8838ad7c

Branch: refs/heads/master
Commit: 8838ad7c135a585cde015dc38b5cb23314502dd9
Parents: bf1a6aa
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Nov 25 14:16:27 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Nov 25 14:16:27 2014 -0800

----------------------------------------------------------------------
 .../dstream/PairDStreamFunctions.scala          | 30 ++++++-----
 .../spark/streaming/CheckpointSuite.scala       | 56 +++++++++++++++++++-
 2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8838ad7c/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 3f03f42..98539e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -17,20 +17,17 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.streaming.StreamingContext._
-
-import org.apache.spark.{Partitioner, HashPartitioner}
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.{Time, Duration}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+
+import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -671,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -702,11 +701,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = new Configuration
+      conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsNewAPIHadoopFile(
+        file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
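
A note on the SerializableWritable wrapping above: org.apache.hadoop.conf.Configuration (and hence JobConf) does not implement java.io.Serializable, so a save closure that captures one directly cannot be written when the DStream graph is checkpointed. The standalone sketch below is not part of the patch (the object name SerializableConfDemo is made up for illustration); it only shows the difference when serializing with a plain ObjectOutputStream.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

object SerializableConfDemo {

  // Serialize any object with plain Java serialization, the same mechanism
  // checkpointing uses for the DStream graph.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // serialize(conf) would throw NotSerializableException: Configuration is a
    // Hadoop Writable but not java.io.Serializable.
    val wrapped = new SerializableWritable(conf)
    // The wrapper is Serializable and writes the Configuration via its Writable
    // protocol, so this succeeds.
    println(serialize(wrapped).length + " bytes")
  }
}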

http://git-wip-us.apache.org/repos/asf/spark/blob/8838ad7c/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 77ff1ca..c97998a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -22,9 +22,14 @@ import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
@@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+  test("recovery with saveAsHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[TextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
+
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
@@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase {
     logInfo("Manual clock after advancing = " + clock.time)
     Thread.sleep(batchDuration.milliseconds)
 
-    val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+    val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
+      dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
+    }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
     outputStream.output.map(_.flatten)
   }
 }

