Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/17 01:21:16 UTC

[5/6] spark git commit: [SPARK-4897] [PySpark] Python 3 support

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/sort.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py
index bb686f1..f6b0ecb 100755
--- a/examples/src/main/python/sort.py
+++ b/examples/src/main/python/sort.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 
 from pyspark import SparkContext
@@ -22,7 +24,7 @@ from pyspark import SparkContext
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print >> sys.stderr, "Usage: sort <file>"
+        print("Usage: sort <file>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="PythonSort")
     lines = sc.textFile(sys.argv[1], 1)
@@ -33,6 +35,6 @@ if __name__ == "__main__":
     # In reality, we wouldn't want to collect all the data to the driver node.
     output = sortedCount.collect()
     for (num, unitcount) in output:
-        print num
+        print(num)
 
     sc.stop()
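
The sort.py hunk above is the pattern repeated across every Python example in this commit: add `from __future__ import print_function` so that the Python 3 print() call syntax, including the file= keyword for writing to stderr, also runs unchanged on Python 2. A minimal standalone sketch of the pattern (illustrative only, not part of the commit):

    from __future__ import print_function   # turns print into a function on Python 2 as well

    import sys

    if len(sys.argv) != 2:
        # Python 2 statement form:   print >> sys.stderr, "Usage: sort <file>"
        # Python 2/3 function form:  route the output via the file= keyword
        print("Usage: sort <file>", file=sys.stderr)
        sys.exit(-1)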

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/sql.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py
index d89361f..87d7b08 100644
--- a/examples/src/main/python/sql.py
+++ b/examples/src/main/python/sql.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import os
 
 from pyspark import SparkContext
@@ -68,6 +70,6 @@ if __name__ == "__main__":
     teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
     for each in teenagers.collect():
-        print each[0]
+        print(each[0])
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/status_api_demo.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py
index a33bdc4..49b7902 100644
--- a/examples/src/main/python/status_api_demo.py
+++ b/examples/src/main/python/status_api_demo.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import time
 import threading
 import Queue
@@ -52,15 +54,15 @@ def main():
         ids = status.getJobIdsForGroup()
         for id in ids:
             job = status.getJobInfo(id)
-            print "Job", id, "status: ", job.status
+            print("Job", id, "status: ", job.status)
             for sid in job.stageIds:
                 info = status.getStageInfo(sid)
                 if info:
-                    print "Stage %d: %d tasks total (%d active, %d complete)" % \
-                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks)
+                    print("Stage %d: %d tasks total (%d active, %d complete)" %
+                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
         time.sleep(1)
 
-    print "Job results are:", result.get()
+    print("Job results are:", result.get())
     sc.stop()
 
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/streaming/hdfs_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py
index f7ffb53..f815dd2 100644
--- a/examples/src/main/python/streaming/hdfs_wordcount.py
+++ b/examples/src/main/python/streaming/hdfs_wordcount.py
@@ -25,6 +25,7 @@
 
  Then create a text file in `localdir` and the words in the file will get counted.
 """
+from __future__ import print_function
 
 import sys
 
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print >> sys.stderr, "Usage: hdfs_wordcount.py <directory>"
+        print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
         exit(-1)
 
     sc = SparkContext(appName="PythonStreamingHDFSWordCount")

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/streaming/kafka_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py
index 51e1ff8..b178e78 100644
--- a/examples/src/main/python/streaming/kafka_wordcount.py
+++ b/examples/src/main/python/streaming/kafka_wordcount.py
@@ -27,6 +27,7 @@
       spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py \
       localhost:2181 test`
 """
+from __future__ import print_function
 
 import sys
 
@@ -36,7 +37,7 @@ from pyspark.streaming.kafka import KafkaUtils
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: kafka_wordcount.py <zk> <topic>"
+        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
         exit(-1)
 
     sc = SparkContext(appName="PythonStreamingKafkaWordCount")

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/streaming/network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py
index cfa9c1f..2b48bcf 100644
--- a/examples/src/main/python/streaming/network_wordcount.py
+++ b/examples/src/main/python/streaming/network_wordcount.py
@@ -25,6 +25,7 @@
  and then run the example
     `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
 """
+from __future__ import print_function
 
 import sys
 
@@ -33,7 +34,7 @@ from pyspark.streaming import StreamingContext
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: network_wordcount.py <hostname> <port>"
+        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="PythonStreamingNetworkWordCount")
     ssc = StreamingContext(sc, 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/streaming/recoverable_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/recoverable_network_wordcount.py b/examples/src/main/python/streaming/recoverable_network_wordcount.py
index fc6827c..ac91f0a 100644
--- a/examples/src/main/python/streaming/recoverable_network_wordcount.py
+++ b/examples/src/main/python/streaming/recoverable_network_wordcount.py
@@ -35,6 +35,7 @@
  checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
  the checkpoint data.
 """
+from __future__ import print_function
 
 import os
 import sys
@@ -46,7 +47,7 @@ from pyspark.streaming import StreamingContext
 def createContext(host, port, outputPath):
     # If you do not see this printed, that means the StreamingContext has been loaded
     # from the new checkpoint
-    print "Creating new context"
+    print("Creating new context")
     if os.path.exists(outputPath):
         os.remove(outputPath)
     sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
@@ -60,8 +61,8 @@ def createContext(host, port, outputPath):
 
     def echo(time, rdd):
         counts = "Counts at time %s %s" % (time, rdd.collect())
-        print counts
-        print "Appending to " + os.path.abspath(outputPath)
+        print(counts)
+        print("Appending to " + os.path.abspath(outputPath))
         with open(outputPath, 'a') as f:
             f.write(counts + "\n")
 
@@ -70,8 +71,8 @@ def createContext(host, port, outputPath):
 
 if __name__ == "__main__":
     if len(sys.argv) != 5:
-        print >> sys.stderr, "Usage: recoverable_network_wordcount.py <hostname> <port> "\
-                             "<checkpoint-directory> <output-file>"
+        print("Usage: recoverable_network_wordcount.py <hostname> <port> "
+              "<checkpoint-directory> <output-file>", file=sys.stderr)
         exit(-1)
     host, port, checkpoint, output = sys.argv[1:]
     ssc = StreamingContext.getOrCreate(checkpoint,

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/streaming/sql_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
index f89bc56..da90c07 100644
--- a/examples/src/main/python/streaming/sql_network_wordcount.py
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -27,6 +27,7 @@
  and then run the example
     `$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
 """
+from __future__ import print_function
 
 import os
 import sys
@@ -44,7 +45,7 @@ def getSqlContextInstance(sparkContext):
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port> "
+        print("Usage: sql_network_wordcount.py <hostname> <port> ", file=sys.stderr)
         exit(-1)
     host, port = sys.argv[1:]
     sc = SparkContext(appName="PythonSqlNetworkWordCount")
@@ -57,7 +58,7 @@ if __name__ == "__main__":
 
     # Convert RDDs of the words DStream to DataFrame and run SQL query
     def process(time, rdd):
-        print "========= %s =========" % str(time)
+        print("========= %s =========" % str(time))
 
         try:
             # Get the singleton instance of SQLContext

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/streaming/stateful_network_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py
index 18a9a5a..16ef646 100644
--- a/examples/src/main/python/streaming/stateful_network_wordcount.py
+++ b/examples/src/main/python/streaming/stateful_network_wordcount.py
@@ -29,6 +29,7 @@
     `$ bin/spark-submit examples/src/main/python/streaming/stateful_network_wordcount.py \
         localhost 9999`
 """
+from __future__ import print_function
 
 import sys
 
@@ -37,7 +38,7 @@ from pyspark.streaming import StreamingContext
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: stateful_network_wordcount.py <hostname> <port>"
+        print("Usage: stateful_network_wordcount.py <hostname> <port>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
     ssc = StreamingContext(sc, 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/transitive_closure.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index 00a281b..7bf5fb6 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 from random import Random
 
@@ -49,20 +51,20 @@ if __name__ == "__main__":
     # the graph to obtain the path (x, z).
 
     # Because join() joins on keys, the edges are stored in reversed order.
-    edges = tc.map(lambda (x, y): (y, x))
+    edges = tc.map(lambda x_y: (x_y[1], x_y[0]))
 
-    oldCount = 0L
+    oldCount = 0
     nextCount = tc.count()
     while True:
         oldCount = nextCount
         # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
         # then project the result to obtain the new (x, z) paths.
-        new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
+        new_edges = tc.join(edges).map(lambda __a_b: (__a_b[1][1], __a_b[1][0]))
         tc = tc.union(new_edges).distinct().cache()
         nextCount = tc.count()
         if nextCount == oldCount:
             break
 
-    print "TC has %i edges" % tc.count()
+    print("TC has %i edges" % tc.count())
 
     sc.stop()
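
The transitive_closure.py hunk fixes two constructs that Python 3 removed outright rather than merely restyled: tuple parameter unpacking in lambdas (dropped by PEP 3113), which is replaced by indexing into a single tuple argument, and the `0L` long literal, which is unnecessary because Python 3 has a single unbounded int type. A small sketch of the same rewrite (illustrative, not from the commit):

    pairs = [(1, 2), (3, 4)]

    # Python 2 only:   swapped = map(lambda (x, y): (y, x), pairs)
    # Python 2 and 3:  take one argument and unpack it by index in the body
    swapped = list(map(lambda x_y: (x_y[1], x_y[0]), pairs))   # [(2, 1), (4, 3)]

    old_count = 0    # was 0L under Python 2; plain 0 works on both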

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py
index ae6cd13..7c01436 100755
--- a/examples/src/main/python/wordcount.py
+++ b/examples/src/main/python/wordcount.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 from operator import add
 
@@ -23,7 +25,7 @@ from pyspark import SparkContext
 
 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print >> sys.stderr, "Usage: wordcount <file>"
+        print("Usage: wordcount <file>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="PythonWordCount")
     lines = sc.textFile(sys.argv[1], 1)
@@ -32,6 +34,6 @@ if __name__ == "__main__":
                   .reduceByKey(add)
     output = counts.collect()
     for (word, count) in output:
-        print "%s: %i" % (word, count)
+        print("%s: %i" % (word, count))
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/mllib/src/main/scala/org/apache/spark/mllib/api/python/MatrixFactorizationModelWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/MatrixFactorizationModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/MatrixFactorizationModelWrapper.scala
index ecd3b16..534edac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/MatrixFactorizationModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/MatrixFactorizationModelWrapper.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.api.python
 
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
 import org.apache.spark.rdd.RDD
 
@@ -31,10 +32,14 @@ private[python] class MatrixFactorizationModelWrapper(model: MatrixFactorization
     predict(SerDe.asTupleRDD(userAndProducts.rdd))
 
   def getUserFeatures: RDD[Array[Any]] = {
-    SerDe.fromTuple2RDD(userFeatures.asInstanceOf[RDD[(Any, Any)]])
+    SerDe.fromTuple2RDD(userFeatures.map {
+      case (user, feature) => (user, Vectors.dense(feature))
+    }.asInstanceOf[RDD[(Any, Any)]])
   }
 
   def getProductFeatures: RDD[Array[Any]] = {
-    SerDe.fromTuple2RDD(productFeatures.asInstanceOf[RDD[(Any, Any)]])
+    SerDe.fromTuple2RDD(productFeatures.map {
+      case (product, feature) => (product, Vectors.dense(feature))
+    }.asInstanceOf[RDD[(Any, Any)]])
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index ab15f0f..f976d2f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -28,7 +28,6 @@ import scala.reflect.ClassTag
 
 import net.razorvine.pickle._
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.mllib.classification._
@@ -40,15 +39,15 @@ import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
 import org.apache.spark.mllib.stat.correlation.CorrelationNames
 import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
 import org.apache.spark.mllib.stat.test.ChiSqTestResult
-import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest, DecisionTree}
-import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Algo, Strategy}
+import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
+import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy, Strategy}
 import org.apache.spark.mllib.tree.impurity._
 import org.apache.spark.mllib.tree.loss.Losses
-import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, DecisionTreeModel}
+import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, RandomForestModel}
+import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomForest}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -279,7 +278,7 @@ private[python] class PythonMLLibAPI extends Serializable {
       data: JavaRDD[LabeledPoint],
       lambda: Double): JList[Object] = {
     val model = NaiveBayes.train(data.rdd, lambda)
-    List(Vectors.dense(model.labels), Vectors.dense(model.pi), model.theta).
+    List(Vectors.dense(model.labels), Vectors.dense(model.pi), model.theta.map(Vectors.dense)).
       map(_.asInstanceOf[Object]).asJava
   }
 
@@ -335,7 +334,7 @@ private[python] class PythonMLLibAPI extends Serializable {
           mu += model.gaussians(i).mu
           sigma += model.gaussians(i).sigma
       }    
-      List(wt.toArray, mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
+      List(Vectors.dense(wt.toArray), mu.toArray, sigma.toArray).map(_.asInstanceOf[Object]).asJava
     } finally {
       data.rdd.unpersist(blocking = false)
     }
@@ -346,20 +345,20 @@ private[python] class PythonMLLibAPI extends Serializable {
    */
   def predictSoftGMM(
       data: JavaRDD[Vector],
-      wt: Object,
+      wt: Vector,
       mu: Array[Object],
-      si: Array[Object]):  RDD[Array[Double]]  = {
+      si: Array[Object]):  RDD[Vector]  = {
 
-      val weight = wt.asInstanceOf[Array[Double]]
+      val weight = wt.toArray
       val mean = mu.map(_.asInstanceOf[DenseVector])
       val sigma = si.map(_.asInstanceOf[DenseMatrix])
       val gaussians = Array.tabulate(weight.length){
         i => new MultivariateGaussian(mean(i), sigma(i))
       }      
       val model = new GaussianMixtureModel(weight, gaussians)
-      model.predictSoft(data)
+      model.predictSoft(data).map(Vectors.dense)
   }
-  
+
   /**
    * Java stub for Python mllib ALS.train().  This stub returns a handle
    * to the Java object instead of the content of the Java object.  Extra care
@@ -936,6 +935,14 @@ private[spark] object SerDe extends Serializable {
       out.write(code)
     }
 
+    protected def getBytes(obj: Object): Array[Byte] = {
+      if (obj.getClass.isArray) {
+        obj.asInstanceOf[Array[Byte]]
+      } else {
+        obj.asInstanceOf[String].getBytes(LATIN1)
+      }
+    }
+
     private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
   }
 
@@ -961,7 +968,7 @@ private[spark] object SerDe extends Serializable {
       if (args.length != 1) {
         throw new PickleException("should be 1")
       }
-      val bytes = args(0).asInstanceOf[String].getBytes(LATIN1)
+      val bytes = getBytes(args(0))
       val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
       bb.order(ByteOrder.nativeOrder())
       val db = bb.asDoubleBuffer()
@@ -994,7 +1001,7 @@ private[spark] object SerDe extends Serializable {
       if (args.length != 3) {
         throw new PickleException("should be 3")
       }
-      val bytes = args(2).asInstanceOf[String].getBytes(LATIN1)
+      val bytes = getBytes(args(2))
       val n = bytes.length / 8
       val values = new Array[Double](n)
       val order = ByteOrder.nativeOrder()
@@ -1031,8 +1038,8 @@ private[spark] object SerDe extends Serializable {
         throw new PickleException("should be 3")
       }
       val size = args(0).asInstanceOf[Int]
-      val indiceBytes = args(1).asInstanceOf[String].getBytes(LATIN1)
-      val valueBytes = args(2).asInstanceOf[String].getBytes(LATIN1)
+      val indiceBytes = getBytes(args(1))
+      val valueBytes = getBytes(args(2))
       val n = indiceBytes.length / 4
       val indices = new Array[Int](n)
       val values = new Array[Double](n)

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/accumulators.py
----------------------------------------------------------------------
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index ccbca67..7271809 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -54,7 +54,7 @@
 ...     def zero(self, value):
 ...         return [0.0] * len(value)
 ...     def addInPlace(self, val1, val2):
-...         for i in xrange(len(val1)):
+...         for i in range(len(val1)):
 ...              val1[i] += val2[i]
 ...         return val1
 >>> va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
@@ -86,9 +86,13 @@ Traceback (most recent call last):
 Exception:...
 """
 
+import sys
 import select
 import struct
-import SocketServer
+if sys.version < '3':
+    import SocketServer
+else:
+    import socketserver as SocketServer
 import threading
 from pyspark.cloudpickle import CloudPickler
 from pyspark.serializers import read_int, PickleSerializer
@@ -247,6 +251,7 @@ class AccumulatorServer(SocketServer.TCPServer):
     def shutdown(self):
         self.server_shutdown = True
         SocketServer.TCPServer.shutdown(self)
+        self.server_close()
 
 
 def _start_update_server():
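
accumulators.py introduces the version-gated import guard that recurs throughout this commit for standard-library modules renamed in Python 3: the Python 3 module is imported under its old Python 2 name so the rest of the file needs no changes. The doctest also swaps xrange for range (eager on Python 2, but harmless for a short loop), and shutdown() gains a server_close() call so the listening socket is actually released. A sketch of the guard on its own (illustrative only):

    import sys

    if sys.version < '3':                        # lexical compare on the leading version digit
        import SocketServer                      # Python 2 spelling
    else:
        import socketserver as SocketServer      # Python 3 spelling, bound to the old name

    # Later code keeps using the Python 2 name unchanged, e.g.:
    # class AccumulatorServer(SocketServer.TCPServer): ...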

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/broadcast.py
----------------------------------------------------------------------
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 6b8a8b2..3de4615 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -16,10 +16,15 @@
 #
 
 import os
-import cPickle
+import sys
 import gc
 from tempfile import NamedTemporaryFile
 
+if sys.version < '3':
+    import cPickle as pickle
+else:
+    import pickle
+    unicode = str
 
 __all__ = ['Broadcast']
 
@@ -70,33 +75,19 @@ class Broadcast(object):
             self._path = path
 
     def dump(self, value, f):
-        if isinstance(value, basestring):
-            if isinstance(value, unicode):
-                f.write('U')
-                value = value.encode('utf8')
-            else:
-                f.write('S')
-            f.write(value)
-        else:
-            f.write('P')
-            cPickle.dump(value, f, 2)
+        pickle.dump(value, f, 2)
         f.close()
         return f.name
 
     def load(self, path):
         with open(path, 'rb', 1 << 20) as f:
-            flag = f.read(1)
-            data = f.read()
-            if flag == 'P':
-                # cPickle.loads() may create lots of objects, disable GC
-                # temporary for better performance
-                gc.disable()
-                try:
-                    return cPickle.loads(data)
-                finally:
-                    gc.enable()
-            else:
-                return data.decode('utf8') if flag == 'U' else data
+            # pickle.load() may create lots of objects, disable GC
+            # temporary for better performance
+            gc.disable()
+            try:
+                return pickle.load(f)
+            finally:
+                gc.enable()
 
     @property
     def value(self):
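
The broadcast.py rewrite drops the hand-rolled 'U'/'S'/'P' type-flag framing and always pickles the broadcast value: cPickle on Python 2, plain pickle on Python 3, with protocol 2 because that is the highest protocol Python 2 can read. GC stays paused around the load since unpickling large object graphs is noticeably faster without collection passes. A condensed sketch of the resulting dump/load pair (illustrative only; the file handling is simplified):

    import gc
    import sys

    if sys.version < '3':
        import cPickle as pickle     # C implementation on Python 2
    else:
        import pickle                # Python 3's pickle loads its C accelerator automatically

    def dump_broadcast(value, f):
        pickle.dump(value, f, 2)     # protocol 2: readable by both interpreters
        f.close()
        return f.name

    def load_broadcast(path):
        with open(path, 'rb', 1 << 20) as f:
            gc.disable()             # pause GC while rebuilding a large object graph
            try:
                return pickle.load(f)
            finally:
                gc.enable()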

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/cloudpickle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index bb07835..9ef9307 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -40,164 +40,126 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
-
+from __future__ import print_function
 
 import operator
 import os
+import io
 import pickle
 import struct
 import sys
 import types
 from functools import partial
 import itertools
-from copy_reg import _extension_registry, _inverted_registry, _extension_cache
-import new
 import dis
 import traceback
-import platform
-
-PyImp = platform.python_implementation()
-
 
-import logging
-cloudLog = logging.getLogger("Cloud.Transport")
+if sys.version < '3':
+    from pickle import Pickler
+    try:
+        from cStringIO import StringIO
+    except ImportError:
+        from StringIO import StringIO
+    PY3 = False
+else:
+    types.ClassType = type
+    from pickle import _Pickler as Pickler
+    from io import BytesIO as StringIO
+    PY3 = True
 
 #relevant opcodes
-STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
-DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
-LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
+STORE_GLOBAL = dis.opname.index('STORE_GLOBAL')
+DELETE_GLOBAL = dis.opname.index('DELETE_GLOBAL')
+LOAD_GLOBAL = dis.opname.index('LOAD_GLOBAL')
 GLOBAL_OPS = [STORE_GLOBAL, DELETE_GLOBAL, LOAD_GLOBAL]
+HAVE_ARGUMENT = dis.HAVE_ARGUMENT
+EXTENDED_ARG = dis.EXTENDED_ARG
 
-HAVE_ARGUMENT = chr(dis.HAVE_ARGUMENT)
-EXTENDED_ARG = chr(dis.EXTENDED_ARG)
-
-if PyImp == "PyPy":
-    # register builtin type in `new`
-    new.method = types.MethodType
-
-try:
-    from cStringIO import StringIO
-except ImportError:
-    from StringIO import StringIO
 
-# These helper functions were copied from PiCloud's util module.
 def islambda(func):
-    return getattr(func,'func_name') == '<lambda>'
+    return getattr(func,'__name__') == '<lambda>'
 
-def xrange_params(xrangeobj):
-    """Returns a 3 element tuple describing the xrange start, step, and len
-    respectively
 
-    Note: Only guarentees that elements of xrange are the same. parameters may
-    be different.
-    e.g. xrange(1,1) is interpretted as xrange(0,0); both behave the same
-    though w/ iteration
-    """
-
-    xrange_len = len(xrangeobj)
-    if not xrange_len: #empty
-        return (0,1,0)
-    start = xrangeobj[0]
-    if xrange_len == 1: #one element
-        return start, 1, 1
-    return (start, xrangeobj[1] - xrangeobj[0], xrange_len)
-
-#debug variables intended for developer use:
-printSerialization = False
-printMemoization = False
+_BUILTIN_TYPE_NAMES = {}
+for k, v in types.__dict__.items():
+    if type(v) is type:
+        _BUILTIN_TYPE_NAMES[v] = k
 
-useForcedImports = True #Should I use forced imports for tracking?
 
+def _builtin_type(name):
+    return getattr(types, name)
 
 
-class CloudPickler(pickle.Pickler):
+class CloudPickler(Pickler):
 
-    dispatch = pickle.Pickler.dispatch.copy()
-    savedForceImports = False
-    savedDjangoEnv = False #hack tro transport django environment
+    dispatch = Pickler.dispatch.copy()
 
-    def __init__(self, file, protocol=None, min_size_to_save= 0):
-        pickle.Pickler.__init__(self,file,protocol)
-        self.modules = set() #set of modules needed to depickle
-        self.globals_ref = {}  # map ids to dictionary. used to ensure that functions can share global env
+    def __init__(self, file, protocol=None):
+        Pickler.__init__(self, file, protocol)
+        # set of modules to unpickle
+        self.modules = set()
+        # map ids to dictionary. used to ensure that functions can share global env
+        self.globals_ref = {}
 
     def dump(self, obj):
-        # note: not thread safe
-        # minimal side-effects, so not fixing
-        recurse_limit = 3000
-        base_recurse = sys.getrecursionlimit()
-        if base_recurse < recurse_limit:
-            sys.setrecursionlimit(recurse_limit)
         self.inject_addons()
         try:
-            return pickle.Pickler.dump(self, obj)
-        except RuntimeError, e:
+            return Pickler.dump(self, obj)
+        except RuntimeError as e:
             if 'recursion' in e.args[0]:
-                msg = """Could not pickle object as excessively deep recursion required.
-                Try _fast_serialization=2 or contact PiCloud support"""
+                msg = """Could not pickle object as excessively deep recursion required."""
                 raise pickle.PicklingError(msg)
-        finally:
-            new_recurse = sys.getrecursionlimit()
-            if new_recurse == recurse_limit:
-                sys.setrecursionlimit(base_recurse)
+
+    def save_memoryview(self, obj):
+        """Fallback to save_string"""
+        Pickler.save_string(self, str(obj))
 
     def save_buffer(self, obj):
         """Fallback to save_string"""
-        pickle.Pickler.save_string(self,str(obj))
-    dispatch[buffer] = save_buffer
+        Pickler.save_string(self,str(obj))
+    if PY3:
+        dispatch[memoryview] = save_memoryview
+    else:
+        dispatch[buffer] = save_buffer
 
-    #block broken objects
-    def save_unsupported(self, obj, pack=None):
+    def save_unsupported(self, obj):
         raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj))
     dispatch[types.GeneratorType] = save_unsupported
 
-    #python2.6+ supports slice pickling. some py2.5 extensions might as well.  We just test it
-    try:
-        slice(0,1).__reduce__()
-    except TypeError: #can't pickle -
-        dispatch[slice] = save_unsupported
-
-    #itertools objects do not pickle!
+    # itertools objects do not pickle!
     for v in itertools.__dict__.values():
         if type(v) is type:
             dispatch[v] = save_unsupported
 
-
-    def save_dict(self, obj):
-        """hack fix
-        If the dict is a global, deal with it in a special way
-        """
-        #print 'saving', obj
-        if obj is __builtins__:
-            self.save_reduce(_get_module_builtins, (), obj=obj)
-        else:
-            pickle.Pickler.save_dict(self, obj)
-    dispatch[pickle.DictionaryType] = save_dict
-
-
-    def save_module(self, obj, pack=struct.pack):
+    def save_module(self, obj):
         """
         Save a module as an import
         """
-        #print 'try save import', obj.__name__
         self.modules.add(obj)
-        self.save_reduce(subimport,(obj.__name__,), obj=obj)
-    dispatch[types.ModuleType] = save_module    #new type
+        self.save_reduce(subimport, (obj.__name__,), obj=obj)
+    dispatch[types.ModuleType] = save_module
 
-    def save_codeobject(self, obj, pack=struct.pack):
+    def save_codeobject(self, obj):
         """
         Save a code object
         """
-        #print 'try to save codeobj: ', obj
-        args = (
-            obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
-            obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
-            obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
-        )
+        if PY3:
+            args = (
+                obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
+                obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, obj.co_varnames,
+                obj.co_filename, obj.co_name, obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
+                obj.co_cellvars
+            )
+        else:
+            args = (
+                obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
+                obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
+                obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
+            )
         self.save_reduce(types.CodeType, args, obj=obj)
-    dispatch[types.CodeType] = save_codeobject    #new type
+    dispatch[types.CodeType] = save_codeobject
 
-    def save_function(self, obj, name=None, pack=struct.pack):
+    def save_function(self, obj, name=None):
         """ Registered with the dispatch to handle all function types.
 
         Determines what kind of function obj is (e.g. lambda, defined at
@@ -205,12 +167,14 @@ class CloudPickler(pickle.Pickler):
         """
         write = self.write
 
-        name = obj.__name__
+        if name is None:
+            name = obj.__name__
         modname = pickle.whichmodule(obj, name)
-        #print 'which gives %s %s %s' % (modname, obj, name)
+        # print('which gives %s %s %s' % (modname, obj, name))
         try:
             themodule = sys.modules[modname]
-        except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__
+        except KeyError:
+            # eval'd items such as namedtuple give invalid items for their function __module__
             modname = '__main__'
 
         if modname == '__main__':
@@ -221,37 +185,18 @@ class CloudPickler(pickle.Pickler):
             if getattr(themodule, name, None) is obj:
                 return self.save_global(obj, name)
 
-        if not self.savedDjangoEnv:
-            #hack for django - if we detect the settings module, we transport it
-            django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '')
-            if django_settings:
-                django_mod = sys.modules.get(django_settings)
-                if django_mod:
-                    cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name)
-                    self.savedDjangoEnv = True
-                    self.modules.add(django_mod)
-                    write(pickle.MARK)
-                    self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod)
-                    write(pickle.POP_MARK)
-
-
         # if func is lambda, def'ed at prompt, is in main, or is nested, then
         # we'll pickle the actual function object rather than simply saving a
         # reference (as is done in default pickler), via save_function_tuple.
-        if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule is None:
-            #Force server to import modules that have been imported in main
-            modList = None
-            if themodule is None and not self.savedForceImports:
-                mainmod = sys.modules['__main__']
-                if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'):
-                    modList = list(mainmod.___pyc_forcedImports__)
-                self.savedForceImports = True
-            self.save_function_tuple(obj, modList)
+        if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
+            #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
+            self.save_function_tuple(obj)
             return
-        else:   # func is nested
+        else:
+            # func is nested
             klass = getattr(themodule, name, None)
             if klass is None or klass is not obj:
-                self.save_function_tuple(obj, [themodule])
+                self.save_function_tuple(obj)
                 return
 
         if obj.__dict__:
@@ -266,7 +211,7 @@ class CloudPickler(pickle.Pickler):
             self.memoize(obj)
     dispatch[types.FunctionType] = save_function
 
-    def save_function_tuple(self, func, forced_imports):
+    def save_function_tuple(self, func):
         """  Pickles an actual func object.
 
         A func comprises: code, globals, defaults, closure, and dict.  We
@@ -281,19 +226,6 @@ class CloudPickler(pickle.Pickler):
         save = self.save
         write = self.write
 
-        # save the modules (if any)
-        if forced_imports:
-            write(pickle.MARK)
-            save(_modules_to_main)
-            #print 'forced imports are', forced_imports
-
-            forced_names = map(lambda m: m.__name__, forced_imports)
-            save((forced_names,))
-
-            #save((forced_imports,))
-            write(pickle.REDUCE)
-            write(pickle.POP_MARK)
-
         code, f_globals, defaults, closure, dct, base_globals = self.extract_func_data(func)
 
         save(_fill_function)  # skeleton function updater
@@ -318,6 +250,8 @@ class CloudPickler(pickle.Pickler):
         Find all globals names read or written to by codeblock co
         """
         code = co.co_code
+        if not PY3:
+            code = [ord(c) for c in code]
         names = co.co_names
         out_names = set()
 
@@ -327,18 +261,18 @@ class CloudPickler(pickle.Pickler):
         while i < n:
             op = code[i]
 
-            i = i+1
+            i += 1
             if op >= HAVE_ARGUMENT:
-                oparg = ord(code[i]) + ord(code[i+1])*256 + extended_arg
+                oparg = code[i] + code[i+1] * 256 + extended_arg
                 extended_arg = 0
-                i = i+2
+                i += 2
                 if op == EXTENDED_ARG:
-                    extended_arg = oparg*65536L
+                    extended_arg = oparg*65536
                 if op in GLOBAL_OPS:
                     out_names.add(names[oparg])
-        #print 'extracted', out_names, ' from ', names
 
-        if co.co_consts:   # see if nested function have any global refs
+        # see if nested function have any global refs
+        if co.co_consts:
             for const in co.co_consts:
                 if type(const) is types.CodeType:
                     out_names |= CloudPickler.extract_code_globals(const)
@@ -350,46 +284,28 @@ class CloudPickler(pickle.Pickler):
         Turn the function into a tuple of data necessary to recreate it:
             code, globals, defaults, closure, dict
         """
-        code = func.func_code
+        code = func.__code__
 
         # extract all global ref's
-        func_global_refs = CloudPickler.extract_code_globals(code)
+        func_global_refs = self.extract_code_globals(code)
 
         # process all variables referenced by global environment
         f_globals = {}
         for var in func_global_refs:
-            #Some names, such as class functions are not global - we don't need them
-            if func.func_globals.has_key(var):
-                f_globals[var] = func.func_globals[var]
+            if var in func.__globals__:
+                f_globals[var] = func.__globals__[var]
 
         # defaults requires no processing
-        defaults = func.func_defaults
-
-        def get_contents(cell):
-            try:
-                return cell.cell_contents
-            except ValueError, e: #cell is empty error on not yet assigned
-                raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope')
-
+        defaults = func.__defaults__
 
         # process closure
-        if func.func_closure:
-            closure = map(get_contents, func.func_closure)
-        else:
-            closure = []
+        closure = [c.cell_contents for c in func.__closure__] if func.__closure__ else []
 
         # save the dict
-        dct = func.func_dict
-
-        if printSerialization:
-            outvars = ['code: ' + str(code) ]
-            outvars.append('globals: ' + str(f_globals))
-            outvars.append('defaults: ' + str(defaults))
-            outvars.append('closure: ' + str(closure))
-            print 'function ', func, 'is extracted to: ', ', '.join(outvars)
+        dct = func.__dict__
 
-        base_globals = self.globals_ref.get(id(func.func_globals), {})
-        self.globals_ref[id(func.func_globals)] = base_globals
+        base_globals = self.globals_ref.get(id(func.__globals__), {})
+        self.globals_ref[id(func.__globals__)] = base_globals
 
         return (code, f_globals, defaults, closure, dct, base_globals)
 
@@ -400,8 +316,9 @@ class CloudPickler(pickle.Pickler):
     dispatch[types.BuiltinFunctionType] = save_builtin_function
 
     def save_global(self, obj, name=None, pack=struct.pack):
-        write = self.write
-        memo = self.memo
+        if obj.__module__ == "__builtin__" or obj.__module__ == "builtins":
+            if obj in _BUILTIN_TYPE_NAMES:
+                return self.save_reduce(_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj)
 
         if name is None:
             name = obj.__name__
@@ -410,98 +327,57 @@ class CloudPickler(pickle.Pickler):
         if modname is None:
             modname = pickle.whichmodule(obj, name)
 
-        try:
-            __import__(modname)
-            themodule = sys.modules[modname]
-        except (ImportError, KeyError, AttributeError):  #should never occur
-            raise pickle.PicklingError(
-                "Can't pickle %r: Module %s cannot be found" %
-                (obj, modname))
-
         if modname == '__main__':
             themodule = None
-
-        if themodule:
+        else:
+            __import__(modname)
+            themodule = sys.modules[modname]
             self.modules.add(themodule)
 
-        sendRef = True
-        typ = type(obj)
-        #print 'saving', obj, typ
-        try:
-            try: #Deal with case when getattribute fails with exceptions
-                klass = getattr(themodule, name)
-            except (AttributeError):
-                if modname == '__builtin__':  #new.* are misrepeported
-                    modname = 'new'
-                    __import__(modname)
-                    themodule = sys.modules[modname]
-                    try:
-                        klass = getattr(themodule, name)
-                    except AttributeError, a:
-                        # print themodule, name, obj, type(obj)
-                        raise pickle.PicklingError("Can't pickle builtin %s" % obj)
-                else:
-                    raise
+        if hasattr(themodule, name) and getattr(themodule, name) is obj:
+            return Pickler.save_global(self, obj, name)
 
-        except (ImportError, KeyError, AttributeError):
-            if typ == types.TypeType or typ == types.ClassType:
-                sendRef = False
-            else: #we can't deal with this
-                raise
-        else:
-            if klass is not obj and (typ == types.TypeType or typ == types.ClassType):
-                sendRef = False
-        if not sendRef:
-            #note: Third party types might crash this - add better checks!
-            d = dict(obj.__dict__) #copy dict proxy to a dict
-            if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties
-                d.pop('__dict__',None)
-            d.pop('__weakref__',None)
+        typ = type(obj)
+        if typ is not obj and isinstance(obj, (type, types.ClassType)):
+            d = dict(obj.__dict__)  # copy dict proxy to a dict
+            if not isinstance(d.get('__dict__', None), property):
+                # don't extract dict that are properties
+                d.pop('__dict__', None)
+            d.pop('__weakref__', None)
 
             # hack as __new__ is stored differently in the __dict__
             new_override = d.get('__new__', None)
             if new_override:
                 d['__new__'] = obj.__new__
 
-            self.save_reduce(type(obj),(obj.__name__,obj.__bases__,
-                                   d),obj=obj)
-            #print 'internal reduce dask %s %s'  % (obj, d)
-            return
-
-        if self.proto >= 2:
-            code = _extension_registry.get((modname, name))
-            if code:
-                assert code > 0
-                if code <= 0xff:
-                    write(pickle.EXT1 + chr(code))
-                elif code <= 0xffff:
-                    write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8))
-                else:
-                    write(pickle.EXT4 + pack("<i", code))
-                return
+            self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
+        else:
+            raise pickle.PicklingError("Can't pickle %r" % obj)
 
-        write(pickle.GLOBAL + modname + '\n' + name + '\n')
-        self.memoize(obj)
+    dispatch[type] = save_global
     dispatch[types.ClassType] = save_global
-    dispatch[types.TypeType] = save_global
 
     def save_instancemethod(self, obj):
-        #Memoization rarely is ever useful due to python bounding
-        self.save_reduce(types.MethodType, (obj.im_func, obj.im_self,obj.im_class), obj=obj)
+        # Memoization rarely is ever useful due to python bounding
+        if PY3:
+            self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
+        else:
+            self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
+                         obj=obj)
     dispatch[types.MethodType] = save_instancemethod
 
-    def save_inst_logic(self, obj):
+    def save_inst(self, obj):
         """Inner logic to save instance. Based off pickle.save_inst
         Supports __transient__"""
         cls = obj.__class__
 
-        memo  = self.memo
+        memo = self.memo
         write = self.write
-        save  = self.save
+        save = self.save
 
         if hasattr(obj, '__getinitargs__'):
             args = obj.__getinitargs__()
-            len(args) # XXX Assert it's a sequence
+            len(args)  # XXX Assert it's a sequence
             pickle._keep_alive(args, memo)
         else:
             args = ()
@@ -537,15 +413,8 @@ class CloudPickler(pickle.Pickler):
         save(stuff)
         write(pickle.BUILD)
 
-
-    def save_inst(self, obj):
-        # Hack to detect PIL Image instances without importing Imaging
-        # PIL can be loaded with multiple names, so we don't check sys.modules for it
-        if hasattr(obj,'im') and hasattr(obj,'palette') and 'Image' in obj.__module__:
-            self.save_image(obj)
-        else:
-            self.save_inst_logic(obj)
-    dispatch[types.InstanceType] = save_inst
+    if not PY3:
+        dispatch[types.InstanceType] = save_inst
 
     def save_property(self, obj):
         # properties not correctly saved in python
@@ -592,7 +461,7 @@ class CloudPickler(pickle.Pickler):
         """Modified to support __transient__ on new objects
         Change only affects protocol level 2 (which is always used by PiCloud"""
         # Assert that args is a tuple or None
-        if not isinstance(args, types.TupleType):
+        if not isinstance(args, tuple):
             raise pickle.PicklingError("args from reduce() should be a tuple")
 
         # Assert that func is callable
@@ -646,35 +515,23 @@ class CloudPickler(pickle.Pickler):
             self._batch_setitems(dictitems)
 
         if state is not None:
-            #print 'obj %s has state %s' % (obj, state)
             save(state)
             write(pickle.BUILD)
 
-
-    def save_xrange(self, obj):
-        """Save an xrange object in python 2.5
-        Python 2.6 supports this natively
-        """
-        range_params = xrange_params(obj)
-        self.save_reduce(_build_xrange,range_params)
-
-    #python2.6+ supports xrange pickling. some py2.5 extensions might as well.  We just test it
-    try:
-        xrange(0).__reduce__()
-    except TypeError: #can't pickle -- use PiCloud pickler
-        dispatch[xrange] = save_xrange
-
     def save_partial(self, obj):
         """Partial objects do not serialize correctly in python2.x -- this fixes the bugs"""
         self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords))
 
-    if sys.version_info < (2,7): #2.7 supports partial pickling
+    if sys.version_info < (2,7):  # 2.7 supports partial pickling
         dispatch[partial] = save_partial
 
 
     def save_file(self, obj):
         """Save a file"""
-        import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+        try:
+            import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
+        except ImportError:
+            import io as pystringIO
 
         if not hasattr(obj, 'name') or  not hasattr(obj, 'mode'):
             raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
@@ -720,10 +577,14 @@ class CloudPickler(pickle.Pickler):
             retval.seek(curloc)
 
         retval.name = name
-        self.save(retval)  #save stringIO
+        self.save(retval)
         self.memoize(obj)
 
-    dispatch[file] = save_file
+    if PY3:
+        dispatch[io.TextIOWrapper] = save_file
+    else:
+        dispatch[file] = save_file
+
     """Special functions for Add-on libraries"""
 
     def inject_numpy(self):
@@ -732,76 +593,20 @@ class CloudPickler(pickle.Pickler):
             return
         self.dispatch[numpy.ufunc] = self.__class__.save_ufunc
 
-    numpy_tst_mods = ['numpy', 'scipy.special']
     def save_ufunc(self, obj):
         """Hack function for saving numpy ufunc objects"""
         name = obj.__name__
-        for tst_mod_name in self.numpy_tst_mods:
+        numpy_tst_mods = ['numpy', 'scipy.special']
+        for tst_mod_name in numpy_tst_mods:
             tst_mod = sys.modules.get(tst_mod_name, None)
-            if tst_mod:
-                if name in tst_mod.__dict__:
-                    self.save_reduce(_getobject, (tst_mod_name, name))
-                    return
-        raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in' % str(obj))
-
-    def inject_timeseries(self):
-        """Handle bugs with pickling scikits timeseries"""
-        tseries = sys.modules.get('scikits.timeseries.tseries')
-        if not tseries or not hasattr(tseries, 'Timeseries'):
-            return
-        self.dispatch[tseries.Timeseries] = self.__class__.save_timeseries
-
-    def save_timeseries(self, obj):
-        import scikits.timeseries.tseries as ts
-
-        func, reduce_args, state = obj.__reduce__()
-        if func != ts._tsreconstruct:
-            raise pickle.PicklingError('timeseries using unexpected reconstruction function %s' % str(func))
-        state = (1,
-                         obj.shape,
-                         obj.dtype,
-                         obj.flags.fnc,
-                         obj._data.tostring(),
-                         ts.getmaskarray(obj).tostring(),
-                         obj._fill_value,
-                         obj._dates.shape,
-                         obj._dates.__array__().tostring(),
-                         obj._dates.dtype, #added -- preserve type
-                         obj.freq,
-                         obj._optinfo,
-                         )
-        return self.save_reduce(_genTimeSeries, (reduce_args, state))
-
-    def inject_email(self):
-        """Block email LazyImporters from being saved"""
-        email = sys.modules.get('email')
-        if not email:
-            return
-        self.dispatch[email.LazyImporter] = self.__class__.save_unsupported
+            if tst_mod and name in tst_mod.__dict__:
+                return self.save_reduce(_getobject, (tst_mod_name, name))
+        raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in'
+                                   % str(obj))
 
     def inject_addons(self):
         """Plug in system. Register additional pickling functions if modules already loaded"""
         self.inject_numpy()
-        self.inject_timeseries()
-        self.inject_email()
-
-    """Python Imaging Library"""
-    def save_image(self, obj):
-        if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \
-            and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()):
-            #if image not loaded yet -- lazy load
-            self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj)
-        else:
-            #image is loaded - just transmit it over
-            self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj)
-
-    """
-    def memoize(self, obj):
-        pickle.Pickler.memoize(self, obj)
-        if printMemoization:
-            print 'memoizing ' + str(obj)
-    """
-
 
 
 # Shorthands for legacy support
@@ -809,14 +614,13 @@ class CloudPickler(pickle.Pickler):
 def dump(obj, file, protocol=2):
     CloudPickler(file, protocol).dump(obj)
 
+
 def dumps(obj, protocol=2):
     file = StringIO()
 
     cp = CloudPickler(file,protocol)
     cp.dump(obj)
 
-    #print 'cloud dumped', str(obj), str(cp.modules)
-
     return file.getvalue()
 
 
@@ -825,25 +629,6 @@ def subimport(name):
     __import__(name)
     return sys.modules[name]
 
-#hack to load django settings:
-def django_settings_load(name):
-    modified_env = False
-
-    if 'DJANGO_SETTINGS_MODULE' not in os.environ:
-        os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps
-        modified_env = True
-    try:
-        module = subimport(name)
-    except Exception, i:
-        print >> sys.stderr, 'Cloud not import django settings %s:' % (name)
-        print_exec(sys.stderr)
-        if modified_env:
-            del os.environ['DJANGO_SETTINGS_MODULE']
-    else:
-        #add project directory to sys,path:
-        if hasattr(module,'__file__'):
-            dirname = os.path.split(module.__file__)[0] + '/'
-            sys.path.append(dirname)
 
 # restores function attributes
 def _restore_attr(obj, attr):
@@ -851,13 +636,16 @@ def _restore_attr(obj, attr):
         setattr(obj, key, val)
     return obj
 
+
 def _get_module_builtins():
     return pickle.__builtins__
 
+
 def print_exec(stream):
     ei = sys.exc_info()
     traceback.print_exception(ei[0], ei[1], ei[2], None, stream)
 
+
 def _modules_to_main(modList):
     """Force every module in modList to be placed into main"""
     if not modList:
@@ -868,22 +656,16 @@ def _modules_to_main(modList):
         if type(modname) is str:
             try:
                 mod = __import__(modname)
-            except Exception, i: #catch all...
-                sys.stderr.write('warning: could not import %s\n.  Your function may unexpectedly error due to this import failing; \
-A version mismatch is likely.  Specific error was:\n' % modname)
+            except Exception as e:
+                sys.stderr.write('warning: could not import %s\n.  '
+                                 'Your function may unexpectedly error due to this import failing;'
+                                 'A version mismatch is likely.  Specific error was:\n' % modname)
                 print_exec(sys.stderr)
             else:
-                setattr(main,mod.__name__, mod)
-        else:
-            #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
-            #In old version actual module was sent
-            setattr(main,modname.__name__, modname)
+                setattr(main, mod.__name__, mod)
 
-#object generators:
-def _build_xrange(start, step, len):
-    """Built xrange explicitly"""
-    return xrange(start, start + step*len, step)
 
+#object generators:
 def _genpartial(func, args, kwds):
     if not args:
         args = ()
@@ -891,22 +673,26 @@ def _genpartial(func, args, kwds):
         kwds = {}
     return partial(func, *args, **kwds)
 
+
 def _fill_function(func, globals, defaults, dict):
     """ Fills in the rest of function data into the skeleton function object
         that were created via _make_skel_func().
          """
-    func.func_globals.update(globals)
-    func.func_defaults = defaults
-    func.func_dict = dict
+    func.__globals__.update(globals)
+    func.__defaults__ = defaults
+    func.__dict__ = dict
 
     return func
 
+
 def _make_cell(value):
-    return (lambda: value).func_closure[0]
+    return (lambda: value).__closure__[0]
+
 
 def _reconstruct_closure(values):
     return tuple([_make_cell(v) for v in values])
 
+
 def _make_skel_func(code, closures, base_globals = None):
     """ Creates a skeleton function object that contains just the provided
         code and the correct number of cells in func_closure.  All other
@@ -928,40 +714,3 @@ Note: These can never be renamed due to client compatibility issues"""
 def _getobject(modname, attribute):
     mod = __import__(modname, fromlist=[attribute])
     return mod.__dict__[attribute]
-
-def _generateImage(size, mode, str_rep):
-    """Generate image from string representation"""
-    import Image
-    i = Image.new(mode, size)
-    i.fromstring(str_rep)
-    return i
-
-def _lazyloadImage(fp):
-    import Image
-    fp.seek(0)  #works in almost any case
-    return Image.open(fp)
-
-"""Timeseries"""
-def _genTimeSeries(reduce_args, state):
-    import scikits.timeseries.tseries as ts
-    from numpy import ndarray
-    from numpy.ma import MaskedArray
-
-
-    time_series = ts._tsreconstruct(*reduce_args)
-
-    #from setstate modified
-    (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state
-    #print 'regenerating %s' % dtyp
-
-    MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv))
-    _dates = time_series._dates
-    #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq))  #use remote typ
-    ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm))
-    _dates.freq = frq
-    _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None,
-                                   toobj=None, toord=None, tostr=None))
-    # Update the _optinfo dictionary
-    time_series._optinfo.update(infodict)
-    return time_series
-
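
The _fill_function/_make_cell changes above move from the Python 2-only func_globals/func_defaults/func_closure attributes to the dunder names that exist on both Python 2.6+ and Python 3. A minimal, standalone illustration of those attributes; nothing here is cloudpickle-specific, and describe/outer/inner are made-up names:

    def outer(x=1):
        y = 2
        def inner(z=3):
            return x + y + z
        return inner

    fn = outer()
    # The dunder attributes are the portable spelling on Python 2.6+ and 3.
    print(fn.__defaults__)                 # (3,)
    print(len(fn.__closure__))             # 2 -- one cell each for x and y
    print(type(fn.__globals__).__name__)   # dict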

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/conf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index dc7cd0b..924da3e 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -44,7 +44,7 @@ u'/path'
 <pyspark.conf.SparkConf object at ...>
 >>> conf.get("spark.executorEnv.VAR1")
 u'value1'
->>> print conf.toDebugString()
+>>> print(conf.toDebugString())
 spark.executorEnv.VAR1=value1
 spark.executorEnv.VAR3=value3
 spark.executorEnv.VAR4=value4
@@ -56,6 +56,13 @@ spark.home=/path
 
 __all__ = ['SparkConf']
 
+import sys
+import re
+
+if sys.version > '3':
+    unicode = str
+    __doc__ = re.sub(r"(\W|^)[uU](['])", r'\1\2', __doc__)
+
 
 class SparkConf(object):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 78dccc4..1dc2fec 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import os
 import shutil
 import sys
@@ -32,11 +34,14 @@ from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, AutoBatchedSerializer, NoOpSerializer
 from pyspark.storagelevel import StorageLevel
-from pyspark.rdd import RDD, _load_from_socket
+from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler
 
+if sys.version > '3':
+    xrange = range
+
 
 __all__ = ['SparkContext']
 
@@ -133,7 +138,7 @@ class SparkContext(object):
         if sparkHome:
             self._conf.setSparkHome(sparkHome)
         if environment:
-            for key, value in environment.iteritems():
+            for key, value in environment.items():
                 self._conf.setExecutorEnv(key, value)
         for key, value in DEFAULT_CONFIGS.items():
             self._conf.setIfMissing(key, value)
@@ -153,6 +158,10 @@ class SparkContext(object):
             if k.startswith("spark.executorEnv."):
                 varName = k[len("spark.executorEnv."):]
                 self.environment[varName] = v
+        if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
+            # disable randomness of hash of string in worker, if this is not
+            # launched by spark-submit
+            self.environment["PYTHONHASHSEED"] = "0"
 
         # Create the Java SparkContext through Py4J
         self._jsc = jsc or self._initialize_context(self._conf._jconf)
@@ -323,7 +332,7 @@ class SparkContext(object):
             start0 = c[0]
 
             def getStart(split):
-                return start0 + (split * size / numSlices) * step
+                return start0 + int((split * size / numSlices)) * step
 
             def f(split, iterator):
                 return xrange(getStart(split), getStart(split + 1), step)
@@ -357,6 +366,7 @@ class SparkContext(object):
         minPartitions = minPartitions or self.defaultMinPartitions
         return RDD(self._jsc.objectFile(name, minPartitions), self)
 
+    @ignore_unicode_prefix
     def textFile(self, name, minPartitions=None, use_unicode=True):
         """
         Read a text file from HDFS, a local file system (available on all
@@ -369,7 +379,7 @@ class SparkContext(object):
 
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
-        ...    testFile.write("Hello world!")
+        ...    _ = testFile.write("Hello world!")
         >>> textFile = sc.textFile(path)
         >>> textFile.collect()
         [u'Hello world!']
@@ -378,6 +388,7 @@ class SparkContext(object):
         return RDD(self._jsc.textFile(name, minPartitions), self,
                    UTF8Deserializer(use_unicode))
 
+    @ignore_unicode_prefix
     def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         """
         Read a directory of text files from HDFS, a local file system
@@ -411,9 +422,9 @@ class SparkContext(object):
         >>> dirPath = os.path.join(tempdir, "files")
         >>> os.mkdir(dirPath)
         >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
-        ...    file1.write("1")
+        ...    _ = file1.write("1")
         >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
-        ...    file2.write("2")
+        ...    _ = file2.write("2")
         >>> textFiles = sc.wholeTextFiles(dirPath)
         >>> sorted(textFiles.collect())
         [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
@@ -456,7 +467,7 @@ class SparkContext(object):
         jm = self._jvm.java.util.HashMap()
         if not d:
             d = {}
-        for k, v in d.iteritems():
+        for k, v in d.items():
             jm[k] = v
         return jm
 
@@ -608,6 +619,7 @@ class SparkContext(object):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
 
+    @ignore_unicode_prefix
     def union(self, rdds):
         """
         Build the union of a list of RDDs.
@@ -618,7 +630,7 @@ class SparkContext(object):
 
         >>> path = os.path.join(tempdir, "union-text.txt")
         >>> with open(path, "w") as testFile:
-        ...    testFile.write("Hello")
+        ...    _ = testFile.write("Hello")
         >>> textFile = sc.textFile(path)
         >>> textFile.collect()
         [u'Hello']
@@ -677,7 +689,7 @@ class SparkContext(object):
         >>> from pyspark import SparkFiles
         >>> path = os.path.join(tempdir, "test.txt")
         >>> with open(path, "w") as testFile:
-        ...    testFile.write("100")
+        ...    _ = testFile.write("100")
         >>> sc.addFile(path)
         >>> def func(iterator):
         ...    with open(SparkFiles.get("test.txt")) as testFile:
@@ -705,11 +717,13 @@ class SparkContext(object):
         """
         self.addFile(path)
         (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
-
         if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
             self._python_includes.append(filename)
             # for tests in local mode
             sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+        if sys.version > '3':
+            import importlib
+            importlib.invalidate_caches()
 
     def setCheckpointDir(self, dirName):
         """
@@ -744,7 +758,7 @@ class SparkContext(object):
         The application can use L{SparkContext.cancelJobGroup} to cancel all
         running jobs in this group.
 
-        >>> import thread, threading
+        >>> import threading
         >>> from time import sleep
         >>> result = "Not Set"
         >>> lock = threading.Lock()
@@ -763,10 +777,10 @@ class SparkContext(object):
         ...     sleep(5)
         ...     sc.cancelJobGroup("job_to_cancel")
         >>> supress = lock.acquire()
-        >>> supress = thread.start_new_thread(start_job, (10,))
-        >>> supress = thread.start_new_thread(stop_job, tuple())
+        >>> supress = threading.Thread(target=start_job, args=(10,)).start()
+        >>> supress = threading.Thread(target=stop_job).start()
         >>> supress = lock.acquire()
-        >>> print result
+        >>> print(result)
         Cancelled
 
         If interruptOnCancel is set to true for the job group, then job cancellation will result
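
Two of the smaller context.py changes matter more than they look: dict.iteritems() is gone on Python 3 (items() works on both, returning a view on 3), and / is true division on Python 3, so the slice start has to be forced back to an int before it reaches xrange/range. A small sketch of the division point, using made-up values rather than the real parallelize() inputs:

    # Hypothetical slice bookkeeping, mirroring the shape of getStart() above.
    start0, size, numSlices, step = 0, 10, 3, 1

    def get_start(split):
        # On Python 3, split * size / numSlices is a float (e.g. 3.333...),
        # and range() rejects float arguments, so truncate explicitly.
        return start0 + int(split * size / numSlices) * step

    print([get_start(i) for i in range(numSlices + 1)])  # [0, 3, 6, 10]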

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 9388598..7f06d42 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -24,9 +24,10 @@ import sys
 import traceback
 import time
 import gc
-from errno import EINTR, ECHILD, EAGAIN
+from errno import EINTR, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
+
 from pyspark.worker import main as worker_main
 from pyspark.serializers import read_int, write_int
 
@@ -53,8 +54,8 @@ def worker(sock):
     # Read the socket using fdopen instead of socket.makefile() because the latter
     # seems to be very slow; note that we need to dup() the file descriptor because
     # otherwise writes also cause a seek that makes us miss data on the read side.
-    infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+    infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
+    outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
     exit_code = 0
     try:
         worker_main(infile, outfile)
@@ -68,17 +69,6 @@ def worker(sock):
     return exit_code
 
 
-# Cleanup zombie children
-def cleanup_dead_children():
-    try:
-        while True:
-            pid, _ = os.waitpid(0, os.WNOHANG)
-            if not pid:
-                break
-    except:
-        pass
-
-
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -88,8 +78,12 @@ def manager():
     listen_sock.bind(('127.0.0.1', 0))
     listen_sock.listen(max(1024, SOMAXCONN))
     listen_host, listen_port = listen_sock.getsockname()
-    write_int(listen_port, sys.stdout)
-    sys.stdout.flush()
+
+    # re-open stdin/stdout in 'wb' mode
+    stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
+    stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
+    write_int(listen_port, stdout_bin)
+    stdout_bin.flush()
 
     def shutdown(code):
         signal.signal(SIGTERM, SIG_DFL)
@@ -101,6 +95,7 @@ def manager():
         shutdown(1)
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
+    signal.signal(SIGCHLD, SIG_IGN)
 
     reuse = os.environ.get("SPARK_REUSE_WORKER")
 
@@ -115,12 +110,9 @@ def manager():
                 else:
                     raise
 
-            # cleanup in signal handler will cause deadlock
-            cleanup_dead_children()
-
             if 0 in ready_fds:
                 try:
-                    worker_pid = read_int(sys.stdin)
+                    worker_pid = read_int(stdin_bin)
                 except EOFError:
                     # Spark told us to exit by closing stdin
                     shutdown(0)
@@ -145,7 +137,7 @@ def manager():
                         time.sleep(1)
                         pid = os.fork()  # error here will shutdown daemon
                     else:
-                        outfile = sock.makefile('w')
+                        outfile = sock.makefile(mode='wb')
                         write_int(e.errno, outfile)  # Signal that the fork failed
                         outfile.flush()
                         outfile.close()
@@ -157,7 +149,7 @@ def manager():
                     listen_sock.close()
                     try:
                         # Acknowledge that the fork was successful
-                        outfile = sock.makefile("w")
+                        outfile = sock.makefile(mode="wb")
                         write_int(os.getpid(), outfile)
                         outfile.flush()
                         outfile.close()
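
The daemon now talks to Spark and to its workers over explicitly binary file objects: read_int/write_int exchange raw 4-byte integers, which only round-trips cleanly on Python 3 when the streams are opened in 'rb'/'wb' mode. A self-contained sketch of the idea using struct and io.BytesIO in place of pyspark.serializers and a real socket; the helper bodies here are illustrative, not the library's:

    import io
    import struct

    def write_int(value, stream):
        # Big-endian signed 32-bit int, a typical length/ID framing.
        stream.write(struct.pack("!i", value))

    def read_int(stream):
        data = stream.read(4)
        if len(data) < 4:
            raise EOFError
        return struct.unpack("!i", data)[0]

    buf = io.BytesIO()      # stands in for sock.makefile(mode="wb") / os.fdopen(fd, "rb")
    write_int(61000, buf)
    buf.seek(0)
    print(read_int(buf))    # 61000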

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/heapq3.py
----------------------------------------------------------------------
diff --git a/python/pyspark/heapq3.py b/python/pyspark/heapq3.py
index bc441f1..4ef2afe 100644
--- a/python/pyspark/heapq3.py
+++ b/python/pyspark/heapq3.py
@@ -627,51 +627,49 @@ def merge(iterables, key=None, reverse=False):
     if key is None:
         for order, it in enumerate(map(iter, iterables)):
             try:
-                next = it.next
-                h_append([next(), order * direction, next])
+                h_append([next(it), order * direction, it])
             except StopIteration:
                 pass
         _heapify(h)
         while len(h) > 1:
             try:
                 while True:
-                    value, order, next = s = h[0]
+                    value, order, it = s = h[0]
                     yield value
-                    s[0] = next()           # raises StopIteration when exhausted
+                    s[0] = next(it)           # raises StopIteration when exhausted
                     _heapreplace(h, s)      # restore heap condition
             except StopIteration:
                 _heappop(h)                 # remove empty iterator
         if h:
             # fast case when only a single iterator remains
-            value, order, next = h[0]
+            value, order, it = h[0]
             yield value
-            for value in next.__self__:
+            for value in it:
                 yield value
         return
 
     for order, it in enumerate(map(iter, iterables)):
         try:
-            next = it.next
-            value = next()
-            h_append([key(value), order * direction, value, next])
+            value = next(it)
+            h_append([key(value), order * direction, value, it])
         except StopIteration:
             pass
     _heapify(h)
     while len(h) > 1:
         try:
             while True:
-                key_value, order, value, next = s = h[0]
+                key_value, order, value, it = s = h[0]
                 yield value
-                value = next()
+                value = next(it)
                 s[0] = key(value)
                 s[2] = value
                 _heapreplace(h, s)
         except StopIteration:
             _heappop(h)
     if h:
-        key_value, order, value, next = h[0]
+        key_value, order, value, it = h[0]
         yield value
-        for value in next.__self__:
+        for value in it:
             yield value
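
The merge() rewrite above replaces the Python 2-only bound method it.next with the next() builtin, which works on both versions, and keeps the iterator itself in the heap entry instead of a bound method. A toy version of the same loop shape; take_from is a made-up helper:

    def take_from(iterables):
        # Pull one value from each iterator with the portable next() builtin.
        out = []
        for it in map(iter, iterables):
            try:
                out.append(next(it))      # it.next() would fail on Python 3
            except StopIteration:
                pass
        return out

    print(take_from([[1, 2], [], "ab"]))  # [1, 'a']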
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2a5e84a..45bc38f 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -69,7 +69,7 @@ def launch_gateway():
             if callback_socket in readable:
                 gateway_connection = callback_socket.accept()[0]
                 # Determine which ephemeral port the server started on:
-                gateway_port = read_int(gateway_connection.makefile())
+                gateway_port = read_int(gateway_connection.makefile(mode="rb"))
                 gateway_connection.close()
                 callback_socket.close()
         if gateway_port is None:

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/join.py
----------------------------------------------------------------------
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index c3491de..94df399 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -32,6 +32,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 
 from pyspark.resultiterable import ResultIterable
+from functools import reduce
 
 
 def _do_python_join(rdd, other, numPartitions, dispatch):
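
reduce() is no longer a builtin on Python 3, hence the functools import above; the call sites in this module keep working unchanged. For example:

    from functools import reduce

    # Same call shape on Python 2 and 3 once imported from functools.
    print(reduce(lambda a, b: a + b, [1, 2, 3, 4], 0))  # 10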

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/ml/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index d7bc09f..45754bc 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -39,10 +39,10 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     >>> lr = LogisticRegression(maxIter=5, regParam=0.01)
     >>> model = lr.fit(df)
     >>> test0 = sc.parallelize([Row(features=Vectors.dense(-1.0))]).toDF()
-    >>> print model.transform(test0).head().prediction
+    >>> model.transform(test0).head().prediction
     0.0
     >>> test1 = sc.parallelize([Row(features=Vectors.sparse(1, [0], [1.0]))]).toDF()
-    >>> print model.transform(test1).head().prediction
+    >>> model.transform(test1).head().prediction
     1.0
     >>> lr.setParams("vector")
     Traceback (most recent call last):

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 263fe2a..4e4614b 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+from pyspark.rdd import ignore_unicode_prefix
 from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasNumFeatures
 from pyspark.ml.util import keyword_only
 from pyspark.ml.wrapper import JavaTransformer
@@ -24,6 +25,7 @@ __all__ = ['Tokenizer', 'HashingTF']
 
 
 @inherit_doc
+@ignore_unicode_prefix
 class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
     """
     A tokenizer that converts the input string to lowercase and then
@@ -32,15 +34,15 @@ class Tokenizer(JavaTransformer, HasInputCol, HasOutputCol):
     >>> from pyspark.sql import Row
     >>> df = sc.parallelize([Row(text="a b c")]).toDF()
     >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
-    >>> print tokenizer.transform(df).head()
+    >>> tokenizer.transform(df).head()
     Row(text=u'a b c', words=[u'a', u'b', u'c'])
     >>> # Change a parameter.
-    >>> print tokenizer.setParams(outputCol="tokens").transform(df).head()
+    >>> tokenizer.setParams(outputCol="tokens").transform(df).head()
     Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
     >>> # Temporarily modify a parameter.
-    >>> print tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
+    >>> tokenizer.transform(df, {tokenizer.outputCol: "words"}).head()
     Row(text=u'a b c', words=[u'a', u'b', u'c'])
-    >>> print tokenizer.transform(df).head()
+    >>> tokenizer.transform(df).head()
     Row(text=u'a b c', tokens=[u'a', u'b', u'c'])
     >>> # Must use keyword arguments to specify params.
     >>> tokenizer.setParams("text")
@@ -79,13 +81,13 @@ class HashingTF(JavaTransformer, HasInputCol, HasOutputCol, HasNumFeatures):
     >>> from pyspark.sql import Row
     >>> df = sc.parallelize([Row(words=["a", "b", "c"])]).toDF()
     >>> hashingTF = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
-    >>> print hashingTF.transform(df).head().features
-    (10,[7,8,9],[1.0,1.0,1.0])
-    >>> print hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
-    (10,[7,8,9],[1.0,1.0,1.0])
+    >>> hashingTF.transform(df).head().features
+    SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0})
+    >>> hashingTF.setParams(outputCol="freqs").transform(df).head().freqs
+    SparseVector(10, {7: 1.0, 8: 1.0, 9: 1.0})
     >>> params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
-    >>> print hashingTF.transform(df, params).head().vector
-    (5,[2,3,4],[1.0,1.0,1.0])
+    >>> hashingTF.transform(df, params).head().vector
+    SparseVector(5, {2: 1.0, 3: 1.0, 4: 1.0})
     """
 
     _java_class = "org.apache.spark.ml.feature.HashingTF"
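
The doctests above now rely on the echoed repr (which prints identically on 2 and 3 for SparseVector) instead of print statements, and the class is wrapped in ignore_unicode_prefix so expected output like Row(text=u'a b c', ...) still matches when Python 3 drops the u prefix. The real decorator lives in pyspark/rdd.py and is not shown in this hunk; a rough sketch of the idea, assuming it simply rewrites the docstring on Python 3:

    import re
    import sys

    def ignore_unicode_prefix(f):
        # Illustrative only: strip u'' prefixes from the docstring on Python 3.
        if sys.version >= '3' and f.__doc__:
            f.__doc__ = re.sub(r"(\W|^)[uU](['])", r"\1\2", f.__doc__)
        return f

    @ignore_unicode_prefix
    def sample():
        """
        >>> u'a b c'.split()
        [u'a', u'b', u'c']
        """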

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/ml/param/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index 5c62620..9fccb65 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -63,8 +63,8 @@ class Params(Identifiable):
         uses :py:func:`dir` to get all attributes of type
         :py:class:`Param`.
         """
-        return filter(lambda attr: isinstance(attr, Param),
-                      [getattr(self, x) for x in dir(self) if x != "params"])
+        return list(filter(lambda attr: isinstance(attr, Param),
+                           [getattr(self, x) for x in dir(self) if x != "params"]))
 
     def _explain(self, param):
         """
@@ -185,7 +185,7 @@ class Params(Identifiable):
         """
         Sets user-supplied params.
         """
-        for param, value in kwargs.iteritems():
+        for param, value in kwargs.items():
             self.paramMap[getattr(self, param)] = value
         return self
 
@@ -193,6 +193,6 @@ class Params(Identifiable):
         """
         Sets default params.
         """
-        for param, value in kwargs.iteritems():
+        for param, value in kwargs.items():
             self.defaultParamMap[getattr(self, param)] = value
         return self
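
filter() and kwargs.items() return lazy objects on Python 3, so the explicit list(...) above preserves the old "params returns a list" contract. A quick illustration with arbitrary values:

    values = [1, "a", 2.5, 3]
    ints = filter(lambda v: isinstance(v, int), values)
    # On Python 3 this is a one-shot filter object, not a list:
    print(list(ints))   # [1, 3]
    print(list(ints))   # [] on Python 3 -- exhausted, which is why the patch wraps it in list()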

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/ml/param/_shared_params_code_gen.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index 55f4224..6a31924 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 header = """#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -82,9 +84,9 @@ def _gen_param_code(name, doc, defaultValueStr):
         .replace("$defaultValueStr", str(defaultValueStr))
 
 if __name__ == "__main__":
-    print header
-    print "\n# DO NOT MODIFY THIS FILE! It was generated by _shared_params_code_gen.py.\n"
-    print "from pyspark.ml.param import Param, Params\n\n"
+    print(header)
+    print("\n# DO NOT MODIFY THIS FILE! It was generated by _shared_params_code_gen.py.\n")
+    print("from pyspark.ml.param import Param, Params\n\n")
     shared = [
         ("maxIter", "max number of iterations", None),
         ("regParam", "regularization constant", None),
@@ -97,4 +99,4 @@ if __name__ == "__main__":
     code = []
     for name, doc, defaultValueStr in shared:
         code.append(_gen_param_code(name, doc, defaultValueStr))
-    print "\n\n\n".join(code)
+    print("\n\n\n".join(code))

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
index f2ef573..07507b2 100644
--- a/python/pyspark/mllib/__init__.py
+++ b/python/pyspark/mllib/__init__.py
@@ -18,6 +18,7 @@
 """
 Python bindings for MLlib.
 """
+from __future__ import absolute_import
 
 # MLlib currently needs NumPy 1.4+, so complain if lower
 
@@ -29,7 +30,9 @@ __all__ = ['classification', 'clustering', 'feature', 'fpm', 'linalg', 'random',
            'recommendation', 'regression', 'stat', 'tree', 'util']
 
 import sys
-import rand as random
-random.__name__ = 'random'
-random.RandomRDDs.__module__ = __name__ + '.random'
-sys.modules[__name__ + '.random'] = random
+from . import rand as random
+modname = __name__ + '.random'
+random.__name__ = modname
+random.RandomRDDs.__module__ = modname
+sys.modules[modname] = random
+del modname, sys
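
With absolute imports enabled, a bare `import rand` no longer resolves to the sibling module on Python 3 (and a module literally named random would shadow the standard library), so the package imports pyspark.mllib.rand relatively and publishes it under pyspark.mllib.random via sys.modules. A stripped-down sketch of that aliasing trick using hypothetical module names (mypkg, random_utils):

    import sys
    import types

    pkg = types.ModuleType("mypkg")
    pkg.__path__ = []                      # mark it as a package
    impl = types.ModuleType("mypkg.rand")  # stands in for the awkwardly named rand.py
    impl.answer = 42

    public = "mypkg.random_utils"
    impl.__name__ = public
    sys.modules["mypkg"] = pkg
    sys.modules[public] = impl
    pkg.random_utils = impl

    import mypkg.random_utils as r         # works even though no such file exists
    print(r.answer)                        # 42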

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 2466e8a..eda0b60 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -510,9 +510,10 @@ class NaiveBayesModel(Saveable, Loader):
     def load(cls, sc, path):
         java_model = sc._jvm.org.apache.spark.mllib.classification.NaiveBayesModel.load(
             sc._jsc.sc(), path)
-        py_labels = _java2py(sc, java_model.labels())
-        py_pi = _java2py(sc, java_model.pi())
-        py_theta = _java2py(sc, java_model.theta())
+        # Can not unpickle array.array from Pyrolite in Python3 with "bytes"
+        py_labels = _java2py(sc, java_model.labels(), "latin1")
+        py_pi = _java2py(sc, java_model.pi(), "latin1")
+        py_theta = _java2py(sc, java_model.theta(), "latin1")
         return NaiveBayesModel(py_labels, py_pi, numpy.array(py_theta))
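
Pickled data coming back from the JVM can contain raw 8-bit bytes that Python 3 refuses to decode as ASCII, so the load above passes encoding="latin1", which maps every byte 0-255 to a code point and never fails. A minimal illustration with the standard pickle module (the byte string below is a hand-built Python 2 protocol-2 pickle; PickleSerializer in the patch wraps the same mechanism):

    import pickle

    # Python 2 pickle of the byte string '\xe9' (a Python 2 str).
    py2_pickle = b'\x80\x02U\x01\xe9q\x00.'

    # pickle.loads(py2_pickle) raises UnicodeDecodeError on Python 3 (ASCII codec);
    # 'latin1' decodes every byte, matching what Python 2 had in memory.
    print(pickle.loads(py2_pickle, encoding='latin1'))   # 'é'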
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 464f49a..abbb7cf 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -15,6 +15,12 @@
 # limitations under the License.
 #
 
+import sys
+import array as pyarray
+
+if sys.version > '3':
+    xrange = range
+
 from numpy import array
 
 from pyspark import RDD
@@ -55,8 +61,8 @@ class KMeansModel(Saveable, Loader):
     True
     >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
     True
-    >>> type(model.clusterCenters)
-    <type 'list'>
+    >>> isinstance(model.clusterCenters, list)
+    True
     >>> import os, tempfile
     >>> path = tempfile.mkdtemp()
     >>> model.save(sc, path)
@@ -90,7 +96,7 @@ class KMeansModel(Saveable, Loader):
         return best
 
     def save(self, sc, path):
-        java_centers = _py2java(sc, map(_convert_to_vector, self.centers))
+        java_centers = _py2java(sc, [_convert_to_vector(c) for c in self.centers])
         java_model = sc._jvm.org.apache.spark.mllib.clustering.KMeansModel(java_centers)
         java_model.save(sc._jsc.sc(), path)
 
@@ -133,7 +139,7 @@ class GaussianMixtureModel(object):
     ...                                         5.7048,  4.6567, 5.5026,
     ...                                         4.5605,  5.2043,  6.2734]).reshape(5, 3))
     >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
-    ...                                 maxIterations=150, seed=10)
+    ...                               maxIterations=150, seed=10)
     >>> labels = model.predict(clusterdata_2).collect()
     >>> labels[0]==labels[1]==labels[2]
     True
@@ -168,8 +174,8 @@ class GaussianMixtureModel(object):
         if isinstance(x, RDD):
             means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
             membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
-                                              self.weights, means, sigmas)
-            return membership_matrix
+                                              _convert_to_vector(self.weights), means, sigmas)
+            return membership_matrix.map(lambda x: pyarray.array('d', x))
 
 
 class GaussianMixture(object):
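
The KMeansModel doctest above switches from matching the repr of a type object (which changed from <type 'list'> to <class 'list'> in Python 3) to an isinstance check that prints True on both versions, and the soft-prediction values are mapped back into array.array('d', ...) so callers get the same concrete type as before. For example, with arbitrary numbers:

    import array as pyarray

    print(isinstance([1.0, 2.0], list))            # True on both Python 2 and 3
    membership = pyarray.array('d', [0.25, 0.75])  # doubles, like the coercion above
    print(membership)                              # array('d', [0.25, 0.75])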

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/common.py b/python/pyspark/mllib/common.py
index a539d2f..ba60589 100644
--- a/python/pyspark/mllib/common.py
+++ b/python/pyspark/mllib/common.py
@@ -15,6 +15,11 @@
 # limitations under the License.
 #
 
+import sys
+if sys.version >= '3':
+    long = int
+    unicode = str
+
 import py4j.protocol
 from py4j.protocol import Py4JJavaError
 from py4j.java_gateway import JavaObject
@@ -36,7 +41,7 @@ _float_str_mapping = {
 
 def _new_smart_decode(obj):
     if isinstance(obj, float):
-        s = unicode(obj)
+        s = str(obj)
         return _float_str_mapping.get(s, s)
     return _old_smart_decode(obj)
 
@@ -74,15 +79,15 @@ def _py2java(sc, obj):
         obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
     elif isinstance(obj, JavaObject):
         pass
-    elif isinstance(obj, (int, long, float, bool, basestring)):
+    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
         pass
     else:
-        bytes = bytearray(PickleSerializer().dumps(obj))
-        obj = sc._jvm.SerDe.loads(bytes)
+        data = bytearray(PickleSerializer().dumps(obj))
+        obj = sc._jvm.SerDe.loads(data)
     return obj
 
 
-def _java2py(sc, r):
+def _java2py(sc, r, encoding="bytes"):
     if isinstance(r, JavaObject):
         clsName = r.getClass().getSimpleName()
         # convert RDD into JavaRDD
@@ -102,8 +107,8 @@ def _java2py(sc, r):
             except Py4JJavaError:
                 pass  # not pickable
 
-    if isinstance(r, bytearray):
-        r = PickleSerializer().loads(str(r))
+    if isinstance(r, (bytearray, bytes)):
+        r = PickleSerializer().loads(bytes(r), encoding=encoding)
     return r
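
long and unicode are not builtins on Python 3, so the module aliases them to int and str up front and the isinstance checks keep working; the local variable formerly named bytes was also renamed to data so it no longer shadows the builtin. A compact version of that compatibility preamble, with is_simple_scalar as a made-up stand-in for the real check:

    import sys

    if sys.version >= '3':
        long = int        # deliberate Python 2 compatibility aliases
        unicode = str

    def is_simple_scalar(obj):
        # Mirrors the kind of isinstance check used in _py2java above.
        return isinstance(obj, (int, long, float, bool, bytes, unicode))

    print(is_simple_scalar(3), is_simple_scalar(u"hi"), is_simple_scalar([1, 2]))
    # True True False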
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index 8be819a..1140539 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -23,12 +23,17 @@ from __future__ import absolute_import
 import sys
 import warnings
 import random
+import binascii
+if sys.version >= '3':
+    basestring = str
+    unicode = str
 
 from py4j.protocol import Py4JJavaError
 
-from pyspark import RDD, SparkContext
+from pyspark import SparkContext
+from pyspark.rdd import RDD, ignore_unicode_prefix
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import Vectors, Vector, _convert_to_vector
+from pyspark.mllib.linalg import Vectors, _convert_to_vector
 
 __all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
            'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -206,7 +211,7 @@ class HashingTF(object):
     >>> htf = HashingTF(100)
     >>> doc = "a a b b c d".split(" ")
     >>> htf.transform(doc)
-    SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
+    SparseVector(100, {...})
     """
     def __init__(self, numFeatures=1 << 20):
         """
@@ -360,6 +365,7 @@ class Word2VecModel(JavaVectorTransformer):
         return self.call("getVectors")
 
 
+@ignore_unicode_prefix
 class Word2Vec(object):
     """
     Word2Vec creates vector representation of words in a text corpus.
@@ -382,7 +388,7 @@ class Word2Vec(object):
     >>> sentence = "a b " * 100 + "a c " * 10
     >>> localDoc = [sentence, sentence]
     >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
-    >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+    >>> model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc)
 
     >>> syms = model.findSynonyms("a", 2)
     >>> [s[0] for s in syms]
@@ -400,7 +406,7 @@ class Word2Vec(object):
         self.learningRate = 0.025
         self.numPartitions = 1
         self.numIterations = 1
-        self.seed = random.randint(0, sys.maxint)
+        self.seed = random.randint(0, sys.maxsize)
         self.minCount = 5
 
     def setVectorSize(self, vectorSize):
@@ -459,7 +465,7 @@ class Word2Vec(object):
             raise TypeError("data should be an RDD of list of string")
         jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
                                float(self.learningRate), int(self.numPartitions),
-                               int(self.numIterations), long(self.seed),
+                               int(self.numIterations), int(self.seed),
                                int(self.minCount))
         return Word2VecModel(jmodel)
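
Two Python 3 details show up in the Word2Vec changes: sys.maxint is gone (sys.maxsize is the portable upper bound for the random seed), and long literals such as 42L are a syntax error, so plain ints are used; int is arbitrary-precision on Python 3 anyway. For instance:

    import random
    import sys

    seed = random.randint(0, sys.maxsize)   # sys.maxint does not exist on Python 3
    print(0 <= seed <= sys.maxsize)         # True
    print(int("42") == 42)                  # 42L would be a SyntaxError on Python 3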
 

