You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/07/30 22:19:27 UTC

[2/2] git commit: [SPARK-2024] Add saveAsSequenceFile to PySpark

[SPARK-2024] Add saveAsSequenceFile to PySpark

JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a followup to #455 and adds capabilities for saving PySpark RDDs using SequenceFile or any Hadoop OutputFormats.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both old and new MapReduce APIs.

* Default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.

* No out-of-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter will convert any custom ArrayWritable subtypes to ```Object[]``` and they get pickled to Python tuples.

* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <kz...@apache.org>

Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94d1f46f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94d1f46f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94d1f46f

Branch: refs/heads/master
Commit: 94d1f46fc43c0cb85125f757fb40db9271caf1f4
Parents: 437dc8c
Author: Kan Zhang <kz...@apache.org>
Authored: Wed Jul 30 13:19:05 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Wed Jul 30 13:19:05 2014 -0700

----------------------------------------------------------------------
 .../spark/api/python/PythonHadoopUtil.scala     |  82 ++++-
 .../org/apache/spark/api/python/PythonRDD.scala | 247 +++++++++++----
 .../org/apache/spark/api/python/SerDeUtil.scala |  61 +++-
 .../WriteInputFormatTestDataGenerator.scala     |  69 +++-
 docs/programming-guide.md                       |  52 ++-
 .../src/main/python/cassandra_outputformat.py   |  83 +++++
 examples/src/main/python/hbase_inputformat.py   |   3 +-
 examples/src/main/python/hbase_outputformat.py  |  65 ++++
 .../pythonconverters/CassandraConverters.scala  |  24 +-
 .../pythonconverters/HBaseConverter.scala       |  33 --
 .../pythonconverters/HBaseConverters.scala      |  70 ++++
 python/pyspark/context.py                       |  51 ++-
 python/pyspark/rdd.py                           | 114 +++++++
 python/pyspark/tests.py                         | 317 ++++++++++++++++++-
 14 files changed, 1085 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index adaa1ef..f3b05e1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.python
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}
@@ -31,13 +32,14 @@ import org.apache.spark.annotation.Experimental
  * transformation code by overriding the convert method.
  */
 @Experimental
-trait Converter[T, U] extends Serializable {
+trait Converter[T, + U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
+  def getInstance(converterClass: Option[String],
+                  defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
         val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
@@ -49,7 +51,7 @@ private[python] object Converter extends Logging {
           logError(s"Failed to load converter: $cc")
           throw err
       }
-    }.getOrElse { new DefaultConverter }
+    }.getOrElse { defaultConverter }
   }
 }
 
@@ -57,7 +59,9 @@ private[python] object Converter extends Logging {
  * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
  * Other objects are passed through without conversion.
  */
-private[python] class DefaultConverter extends Converter[Any, Any] {
+private[python] class WritableToJavaConverter(
+    conf: Broadcast[SerializableWritable[Configuration]],
+    batchSize: Int) extends Converter[Any, Any] {
 
   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
@@ -72,17 +76,30 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
       case fw: FloatWritable => fw.get()
       case t: Text => t.toString
       case bw: BooleanWritable => bw.get()
-      case byw: BytesWritable => byw.getBytes
+      case byw: BytesWritable =>
+        val bytes = new Array[Byte](byw.getLength)
+        System.arraycopy(byw.getBytes(), 0, bytes, 0, byw.getLength)
+        bytes
       case n: NullWritable => null
-      case aw: ArrayWritable => aw.get().map(convertWritable(_))
-      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
-        (convertWritable(k), convertWritable(v))
-      }.toMap)
+      case aw: ArrayWritable =>
+        // Due to erasure, all arrays appear as Object[] and they get pickled to Python tuples.
+        // Since we can't determine element types for empty arrays, we will not attempt to
+        // convert to primitive arrays (which get pickled to Python arrays). Users may want
+        // to write custom converters for arrays if they know the element types a priori.
+        aw.get().map(convertWritable(_))
+      case mw: MapWritable =>
+        val map = new java.util.HashMap[Any, Any]()
+        mw.foreach { case (k, v) =>
+          map.put(convertWritable(k), convertWritable(v))
+        }
+        map
+      case w: Writable =>
+        if (batchSize > 1) WritableUtils.clone(w, conf.value.value) else w
       case other => other
     }
   }
 
-  def convert(obj: Any): Any = {
+  override def convert(obj: Any): Any = {
     obj match {
       case writable: Writable =>
         convertWritable(writable)
@@ -92,6 +109,47 @@ private[python] class DefaultConverter extends Converter[Any, Any] {
   }
 }
 
+/**
+ * A converter that converts common types to [[org.apache.hadoop.io.Writable]]. Note that array
+ * types are not supported since the user needs to subclass [[org.apache.hadoop.io.ArrayWritable]]
+ * to set the type properly. See [[org.apache.spark.api.python.DoubleArrayWritable]] and
+ * [[org.apache.spark.api.python.DoubleArrayToWritableConverter]] for an example. They are used in
+ * PySpark RDD `saveAsNewAPIHadoopFile` doctest.
+ */
+private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
+
+  /**
+   * Converts common data types to [[org.apache.hadoop.io.Writable]]. Note that array types are not
+   * supported out-of-the-box.
+   */
+  private def convertToWritable(obj: Any): Writable = {
+    import collection.JavaConversions._
+    obj match {
+      case i: java.lang.Integer => new IntWritable(i)
+      case d: java.lang.Double => new DoubleWritable(d)
+      case l: java.lang.Long => new LongWritable(l)
+      case f: java.lang.Float => new FloatWritable(f)
+      case s: java.lang.String => new Text(s)
+      case b: java.lang.Boolean => new BooleanWritable(b)
+      case aob: Array[Byte] => new BytesWritable(aob)
+      case null => NullWritable.get()
+      case map: java.util.Map[_, _] =>
+        val mapWritable = new MapWritable()
+        map.foreach { case (k, v) =>
+          mapWritable.put(convertToWritable(k), convertToWritable(v))
+        }
+        mapWritable
+      case other => throw new SparkException(
+        s"Data of type ${other.getClass.getName} cannot be used")
+    }
+  }
+
+  override def convert(obj: Any): Writable = obj match {
+    case writable: Writable => writable
+    case other => convertToWritable(other)
+  }
+}
+
 /** Utilities for working with Python objects <-> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
@@ -118,7 +176,7 @@ private[python] object PythonHadoopUtil {
 
   /**
    * Converts an RDD of key-value pairs, where key and/or value could be instances of
-   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
    */
   def convertRDD[K, V](rdd: RDD[(K, V)],
                        keyConverter: Converter[Any, Any],

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f551a59..a9d758b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,15 +23,18 @@ import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
+import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.{InputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
 import org.apache.spark._
+import org.apache.spark.SparkContext._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -365,19 +368,17 @@ private[spark] object PythonRDD extends Logging {
       valueClassMaybeNull: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      minSplits: Int) = {
+      minSplits: Int,
+      batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -394,17 +395,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
-    val baseConf = sc.hadoopConfiguration()
-    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
+    val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -421,15 +421,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -439,18 +440,14 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
-    val rdd = if (path.isDefined) {
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
       sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
     }
-    rdd
   }
 
   /**
@@ -467,17 +464,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
-    val baseConf = sc.hadoopConfiguration()
-    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
+    val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   /**
@@ -494,15 +490,16 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String],
+      batchSize: Int) = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val keyConverter = Converter.getInstance(Option(keyConverterClass))
-    val valueConverter = Converter.getInstance(Option(valueConverterClass))
-    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
-    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new WritableToJavaConverter(confBroadcasted, batchSize))
+    JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
   }
 
   private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
@@ -512,18 +509,14 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
-    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
-    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
-    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
-    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
-    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
-    val rdd = if (path.isDefined) {
+    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
+    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
       sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
     }
-    rdd
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
@@ -562,6 +555,152 @@ private[spark] object PythonRDD extends Logging {
     }
   }
 
+  private def getMergedConf(confAsMap: java.util.HashMap[String, String],
+      baseConf: Configuration): Configuration = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    PythonHadoopUtil.mergeConfs(baseConf, conf)
+  }
+
+  private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
+      valueConverterClass: String = null): (Class[_], Class[_]) = {
+    // Peek at an element to figure out key/value types. Since Writables are not serializable,
+    // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
+    // and then convert locally.
+    val (key, value) = rdd.first()
+    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    (kc.convert(key).getClass, vc.convert(value).getClass)
+  }
+
+  private def getKeyValueTypes(keyClass: String, valueClass: String):
+      Option[(Class[_], Class[_])] = {
+    for {
+      k <- Option(keyClass)
+      v <- Option(valueClass)
+    } yield (Class.forName(k), Class.forName(v))
+  }
+
+  private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
+      defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
+    val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
+    val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+    (keyConverter, valueConverter)
+  }
+
+  /**
+   * Convert an RDD of key-value pairs from internal types to serializable types suitable for
+   * output, or vice versa.
+   */
+  private def convertRDD[K, V](rdd: RDD[(K, V)],
+                               keyConverterClass: String,
+                               valueConverterClass: String,
+                               defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+      defaultConverter)
+    PythonHadoopUtil.convertRDD(rdd, kc, vc)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs as a Hadoop SequenceFile using the Writable types
+   * we convert from the RDD's key and value types. Note that keys and values can't be
+   * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
+   * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
+   */
+  def saveAsSequenceFile[K, V, C <: CompressionCodec](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      compressionCodecClass: String) = {
+    saveAsHadoopFile(
+      pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
+      null, null, null, null, new java.util.HashMap(), compressionCodecClass)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using old Hadoop
+   * `OutputFormat` in mapred package. Keys and values are converted to suitable output
+   * types using either user specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+   * this RDD.
+   */
+  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String],
+      compressionCodecClass: String) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using new Hadoop
+   * `OutputFormat` in mapreduce package. Keys and values are converted to suitable output
+   * types using either user specified converters or, if not specified,
+   * [[org.apache.spark.api.python.JavaToWritableConverter]]. Post-conversion types
+   * `keyClass` and `valueClass` are automatically inferred if not specified. The passed-in
+   * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
+   * this RDD.
+   */
+  def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      path: String,
+      outputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
+    val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
+      inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
+    val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
+    val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
+      new JavaToWritableConverter)
+    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
+  }
+
+  /**
+   * Output a Python RDD of key-value pairs to any Hadoop file system, using a Hadoop conf
+   * converted from the passed-in `confAsMap`. The conf should set relevant output params (
+   * e.g., output path, output format, etc), in the same way as it would be configured for
+   * a Hadoop MapReduce job. Both old and new Hadoop OutputFormat APIs are supported
+   * (mapred vs. mapreduce). Keys/values are converted for output using either user specified
+   * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
+   */
+  def saveAsHadoopDataset[K, V](
+      pyRDD: JavaRDD[Array[Byte]],
+      batchSerialized: Boolean,
+      confAsMap: java.util.HashMap[String, String],
+      keyConverterClass: String,
+      valueConverterClass: String,
+      useNewAPI: Boolean) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
+      keyConverterClass, valueConverterClass, new JavaToWritableConverter)
+    if (useNewAPI) {
+      converted.saveAsNewAPIHadoopDataset(conf)
+    } else {
+      converted.saveAsHadoopDataset(new JobConf(conf))
+    }
+  }
+
   /**
    * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by
    * PySpark.

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 9a012e7..efc9009 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.api.python
 
-import scala.util.Try
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
-import scala.util.Success
+import scala.collection.JavaConversions._
 import scala.util.Failure
-import net.razorvine.pickle.Pickler
+import scala.util.Try
 
+import net.razorvine.pickle.{Unpickler, Pickler}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[python] object SerDeUtil extends Logging {
@@ -65,20 +66,52 @@ private[python] object SerDeUtil extends Logging {
    * by PySpark. By default, if serialization fails, toString is called and the string
    * representation is serialized
    */
-  def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = {
+  def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
     val (keyFailed, valueFailed) = checkPickle(rdd.first())
     rdd.mapPartitions { iter =>
       val pickle = new Pickler
-      iter.map { case (k, v) =>
-        if (keyFailed && valueFailed) {
-          pickle.dumps(Array(k.toString, v.toString))
-        } else if (keyFailed) {
-          pickle.dumps(Array(k.toString, v))
-        } else if (!keyFailed && valueFailed) {
-          pickle.dumps(Array(k, v.toString))
+      val cleaned = iter.map { case (k, v) =>
+        val key = if (keyFailed) k.toString else k
+        val value = if (valueFailed) v.toString else v
+        Array[Any](key, value)
+      }
+      if (batchSize > 1) {
+        cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched)))
+      } else {
+        cleaned.map(pickle.dumps(_))
+      }
+    }
+  }
+
+  /**
+   * Convert an RDD of serialized Python tuple (K, V) to RDD[(K, V)].
+   */
+  def pythonToPairRDD[K, V](pyRDD: RDD[Array[Byte]], batchSerialized: Boolean): RDD[(K, V)] = {
+    def isPair(obj: Any): Boolean = {
+      Option(obj.getClass.getComponentType).map(!_.isPrimitive).getOrElse(false) &&
+        obj.asInstanceOf[Array[_]].length == 2
+    }
+    pyRDD.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      val unpickled =
+        if (batchSerialized) {
+          iter.flatMap { batch =>
+            unpickle.loads(batch) match {
+              case objs: java.util.List[_] => collectionAsScalaIterable(objs)
+              case other => throw new SparkException(
+                s"Unexpected type ${other.getClass.getName} for batch serialized Python RDD")
+            }
+          }
         } else {
-          pickle.dumps(Array(k, v))
+          iter.map(unpickle.loads(_))
         }
+      unpickled.map {
+        case obj if isPair(obj) =>
+          // we only accept (K, V)
+          val arr = obj.asInstanceOf[Array[_]]
+          (arr.head.asInstanceOf[K], arr.last.asInstanceOf[V])
+        case other => throw new SparkException(
+          s"RDD element of type ${other.getClass.getName} cannot be used")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index f0e3fb9..d11db97 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.api.python
 
-import org.apache.spark.SparkContext
-import org.apache.hadoop.io._
-import scala.Array
 import java.io.{DataOutput, DataInput}
+import java.nio.charset.Charset
+
+import org.apache.hadoop.io._
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.{SparkContext, SparkException}
 
 /**
- * A class to test MsgPack serialization on the Scala side, that will be deserialized
+ * A class to test Pyrolite serialization on the Scala side, that will be deserialized
  * in Python
  * @param str
  * @param int
@@ -54,7 +55,13 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
   }
 }
 
-class TestConverter extends Converter[Any, Any] {
+private[python] class TestInputKeyConverter extends Converter[Any, Any] {
+  override def convert(obj: Any) = {
+    obj.asInstanceOf[IntWritable].get().toChar
+  }
+}
+
+private[python] class TestInputValueConverter extends Converter[Any, Any] {
   import collection.JavaConversions._
   override def convert(obj: Any) = {
     val m = obj.asInstanceOf[MapWritable]
@@ -62,6 +69,38 @@ class TestConverter extends Converter[Any, Any] {
   }
 }
 
+private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
+  override def convert(obj: Any) = {
+    new Text(obj.asInstanceOf[Int].toString)
+  }
+}
+
+private[python] class TestOutputValueConverter extends Converter[Any, Any] {
+  import collection.JavaConversions._
+  override def convert(obj: Any) = {
+    new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
+  }
+}
+
+private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])
+
+private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
+  override def convert(obj: Any) = obj match {
+    case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] =>
+      val daw = new DoubleArrayWritable
+      daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
+      daw
+    case x => throw new SparkException(s"Data of type ${x.getClass.getName} is not supported")
+  }
+}
+
+private[python] class WritableToDoubleArrayConverter extends Converter[Any, Array[Double]] {
+  override def convert(obj: Any): Array[Double] = obj match {
+    case daw: DoubleArrayWritable => daw.get().map(_.asInstanceOf[DoubleWritable].get())
+    case x => throw new SparkException(s"Data of type ${x.getClass.getName} is not supported")
+  }
+}
+
 /**
  * This object contains method to generate SequenceFile test data and write it to a
  * given directory (probably a temp directory)
@@ -97,7 +136,8 @@ object WriteInputFormatTestDataGenerator {
     sc.parallelize(intKeys).saveAsSequenceFile(intPath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
     sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)
-    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath)
+    sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(Charset.forName("UTF-8"))) }
+      ).saveAsSequenceFile(bytesPath)
     val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false))
     sc.parallelize(bools).saveAsSequenceFile(boolPath)
     sc.parallelize(intKeys).map{ case (k, v) =>
@@ -106,19 +146,20 @@ object WriteInputFormatTestDataGenerator {
 
     // Create test data for ArrayWritable
     val data = Seq(
-      (1, Array(1.0, 2.0, 3.0)),
+      (1, Array()),
       (2, Array(3.0, 4.0, 5.0)),
       (3, Array(4.0, 5.0, 6.0))
     )
     sc.parallelize(data, numSlices = 2)
       .map{ case (k, v) =>
-      (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
-    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+        val va = new DoubleArrayWritable
+        va.set(v.map(new DoubleWritable(_)))
+        (new IntWritable(k), va)
+    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, DoubleArrayWritable]](arrPath)
 
     // Create test data for MapWritable, with keys DoubleWritable and values Text
     val mapData = Seq(
-      (1, Map(2.0 -> "aa")),
-      (2, Map(3.0 -> "bb")),
+      (1, Map()),
       (2, Map(1.0 -> "cc")),
       (3, Map(2.0 -> "dd")),
       (2, Map(1.0 -> "aa")),
@@ -126,9 +167,9 @@ object WriteInputFormatTestDataGenerator {
     )
     sc.parallelize(mapData, numSlices = 2).map{ case (i, m) =>
       val mw = new MapWritable()
-      val k = m.keys.head
-      val v = m.values.head
-      mw.put(new DoubleWritable(k), new Text(v))
+      m.foreach { case (k, v) =>
+        mw.put(new DoubleWritable(k), new Text(v))
+      }
       (new IntWritable(i), mw)
     }.saveAsSequenceFile(mapPath)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/docs/programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 90c6971..a88bf27 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -383,16 +383,16 @@ Apart from text files, Spark's Python API also supports several other data forma
 
 * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
 
-* Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below.
-
-### SequenceFile and Hadoop InputFormats
+* SequenceFile and Hadoop Input/Output Formats
 
 **Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach.
 
-#### Writable Support
+**Writable Support**
 
-PySpark SequenceFile support loads an RDD within Java, and pickles the resulting Java objects using
-[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables are automatically converted:
+PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the 
+resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile, 
+PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following 
+Writables are automatically converted:
 
 <table class="table">
 <tr><th>Writable Type</th><th>Python Type</th></tr>
@@ -403,32 +403,30 @@ PySpark SequenceFile support loads an RDD within Java, and pickles the resulting
 <tr><td>BooleanWritable</td><td>bool</td></tr>
 <tr><td>BytesWritable</td><td>bytearray</td></tr>
 <tr><td>NullWritable</td><td>None</td></tr>
-<tr><td>ArrayWritable</td><td>list of primitives, or tuple of objects</td></tr>
 <tr><td>MapWritable</td><td>dict</td></tr>
-<tr><td>Custom Class conforming to Java Bean conventions</td>
-    <td>dict of public properties (via JavaBean getters and setters) + __class__ for the class type</td></tr>
 </table>
 
-#### Loading SequenceFiles
+Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing, 
+users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default 
+converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get 
+Python `array.array` for arrays of primitive types, users need to specify custom converters.
+
+**Saving and Loading SequenceFiles**
 
-Similarly to text files, SequenceFiles can be loaded by specifying the path. The key and value
+Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value
 classes can be specified, but for standard Writables this is not required.
 
 {% highlight python %}
->>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles")
->>> rdd.collect()         # this example has DoubleWritable keys and Text values
-[(1.0, u'aa'),
- (2.0, u'bb'),
- (2.0, u'aa'),
- (3.0, u'cc'),
- (2.0, u'bb'),
- (1.0, u'aa')]
+>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
+>>> rdd.saveAsSequenceFile("path/to/file")
+>>> sorted(sc.sequenceFile("path/to/file").collect())
+[(1, u'a'), (2, u'aa'), (3, u'aaa')]
 {% endhighlight %}
 
-#### Loading Other Hadoop InputFormats
+**Saving and Loading Other Hadoop Input/Output Formats**
 
-PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' Hadoop APIs. If required,
-a Hadoop configuration can be passed in as a Python dict. Here is an example using the
+PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs. 
+If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the
 Elasticsearch ESInputFormat:
 
 {% highlight python %}
@@ -447,8 +445,7 @@ Note that, if the InputFormat simply depends on a Hadoop configuration and/or in
 the key and value classes can easily be converted according to the above table,
 then this approach should work well for such cases.
 
-If you have custom serialized binary data (such as loading data from Cassandra / HBase) or custom
-classes that don't conform to the JavaBean requirements, then you will first need to 
+If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to 
 transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler.
 A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided 
 for this. Simply extend this trait and implement your transformation code in the ```convert``` 
@@ -456,11 +453,8 @@ method. Remember to ensure that this class, along with any dependencies required
 classpath.
 
 See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and 
-the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/pythonconverters) 
-for examples of using HBase and Cassandra ```InputFormat```.
-
-Future support for writing data out as ```SequenceFileOutputFormat``` and other ```OutputFormats```, 
-is forthcoming.
+the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters) 
+for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters.
 
 </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/python/cassandra_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
new file mode 100644
index 0000000..1dfbf98
--- /dev/null
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create data in Cassandra first
+(following: https://wiki.apache.org/cassandra/GettingStarted)
+
+cqlsh> CREATE KEYSPACE test
+   ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+cqlsh> use test;
+cqlsh:test> CREATE TABLE users (
+        ...   user_id int PRIMARY KEY,
+        ...   fname text,
+        ...   lname text
+        ... );
+
+> cassandra_outputformat <host> test users 1745 john smith
+> cassandra_outputformat <host> test users 1744 john doe
+> cassandra_outputformat <host> test users 1746 john smith
+
+cqlsh:test> SELECT * FROM users;
+
+ user_id | fname | lname
+---------+-------+-------
+    1745 |  john | smith
+    1744 |  john |   doe
+    1746 |  john | smith
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 7:
+        print >> sys.stderr, """
+        Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py <args>
+        Assumes you have created the following table <cf> in Cassandra already,
+        running on <host>, in <keyspace>.
+
+        cqlsh:<keyspace>> CREATE TABLE <cf> (
+           ...   user_id int PRIMARY KEY,
+           ...   fname text,
+           ...   lname text
+           ... );
+        """
+        sys.exit(-1)  # not the builtin exit(): that one is only installed by the site module
+
+    host = sys.argv[1]
+    keyspace = sys.argv[2]
+    cf = sys.argv[3]
+    sc = SparkContext(appName="CassandraOutputFormat")
+
+    conf = {"cassandra.output.thrift.address":host,
+            "cassandra.output.thrift.port":"9160",
+            "cassandra.output.keyspace":keyspace,
+            "cassandra.output.partitioner.class":"Murmur3Partitioner",
+            "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
+            "mapreduce.output.basename":cf,
+            "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+            "mapreduce.job.output.key.class":"java.util.Map",
+            "mapreduce.job.output.value.class":"java.util.List"}
+    key = {"user_id" : int(sys.argv[4])}
+    sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
+        conf=conf,
+        keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/python/hbase_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3289d98..c9fa8e1 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -65,7 +65,8 @@ if __name__ == "__main__":
         "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
         "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
         "org.apache.hadoop.hbase.client.Result",
-        valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+        keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
         conf=conf)
     output = hbase_rdd.collect()
     for (k, v) in output:

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/python/hbase_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
new file mode 100644
index 0000000..5e11548
--- /dev/null
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create test table in HBase first:
+
+hbase(main):001:0> create 'test', 'f1'
+0 row(s) in 0.7840 seconds
+
+> hbase_outputformat <host> test row1 f1 q1 value1
+> hbase_outputformat <host> test row2 f1 q1 value2
+> hbase_outputformat <host> test row3 f1 q1 value3
+> hbase_outputformat <host> test row4 f1 q1 value4
+
+hbase(main):002:0> scan 'test'
+ROW                   COLUMN+CELL
+ row1                 column=f1:q1, timestamp=1405659615726, value=value1
+ row2                 column=f1:q1, timestamp=1405659626803, value=value2
+ row3                 column=f1:q1, timestamp=1405659640106, value=value3
+ row4                 column=f1:q1, timestamp=1405659650292, value=value4
+4 row(s) in 0.0780 seconds
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 7:
+        print >> sys.stderr, """
+        Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py <args>
+        Assumes you have created <table> with column family <family> in HBase running on <host> already
+        """
+        sys.exit(-1)  # not the builtin exit(): that one is only installed by the site module
+
+    host = sys.argv[1]
+    table = sys.argv[2]
+    sc = SparkContext(appName="HBaseOutputFormat")
+
+    conf = {"hbase.zookeeper.quorum": host,
+            "hbase.mapred.outputtable": table,
+            "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+            "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+            "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"}
+
+    sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
+        conf=conf,
+        keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
index 29a65c7..83feb57 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.pythonconverters
 import org.apache.spark.api.python.Converter
 import java.nio.ByteBuffer
 import org.apache.cassandra.utils.ByteBufferUtil
-import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+import collection.JavaConversions._
 
 
 /**
@@ -44,3 +44,25 @@ class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, St
     mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
   }
 }
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * Map[String, Int] to Cassandra key
+ */
+class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] {
+  override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
+    val input = obj.asInstanceOf[java.util.Map[String, Int]]
+    mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * List[String] to Cassandra value
+ */
+class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] {
+  override def convert(obj: Any): java.util.List[ByteBuffer] = {
+    val input = obj.asInstanceOf[java.util.List[String]]
+    seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
deleted file mode 100644
index 42ae960..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import org.apache.spark.api.python.Converter
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
- * to a String
- */
-class HBaseConverter extends Converter[Any, String] {
-  override def convert(obj: Any): String = {
-    val result = obj.asInstanceOf[Result]
-    Bytes.toStringBinary(result.value())
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
new file mode 100644
index 0000000..273bee0
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.{Put, Result}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * HBase Result to a String
+ */
+class HBaseResultToStringConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
+    val result = obj.asInstanceOf[Result]
+    Bytes.toStringBinary(result.value())
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * ImmutableBytesWritable to a String
+ */
+class ImmutableBytesWritableToStringConverter extends Converter[Any, String] {
+  override def convert(obj: Any): String = {
+    val key = obj.asInstanceOf[ImmutableBytesWritable]
+    Bytes.toStringBinary(key.get())
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * String to an ImmutableBytesWritable
+ */
+class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] {
+  override def convert(obj: Any): ImmutableBytesWritable = {
+    val bytes = Bytes.toBytes(obj.asInstanceOf[String])
+    new ImmutableBytesWritable(bytes)
+  }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * list of Strings to HBase Put
+ */
+class StringListToPutConverter extends Converter[Any, Put] {
+  override def convert(obj: Any): Put = {
+    val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray
+    val put = new Put(output(0))
+    put.add(output(1), output(2), output(3))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 830a6ee..7b0f8d8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -60,6 +60,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    _default_batch_size_for_serialized_input = 10
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
@@ -378,7 +379,7 @@ class SparkContext(object):
         return jm
 
     def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
-                     valueConverter=None, minSplits=None):
+                     valueConverter=None, minSplits=None, batchSize=None):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -398,14 +399,18 @@ class SparkContext(object):
         @param valueConverter:
         @param minSplits: minimum splits in dataset
                (default min(2, sc.defaultParallelism))
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
-                                                keyConverter, valueConverter, minSplits)
-        return RDD(jrdd, self, PickleSerializer())
+                    keyConverter, valueConverter, minSplits, batchSize)
+        return RDD(jrdd, self, ser)
 
     def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                         valueConverter=None, conf=None):
+                         valueConverter=None, conf=None, batchSize=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -425,14 +430,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                                    valueClass, keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                        valueConverter=None, conf=None):
+                        valueConverter=None, conf=None, batchSize=None):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -449,14 +458,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
-                                                   valueClass, keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                   valueConverter=None, conf=None):
+                   valueConverter=None, conf=None, batchSize=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -476,14 +489,18 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
-                                              valueClass, keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    valueClass, keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
-                  valueConverter=None, conf=None):
+                  valueConverter=None, conf=None, batchSize=None):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
         Hadoop configuration, which is passed in as a Python dict.
@@ -500,11 +517,15 @@ class SparkContext(object):
         @param valueConverter: (None by default)
         @param conf: Hadoop configuration, passed in as a dict
                (None by default)
+        @param batchSize: The number of Python objects represented as a single
+               Java object. (default sc._default_batch_size_for_serialized_input)
         """
         jconf = self._dictToJavaMap(conf)
+        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
+        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
         jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
-                                             keyConverter, valueConverter, jconf)
-        return RDD(jrdd, self, PickleSerializer())
+                    keyConverter, valueConverter, jconf, batchSize)
+        return RDD(jrdd, self, ser)
 
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)

http://git-wip-us.apache.org/repos/asf/spark/blob/94d1f46f/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b84d976..e8fcc90 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -231,6 +231,13 @@ class RDD(object):
         self._jrdd_deserializer = jrdd_deserializer
         self._id = jrdd.id()
 
+    def _toPickleSerialization(self):
+        if (self._jrdd_deserializer == PickleSerializer() or
+            self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+            return self
+        else:
+            return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
+
     def id(self):
         """
         A unique ID for this RDD (within its SparkContext).
@@ -1030,6 +1037,113 @@ class RDD(object):
         """
         return self.take(1)[0]
 
+    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
+        converted for output using either user specified converters or, by default,
+        L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+        @param conf: Hadoop job configuration, passed in as a dict
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+                                                    keyConverter, valueConverter, True)
+
+    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+                               keyConverter=None, valueConverter=None, conf=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
+        will be inferred if not specified. Keys and values are converted for output using either
+        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+        @param path: path to Hadoop file
+        @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+               (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
+        @param keyClass: fully qualified classname of key Writable class
+               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+        @param valueClass: fully qualified classname of value Writable class
+               (e.g. "org.apache.hadoop.io.Text", None by default)
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        @param conf: Hadoop job configuration, passed in as a dict (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+
+    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
+        converted for output using either user specified converters or, by default,
+        L{org.apache.spark.api.python.JavaToWritableConverter}.
+
+        @param conf: Hadoop job configuration, passed in as a dict
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
+                                                    keyConverter, valueConverter, False)
+
+    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
+                         keyConverter=None, valueConverter=None, conf=None,
+                         compressionCodecClass=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the old Hadoop OutputFormat API (mapred package). Key and value types
+        will be inferred if not specified. Keys and values are converted for output using either
+        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
+        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
+        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
+
+        @param path: path to Hadoop file
+        @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
+               (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
+        @param keyClass: fully qualified classname of key Writable class
+               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
+        @param valueClass: fully qualified classname of value Writable class
+               (e.g. "org.apache.hadoop.io.Text", None by default)
+        @param keyConverter: (None by default)
+        @param valueConverter: (None by default)
+        @param conf: Hadoop job configuration, passed in as a dict (None by default)
+        @param compressionCodecClass: (None by default)
+        """
+        jconf = self.ctx._dictToJavaMap(conf)
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
+            outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
+            jconf, compressionCodecClass)
+
+    def saveAsSequenceFile(self, path, compressionCodecClass=None):
+        """
+        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
+        system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
+        RDD's key and value types. The mechanism is as follows:
+            1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
+            2. Keys and values of this Java RDD are converted to Writables and written out.
+
+        @param path: path to sequence file
+        @param compressionCodecClass: (None by default)
+        """
+        pickledRDD = self._toPickleSerialization()
+        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
+        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
+                                                   path, compressionCodecClass)
+
     def saveAsPickleFile(self, path, batchSize=10):
         """
         Save this RDD as a SequenceFile of serialized objects. The serializer