Posted to commits@spark.apache.org by jo...@apache.org on 2015/06/19 04:36:15 UTC

spark git commit: [SPARK-8135] Don't load defaults when reconstituting Hadoop Configurations

Repository: spark
Updated Branches:
  refs/heads/master dc4131389 -> 43f50decd


[SPARK-8135] Don't load defaults when reconstituting Hadoop Configurations

Author: Sandy Ryza <sa...@cloudera.com>

Closes #6679 from sryza/sandy-spark-8135 and squashes the following commits:

c5554ff [Sandy Ryza] SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration
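
The change itself is mechanical: wherever a Hadoop Configuration or JobConf is
reconstituted during deserialization, it is now constructed with
loadDefaults = false, so Hadoop does not re-parse core-default.xml,
core-site.xml, and friends for every deserialized copy -- the serialized
payload already carries every property that matters. A minimal sketch of the
constructor's effect (hypothetical property names, not from this patch):

    import org.apache.hadoop.conf.Configuration

    // new Configuration() parses the default XML resources from the
    // classpath -- wasted work when readFields() is about to populate
    // the properties anyway.
    val bare = new Configuration(false)        // skip default resources
    bare.set("example.key", "example.value")   // hypothetical key
    assert(bare.get("example.key") == "example.value")
    assert(bare.get("fs.defaultFS") == null)   // defaults never loaded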


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f50dec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f50dec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f50dec

Branch: refs/heads/master
Commit: 43f50decdd20fafc55913c56ffa30f56040090e4
Parents: dc41313
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Thu Jun 18 19:36:05 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Jun 18 19:36:05 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SerializableWritable.scala |  2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../org/apache/spark/SparkHadoopWriter.scala    |  3 +-
 .../spark/api/python/PythonHadoopUtil.scala     |  6 ++--
 .../org/apache/spark/api/python/PythonRDD.scala | 12 +++----
 .../org/apache/spark/rdd/CheckpointRDD.scala    | 11 +++---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  8 ++---
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  4 +--
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 ++--
 .../apache/spark/rdd/RDDCheckpointData.scala    |  3 +-
 .../spark/util/SerializableConfiguration.scala  | 36 +++++++++++++++++++
 .../apache/spark/util/SerializableJobConf.scala | 37 ++++++++++++++++++++
 .../sql/parquet/ParquetTableOperations.scala    |  5 +--
 .../apache/spark/sql/parquet/newParquet.scala   |  7 ++--
 .../spark/sql/sources/DataSourceStrategy.scala  |  8 ++---
 .../spark/sql/sources/SqlNewHadoopRDD.scala     |  4 +--
 .../org/apache/spark/sql/sources/commands.scala |  3 +-
 .../apache/spark/sql/sources/interfaces.scala   |  6 ++--
 .../org/apache/spark/sql/hive/TableReader.scala |  9 +++--
 .../hive/execution/InsertIntoHiveTable.scala    |  7 ++--
 .../spark/sql/hive/hiveWriterContainers.scala   |  3 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  5 +--
 .../streaming/dstream/FileInputDStream.scala    |  5 ++-
 .../dstream/PairDStreamFunctions.scala          |  7 ++--
 .../rdd/WriteAheadLogBackedBlockRDD.scala       |  5 ++-
 .../streaming/scheduler/ReceiverTracker.scala   |  9 +++--
 26 files changed, 146 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/SerializableWritable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index cb2cae1..beb2e27 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
+    ow.setConf(new Configuration(false))
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
   }
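
ObjectWritable only needs the Configuration to instantiate the wrapped
Writable class reflectively, so skipping the default resources is safe here.
A hedged round-trip sketch (illustrative driver-side code, not part of the
patch):

    import java.io._
    import org.apache.hadoop.io.Text
    import org.apache.spark.SerializableWritable

    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    oos.writeObject(new SerializableWritable(new Text("hello")))
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
    val restored = ois.readObject().asInstanceOf[SerializableWritable[Text]]
    assert(restored.value.toString == "hello")  // no defaults parsed on read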

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a453c9b..141276a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 59ac82c..f5dd36c 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf
 
 /**
  * Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   with Serializable {
 
   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  private val conf = new SerializableJobConf(jobConf)
 
   private var jobID = 0
   private var splitID = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index c9181a2..b959b68 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,8 +19,8 @@ package org.apache.spark.api.python
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55a37f8..dc9f62f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 import scala.util.control.NonFatal
 
@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
     val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a4715e3..33e6998 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -21,13 +21,12 @@ import java.io.IOException
 
 import scala.reflect.ClassTag
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -38,7 +37,7 @@ private[spark]
 class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {
 
-  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+  val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
 
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
@@ -87,7 +86,7 @@ private[spark] object CheckpointRDD extends Logging {
 
   def writeToFile[T: ClassTag](
       path: String,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      broadcastedConf: Broadcast[SerializableConfiguration],
       blockSize: Int = -1
     )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
@@ -135,7 +134,7 @@ private[spark] object CheckpointRDD extends Logging {
 
   def readFromFile[T](
       path: Path,
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      broadcastedConf: Broadcast[SerializableConfiguration],
       context: TaskContext
     ): Iterator[T] = {
     val env = SparkEnv.get
@@ -164,7 +163,7 @@ private[spark] object CheckpointRDD extends Logging {
     val path = new Path(hdfsPath, "temp")
     val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
     val fs = path.getFileSystem(conf)
-    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+    val broadcastedConf = sc.broadcast(new SerializableConfiguration(conf))
     sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cefe63..bee59a4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 import org.apache.spark.storage.StorageLevel
 
@@ -100,7 +100,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 @DeveloperApi
 class HadoopRDD[K, V](
     @transient sc: SparkContext,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    broadcastedConf: Broadcast[SerializableConfiguration],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
@@ -121,8 +121,8 @@ class HadoopRDD[K, V](
       minPartitions: Int) = {
     this(
       sc,
-      sc.broadcast(new SerializableWritable(conf))
-        .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      sc.broadcast(new SerializableConfiguration(conf))
+        .asInstanceOf[Broadcast[SerializableConfiguration]],
       None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 84456d6..f827270 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel
 
@@ -74,7 +74,7 @@ class NewHadoopRDD[K, V](
   with Logging {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+  private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
   private val jobTrackerId: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index cfd3e26..91a6a2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -44,7 +44,7 @@ import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.CompactBuffer
 import org.apache.spark.util.random.StratifiedSamplingUtils
 
@@ -1002,7 +1002,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val wrappedConf = new SerializableConfiguration(job.getConfiguration)
     val outfmt = job.getOutputFormatClass
     val jobFormat = outfmt.newInstance
 
@@ -1065,7 +1065,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
-    val wrappedConf = new SerializableWritable(hadoopConf)
+    val wrappedConf = new SerializableConfiguration(hadoopConf)
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
     val valueClass = hadoopConf.getOutputValueClass

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 1722c27..acbd31a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * Enumeration to manage state transitions of an RDD through checkpointing
@@ -91,7 +92,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
 
     // Save to file, and reload it as an RDD
     val broadcastedConf = rdd.context.broadcast(
-      new SerializableWritable(rdd.context.hadoopConfiguration))
+      new SerializableConfiguration(rdd.context.hadoopConfiguration))
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
     if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
       rdd.context.cleaner.foreach { cleaner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
new file mode 100644
index 0000000..30bcf1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.Utils
+
+private[spark]
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+    out.defaultWriteObject()
+    value.write(out)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+    value = new Configuration(false)
+    value.readFields(in)
+  }
+}
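
The new wrapper writes only the Configuration's own properties via its
Writable write()/readFields() methods and rebuilds it with
loadDefaults = false on the receiving side. A sketch of the intended usage,
mirroring the broadcast pattern elsewhere in this patch (illustrative only;
the class is private[spark], so this pattern is only usable from Spark's own
code):

    import org.apache.spark.SparkContext
    import org.apache.spark.util.SerializableConfiguration

    def broadcastConfExample(sc: SparkContext): Unit = {
      // A Configuration can be ~10 KB, so broadcast it once per job
      // instead of shipping a copy inside every task closure.
      val confBc =
        sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
      sc.parallelize(1 to 4).foreach { _ =>
        // On the executor, readObject() has already rebuilt the
        // Configuration with loadDefaults = false.
        val conf = confBc.value.value
        println(conf.get("fs.defaultFS"))
      }
    }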

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
new file mode 100644
index 0000000..afbcc6e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.mapred.JobConf
+
+import org.apache.spark.util.Utils
+
+private[spark]
+class SerializableJobConf(@transient var value: JobConf) extends Serializable {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+    out.defaultWriteObject()
+    value.write(out)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+    value = new JobConf(false)
+    value.readFields(in)
+  }
+}
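
SerializableJobConf is the JobConf twin of the class above; the only
difference is the JobConf(loadDefaults: Boolean) constructor used in
readObject. A hedged round-trip sketch (hypothetical property; like the
Configuration wrapper, the class is private[spark]):

    import java.io._
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.util.SerializableJobConf

    val jobConf = new JobConf()
    jobConf.set("example.output.dir", "/tmp/out")   // hypothetical key

    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    oos.writeObject(new SerializableJobConf(jobConf))
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
    val restored = ois.readObject().asInstanceOf[SerializableJobConf]
    // Explicitly set properties survive; defaults were not re-loaded.
    assert(restored.value.get("example.output.dir") == "/tmp/out")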

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 65ecad9..b30fc17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,8 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * :: DeveloperApi ::
@@ -329,7 +330,7 @@ private[sql] case class InsertIntoParquetTable(
     job.setOutputKeyClass(keyType)
     job.setOutputValueClass(classOf[InternalRow])
     NewFileOutputFormat.setOutputPath(job, new Path(path))
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    val wrappedConf = new SerializableConfiguration(job.getConfiguration)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = sqlContext.sparkContext.newRddId()

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 4c702c3..c9de45e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
 import scala.util.Try
 
 import com.google.common.base.Objects
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -42,8 +41,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException, Partition => SparkPartition}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -258,7 +257,7 @@ private[sql] class ParquetRelation2(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     // Create the function to set variable Parquet confs at both driver and executor side.

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 4cf6743..a8f56f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources
 
+import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql._
@@ -27,9 +28,8 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -91,7 +91,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
       val confBroadcast =
-        t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+        t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
       pruneFilterProject(
         l,
         projects,
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
     val sharedHadoopConf = SparkHadoopUtil.get.conf
     val confBroadcast =
-      relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+      relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
 
     // Builds RDD[Row]s for each selected partition.
     val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index ebad0c1..2bdc341 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.{RDD, HadoopRDD}
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 import scala.reflect.ClassTag
 
@@ -65,7 +65,7 @@ private[spark] class SqlNewHadoopPartition(
  */
 private[sql] class SqlNewHadoopRDD[K, V](
     @transient sc : SparkContext,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    broadcastedConf: Broadcast[SerializableConfiguration],
     @transient initDriverSideJobFuncOpt: Option[Job => Unit],
     initLocalJobFuncOpt: Option[Job => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d39a20b..c16bd9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.util.SerializableConfiguration
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -260,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
   with Logging
   with Serializable {
 
-  protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))
+  protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
 
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 43d3507..7005c70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SerializableWritable
 import org.apache.spark.sql.execution.RDDConversions
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * ::DeveloperApi::
@@ -518,7 +518,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
     val inputStatuses = inputPaths.flatMap { input =>
       val path = new Path(input)
 
@@ -648,7 +648,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
     buildScan(requiredColumns, filters, inputFiles)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 4858103..439f39b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -30,12 +29,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark.{Logging}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * A trait for subclasses that handle table scans.
@@ -72,7 +71,7 @@ class HadoopTableReader(
   // TODO: set aws s3 credentials.
 
   private val _broadcastedHiveConf =
-    sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf))
+    sc.sparkContext.broadcast(new SerializableConfiguration(hiveExtraConf))
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
@@ -276,7 +275,7 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sc.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
       inputFormatClass,
       classOf[Writable],

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 1d306c5..404bb93 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -35,9 +35,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
 import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
-import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext}
 
 import scala.collection.JavaConversions._
+import org.apache.spark.util.SerializableJobConf
 
 private[hive]
 case class InsertIntoHiveTable(
@@ -64,7 +65,7 @@ case class InsertIntoHiveTable(
       rdd: RDD[InternalRow],
       valueClass: Class[_],
       fileSinkConf: FileSinkDesc,
-      conf: SerializableWritable[JobConf],
+      conf: SerializableJobConf,
       writerContainer: SparkHiveWriterContainer): Unit = {
     assert(valueClass != null, "Output value class not set")
     conf.value.setOutputValueClass(valueClass)
@@ -172,7 +173,7 @@ case class InsertIntoHiveTable(
     }
 
     val jobConf = new JobConf(sc.hiveconf)
-    val jobConfSer = new SerializableWritable(jobConf)
+    val jobConfSer = new SerializableJobConf(jobConf)
 
     val writerContainer = if (numDynamicPartitions > 0) {
       val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ee440e3..0bc69c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -37,6 +37,7 @@ import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableJobConf
 
 /**
  * Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -57,7 +58,7 @@ private[hive] class SparkHiveWriterContainer(
     PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
     Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
   }
-  protected val conf = new SerializableWritable(jobConf)
+  protected val conf = new SerializableJobConf(jobConf)
 
   private var jobID = 0
   private var splitID = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index f03c4cd..77f1ca9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark.{Logging}
+import org.apache.spark.util.SerializableConfiguration
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -283,7 +284,7 @@ private[orc] case class OrcTableScan(
       classOf[Writable]
     ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
 
-    val wrappedConf = new SerializableWritable(conf)
+    val wrappedConf = new SerializableConfiguration(conf)
 
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6c1fab5..86a8e2b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,10 +26,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
-import org.apache.spark.{SparkConf, SerializableWritable}
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
 
 /**
  * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -78,7 +77,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
-  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+  private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
 
   /**
    * Minimum duration of remembering the information of selected files. Defaults to 60 seconds.

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c6..71bec96 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -24,10 +24,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
-import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.StreamingContext.rddToFileName
+import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf}
 
 /**
  * Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -688,7 +689,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
       conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ): Unit = ssc.withScope {
     // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
-    val serializableConf = new SerializableWritable(conf)
+    val serializableConf = new SerializableJobConf(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
@@ -721,7 +722,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
       conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ): Unit = ssc.withScope {
     // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
-    val serializableConf = new SerializableWritable(conf)
+    val serializableConf = new SerializableConfiguration(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsNewAPIHadoopFile(

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ffce6a4..31ce8e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -23,12 +23,11 @@ import java.util.UUID
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util._
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -94,7 +93,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
 
   // Hadoop configuration is not serializable, so broadcast it as a serializable.
   @transient private val hadoopConfig = sc.hadoopConfiguration
-  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
+  private val broadcastedHadoopConf = new SerializableConfiguration(hadoopConfig)
 
   override def isValid(): Boolean = true
 

http://git-wip-us.apache.org/repos/asf/spark/blob/43f50dec/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f1504b0..e6cdbec 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,10 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap}
 import scala.language.existentials
 
 import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.rpc._
 import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
+import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
+  StopReceiver}
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * Messages used by the NetworkReceiver and the ReceiverTracker to communicate
@@ -294,7 +296,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         }
 
       val checkpointDirOption = Option(ssc.checkpointDir)
-      val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
+      val serializableHadoopConf =
+        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
 
       // Function to start the receiver on the worker node
       val startReceiver = (iterator: Iterator[Receiver[_]]) => {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org