Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/17 01:21:17 UTC

[6/6] spark git commit: [SPARK-4897] [PySpark] Python 3 support

[SPARK-4897] [PySpark] Python 3 support

This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3, so those tests are skipped.

TODO: ec2/spark_ec2.py is not fully tested with Python 3.
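
Most of the port follows a handful of recurring 2/3 compatibility patterns: adding "from __future__ import print_function", guarding imports whose modules were renamed, and replacing removed constructs (print statements, xrange, tuple-unpacking lambdas). As a minimal sketch of the version-guarded import pattern used in ec2/spark_ec2.py below (the names come straight from that diff):

    from __future__ import print_function

    import sys

    if sys.version < "3":
        from urllib2 import urlopen, Request, HTTPError      # Python 2
    else:
        from urllib.request import urlopen, Request          # Python 3
        from urllib.error import HTTPError

    # Both branches bind the same names, so the rest of the script can
    # call urlopen(...) / Request(...) without further version checks.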

Author: Davies Liu <da...@databricks.com>
Author: twneale <tw...@gmail.com>
Author: Josh Rosen <jo...@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview  I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code:
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04e44b37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04e44b37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04e44b37

Branch: refs/heads/master
Commit: 04e44b37cc04f62fbf9e08c7076349e0a4d12ea8
Parents: 55f553a
Author: Davies Liu <da...@databricks.com>
Authored: Thu Apr 16 16:20:57 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Apr 16 16:20:57 2015 -0700

----------------------------------------------------------------------
 bin/pyspark                                     |    1 +
 bin/spark-submit                                |    3 +
 bin/spark-submit2.cmd                           |    3 +
 dev/run-tests                                   |    2 +
 dev/run-tests-jenkins                           |    2 +-
 ec2/spark_ec2.py                                |  262 ++--
 examples/src/main/python/als.py                 |   15 +-
 examples/src/main/python/avro_inputformat.py    |    9 +-
 .../src/main/python/cassandra_inputformat.py    |    8 +-
 .../src/main/python/cassandra_outputformat.py   |    6 +-
 examples/src/main/python/hbase_inputformat.py   |    8 +-
 examples/src/main/python/hbase_outputformat.py  |    6 +-
 examples/src/main/python/kmeans.py              |   11 +-
 examples/src/main/python/logistic_regression.py |   20 +-
 .../ml/simple_text_classification_pipeline.py   |   20 +-
 examples/src/main/python/mllib/correlations.py  |   19 +-
 .../src/main/python/mllib/dataset_example.py    |   13 +-
 .../main/python/mllib/decision_tree_runner.py   |   29 +-
 .../main/python/mllib/gaussian_mixture_model.py |    9 +-
 .../main/python/mllib/gradient_boosted_trees.py |    7 +-
 examples/src/main/python/mllib/kmeans.py        |    5 +-
 .../main/python/mllib/logistic_regression.py    |    9 +-
 .../main/python/mllib/random_forest_example.py  |    9 +-
 .../main/python/mllib/random_rdd_generation.py  |   21 +-
 examples/src/main/python/mllib/sampled_rdds.py  |   29 +-
 examples/src/main/python/mllib/word2vec.py      |    5 +-
 examples/src/main/python/pagerank.py            |   16 +-
 examples/src/main/python/parquet_inputformat.py |    7 +-
 examples/src/main/python/pi.py                  |    5 +-
 examples/src/main/python/sort.py                |    6 +-
 examples/src/main/python/sql.py                 |    4 +-
 examples/src/main/python/status_api_demo.py     |   10 +-
 .../src/main/python/streaming/hdfs_wordcount.py |    3 +-
 .../main/python/streaming/kafka_wordcount.py    |    3 +-
 .../main/python/streaming/network_wordcount.py  |    3 +-
 .../streaming/recoverable_network_wordcount.py  |   11 +-
 .../python/streaming/sql_network_wordcount.py   |    5 +-
 .../streaming/stateful_network_wordcount.py     |    3 +-
 examples/src/main/python/transitive_closure.py  |   10 +-
 examples/src/main/python/wordcount.py           |    6 +-
 .../MatrixFactorizationModelWrapper.scala       |    9 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala |   39 +-
 python/pyspark/accumulators.py                  |    9 +-
 python/pyspark/broadcast.py                     |   37 +-
 python/pyspark/cloudpickle.py                   |  577 +++-----
 python/pyspark/conf.py                          |    9 +-
 python/pyspark/context.py                       |   42 +-
 python/pyspark/daemon.py                        |   36 +-
 python/pyspark/heapq3.py                        |   24 +-
 python/pyspark/java_gateway.py                  |    2 +-
 python/pyspark/join.py                          |    1 +
 python/pyspark/ml/classification.py             |    4 +-
 python/pyspark/ml/feature.py                    |   22 +-
 python/pyspark/ml/param/__init__.py             |    8 +-
 .../pyspark/ml/param/_shared_params_code_gen.py |   10 +-
 python/pyspark/mllib/__init__.py                |   11 +-
 python/pyspark/mllib/classification.py          |    7 +-
 python/pyspark/mllib/clustering.py              |   18 +-
 python/pyspark/mllib/common.py                  |   19 +-
 python/pyspark/mllib/feature.py                 |   18 +-
 python/pyspark/mllib/fpm.py                     |    2 +
 python/pyspark/mllib/linalg.py                  |   48 +-
 python/pyspark/mllib/rand.py                    |   33 +-
 python/pyspark/mllib/recommendation.py          |    7 +-
 python/pyspark/mllib/stat/_statistics.py        |   25 +-
 python/pyspark/mllib/tests.py                   |   20 +-
 python/pyspark/mllib/tree.py                    |   15 +-
 python/pyspark/mllib/util.py                    |   26 +-
 python/pyspark/profiler.py                      |   10 +-
 python/pyspark/rdd.py                           |  189 +--
 python/pyspark/rddsampler.py                    |    4 +-
 python/pyspark/serializers.py                   |  101 +-
 python/pyspark/shell.py                         |   16 +-
 python/pyspark/shuffle.py                       |  126 +-
 python/pyspark/sql/__init__.py                  |   15 +-
 python/pyspark/sql/_types.py                    | 1261 ++++++++++++++++++
 python/pyspark/sql/context.py                   |   32 +-
 python/pyspark/sql/dataframe.py                 |   63 +-
 python/pyspark/sql/functions.py                 |    6 +-
 python/pyspark/sql/tests.py                     |   11 +-
 python/pyspark/sql/types.py                     | 1252 -----------------
 python/pyspark/statcounter.py                   |    4 +-
 python/pyspark/streaming/context.py             |    5 +-
 python/pyspark/streaming/dstream.py             |   51 +-
 python/pyspark/streaming/kafka.py               |    8 +-
 python/pyspark/streaming/tests.py               |   39 +-
 python/pyspark/streaming/util.py                |    6 +-
 python/pyspark/tests.py                         |  327 ++---
 python/pyspark/worker.py                        |   16 +-
 python/run-tests                                |   15 +-
 python/test_support/userlib-0.1-py2.7.egg       |  Bin 1945 -> 0 bytes
 python/test_support/userlib-0.1.zip             |  Bin 0 -> 668 bytes
 92 files changed, 2630 insertions(+), 2628 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 776b28d..8acad61 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -89,6 +89,7 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
 if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
+  export PYTHONHASHSEED=0
   if [[ -n "$PYSPARK_DOC_TEST" ]]; then
     exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
   else

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/bin/spark-submit
----------------------------------------------------------------------
diff --git a/bin/spark-submit b/bin/spark-submit
index bcff78e..0e0afe7 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -19,6 +19,9 @@
 
 SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 
+# disable randomized hash for string in Python 3.3+
+export PYTHONHASHSEED=0
+
 # Only define a usage function if an upstream script hasn't done so.
 if ! type -t usage >/dev/null 2>&1; then
   usage() {
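
For context on the PYTHONHASHSEED=0 exports here and in bin/pyspark: Python 3.3+ randomizes str hashing per interpreter process, so hash-partitioned operations on string keys would otherwise disagree between the driver and the worker processes. A minimal illustration (not part of the diff), run outside Spark:

    import os
    import subprocess
    import sys

    cmd = [sys.executable, "-c", "print(hash('spark'))"]

    # On Python 3.3+ this prints a different value on almost every run,
    # because each interpreter process picks its own hash seed...
    print(subprocess.check_output(cmd).decode().strip())

    # ...while exporting PYTHONHASHSEED before the interpreter starts
    # (as bin/pyspark and bin/spark-submit now do) makes every process
    # agree on hash("spark").
    print(subprocess.check_output(
        cmd, env=dict(os.environ, PYTHONHASHSEED="0")).decode().strip())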

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/bin/spark-submit2.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 08ddb18..d3fc4a5 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -20,6 +20,9 @@ rem
 rem This is the entry point for running Spark submit. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
 
+rem disable randomized hash for string in Python 3.3+
+set PYTHONHASHSEED=0
+
 set CLASS=org.apache.spark.deploy.SparkSubmit
 call %~dp0spark-class2.cmd %CLASS% %*
 set SPARK_ERROR_LEVEL=%ERRORLEVEL%

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index bb21ab6..861d167 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -235,6 +235,8 @@ echo "========================================================================="
 
 CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS
 
+# add path for python 3 in jenkins
+export PATH="${PATH}:/home/anaonda/envs/py3k/bin"
 ./python/run-tests
 
 echo ""

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/dev/run-tests-jenkins
----------------------------------------------------------------------
diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins
index 3c1c91a..030f2cd 100755
--- a/dev/run-tests-jenkins
+++ b/dev/run-tests-jenkins
@@ -47,7 +47,7 @@ COMMIT_URL="https://github.com/apache/spark/commit/${ghprbActualCommit}"
 # GitHub doesn't auto-link short hashes when submitted via the API, unfortunately. :(
 SHORT_COMMIT_HASH="${ghprbActualCommit:0:7}"
 
-TESTS_TIMEOUT="120m" # format: http://linux.die.net/man/1/timeout
+TESTS_TIMEOUT="150m" # format: http://linux.die.net/man/1/timeout
 
 # Array to capture all tests to run on the pull request. These tests are held under the
 #+ dev/tests/ directory.

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0c1f247..87c0818 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,7 @@
 # limitations under the License.
 #
 
-from __future__ import with_statement
+from __future__ import with_statement, print_function
 
 import hashlib
 import itertools
@@ -37,12 +37,17 @@ import tarfile
 import tempfile
 import textwrap
 import time
-import urllib2
 import warnings
 from datetime import datetime
 from optparse import OptionParser
 from sys import stderr
 
+if sys.version < "3":
+    from urllib2 import urlopen, Request, HTTPError
+else:
+    from urllib.request import urlopen, Request
+    from urllib.error import HTTPError
+
 SPARK_EC2_VERSION = "1.2.1"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
 
@@ -88,10 +93,10 @@ def setup_external_libs(libs):
     SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")
 
     if not os.path.exists(SPARK_EC2_LIB_DIR):
-        print "Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
+        print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
             path=SPARK_EC2_LIB_DIR
-        )
-        print "This should be a one-time operation."
+        ))
+        print("This should be a one-time operation.")
         os.mkdir(SPARK_EC2_LIB_DIR)
 
     for lib in libs:
@@ -100,8 +105,8 @@ def setup_external_libs(libs):
 
         if not os.path.isdir(lib_dir):
             tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
-            print " - Downloading {lib}...".format(lib=lib["name"])
-            download_stream = urllib2.urlopen(
+            print(" - Downloading {lib}...".format(lib=lib["name"]))
+            download_stream = urlopen(
                 "{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
                     prefix=PYPI_URL_PREFIX,
                     first_letter=lib["name"][:1],
@@ -113,13 +118,13 @@ def setup_external_libs(libs):
                 tgz_file.write(download_stream.read())
             with open(tgz_file_path) as tar:
                 if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
-                    print >> stderr, "ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"])
+                    print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
                     sys.exit(1)
             tar = tarfile.open(tgz_file_path)
             tar.extractall(path=SPARK_EC2_LIB_DIR)
             tar.close()
             os.remove(tgz_file_path)
-            print " - Finished downloading {lib}.".format(lib=lib["name"])
+            print(" - Finished downloading {lib}.".format(lib=lib["name"]))
         sys.path.insert(1, lib_dir)
 
 
@@ -299,12 +304,12 @@ def parse_args():
     if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
         if not os.path.isfile('/etc/boto.cfg'):
             if os.getenv('AWS_ACCESS_KEY_ID') is None:
-                print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
-                                  "must be set")
+                print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
+                      file=stderr)
                 sys.exit(1)
             if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
-                print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
-                                  "must be set")
+                print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
+                      file=stderr)
                 sys.exit(1)
     return (opts, action, cluster_name)
 
@@ -316,7 +321,7 @@ def get_or_make_group(conn, name, vpc_id):
     if len(group) > 0:
         return group[0]
     else:
-        print "Creating security group " + name
+        print("Creating security group " + name)
         return conn.create_security_group(name, "Spark EC2 group", vpc_id)
 
 
@@ -324,18 +329,19 @@ def get_validate_spark_version(version, repo):
     if "." in version:
         version = version.replace("v", "")
         if version not in VALID_SPARK_VERSIONS:
-            print >> stderr, "Don't know about Spark version: {v}".format(v=version)
+            print("Don't know about Spark version: {v}".format(v=version), file=stderr)
             sys.exit(1)
         return version
     else:
         github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
-        request = urllib2.Request(github_commit_url)
+        request = Request(github_commit_url)
         request.get_method = lambda: 'HEAD'
         try:
-            response = urllib2.urlopen(request)
-        except urllib2.HTTPError, e:
-            print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url)
-            print >> stderr, "Received HTTP response code of {code}.".format(code=e.code)
+            response = urlopen(request)
+        except HTTPError as e:
+            print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
+                  file=stderr)
+            print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
             sys.exit(1)
         return version
 
@@ -394,8 +400,7 @@ def get_spark_ami(opts):
         instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
     else:
         instance_type = "pvm"
-        print >> stderr,\
-            "Don't recognize %s, assuming type is pvm" % opts.instance_type
+        print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)
 
     # URL prefix from which to fetch AMI information
     ami_prefix = "{r}/{b}/ami-list".format(
@@ -404,10 +409,10 @@ def get_spark_ami(opts):
 
     ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
     try:
-        ami = urllib2.urlopen(ami_path).read().strip()
-        print "Spark AMI: " + ami
+        ami = urlopen(ami_path).read().strip()
+        print("Spark AMI: " + ami)
     except:
-        print >> stderr, "Could not resolve AMI at: " + ami_path
+        print("Could not resolve AMI at: " + ami_path, file=stderr)
         sys.exit(1)
 
     return ami
@@ -419,11 +424,11 @@ def get_spark_ami(opts):
 # Fails if there already instances running in the cluster's groups.
 def launch_cluster(conn, opts, cluster_name):
     if opts.identity_file is None:
-        print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections."
+        print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
         sys.exit(1)
 
     if opts.key_pair is None:
-        print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
+        print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
         sys.exit(1)
 
     user_data_content = None
@@ -431,7 +436,7 @@ def launch_cluster(conn, opts, cluster_name):
         with open(opts.user_data) as user_data_file:
             user_data_content = user_data_file.read()
 
-    print "Setting up security groups..."
+    print("Setting up security groups...")
     master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
     slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
     authorized_address = opts.authorized_address
@@ -497,8 +502,8 @@ def launch_cluster(conn, opts, cluster_name):
     existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
                                                              die_on_error=False)
     if existing_slaves or (existing_masters and not opts.use_existing_master):
-        print >> stderr, ("ERROR: There are already instances running in " +
-                          "group %s or %s" % (master_group.name, slave_group.name))
+        print("ERROR: There are already instances running in group %s or %s" %
+              (master_group.name, slave_group.name), file=stderr)
         sys.exit(1)
 
     # Figure out Spark AMI
@@ -511,12 +516,12 @@ def launch_cluster(conn, opts, cluster_name):
         additional_group_ids = [sg.id
                                 for sg in conn.get_all_security_groups()
                                 if opts.additional_security_group in (sg.name, sg.id)]
-    print "Launching instances..."
+    print("Launching instances...")
 
     try:
         image = conn.get_all_images(image_ids=[opts.ami])[0]
     except:
-        print >> stderr, "Could not find AMI " + opts.ami
+        print("Could not find AMI " + opts.ami, file=stderr)
         sys.exit(1)
 
     # Create block device mapping so that we can add EBS volumes if asked to.
@@ -542,8 +547,8 @@ def launch_cluster(conn, opts, cluster_name):
     # Launch slaves
     if opts.spot_price is not None:
         # Launch spot instances with the requested price
-        print ("Requesting %d slaves as spot instances with price $%.3f" %
-               (opts.slaves, opts.spot_price))
+        print("Requesting %d slaves as spot instances with price $%.3f" %
+              (opts.slaves, opts.spot_price))
         zones = get_zones(conn, opts)
         num_zones = len(zones)
         i = 0
@@ -566,7 +571,7 @@ def launch_cluster(conn, opts, cluster_name):
             my_req_ids += [req.id for req in slave_reqs]
             i += 1
 
-        print "Waiting for spot instances to be granted..."
+        print("Waiting for spot instances to be granted...")
         try:
             while True:
                 time.sleep(10)
@@ -579,24 +584,24 @@ def launch_cluster(conn, opts, cluster_name):
                     if i in id_to_req and id_to_req[i].state == "active":
                         active_instance_ids.append(id_to_req[i].instance_id)
                 if len(active_instance_ids) == opts.slaves:
-                    print "All %d slaves granted" % opts.slaves
+                    print("All %d slaves granted" % opts.slaves)
                     reservations = conn.get_all_reservations(active_instance_ids)
                     slave_nodes = []
                     for r in reservations:
                         slave_nodes += r.instances
                     break
                 else:
-                    print "%d of %d slaves granted, waiting longer" % (
-                        len(active_instance_ids), opts.slaves)
+                    print("%d of %d slaves granted, waiting longer" % (
+                        len(active_instance_ids), opts.slaves))
         except:
-            print "Canceling spot instance requests"
+            print("Canceling spot instance requests")
             conn.cancel_spot_instance_requests(my_req_ids)
             # Log a warning if any of these requests actually launched instances:
             (master_nodes, slave_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
             running = len(master_nodes) + len(slave_nodes)
             if running:
-                print >> stderr, ("WARNING: %d instances are still running" % running)
+                print(("WARNING: %d instances are still running" % running), file=stderr)
             sys.exit(0)
     else:
         # Launch non-spot instances
@@ -618,16 +623,16 @@ def launch_cluster(conn, opts, cluster_name):
                                       placement_group=opts.placement_group,
                                       user_data=user_data_content)
                 slave_nodes += slave_res.instances
-                print "Launched {s} slave{plural_s} in {z}, regid = {r}".format(
-                    s=num_slaves_this_zone,
-                    plural_s=('' if num_slaves_this_zone == 1 else 's'),
-                    z=zone,
-                    r=slave_res.id)
+                print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
+                      s=num_slaves_this_zone,
+                      plural_s=('' if num_slaves_this_zone == 1 else 's'),
+                      z=zone,
+                      r=slave_res.id))
             i += 1
 
     # Launch or resume masters
     if existing_masters:
-        print "Starting master..."
+        print("Starting master...")
         for inst in existing_masters:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
@@ -650,10 +655,10 @@ def launch_cluster(conn, opts, cluster_name):
                                user_data=user_data_content)
 
         master_nodes = master_res.instances
-        print "Launched master in %s, regid = %s" % (zone, master_res.id)
+        print("Launched master in %s, regid = %s" % (zone, master_res.id))
 
     # This wait time corresponds to SPARK-4983
-    print "Waiting for AWS to propagate instance metadata..."
+    print("Waiting for AWS to propagate instance metadata...")
     time.sleep(5)
     # Give the instances descriptive names
     for master in master_nodes:
@@ -674,8 +679,8 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     Get the EC2 instances in an existing cluster if available.
     Returns a tuple of lists of EC2 instance objects for the masters and slaves.
     """
-    print "Searching for existing cluster {c} in region {r}...".format(
-        c=cluster_name, r=opts.region)
+    print("Searching for existing cluster {c} in region {r}...".format(
+          c=cluster_name, r=opts.region))
 
     def get_instances(group_names):
         """
@@ -693,16 +698,15 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     slave_instances = get_instances([cluster_name + "-slaves"])
 
     if any((master_instances, slave_instances)):
-        print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
-            m=len(master_instances),
-            plural_m=('' if len(master_instances) == 1 else 's'),
-            s=len(slave_instances),
-            plural_s=('' if len(slave_instances) == 1 else 's'))
+        print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
+              m=len(master_instances),
+              plural_m=('' if len(master_instances) == 1 else 's'),
+              s=len(slave_instances),
+              plural_s=('' if len(slave_instances) == 1 else 's')))
 
     if not master_instances and die_on_error:
-        print >> sys.stderr, \
-            "ERROR: Could not find a master for cluster {c} in region {r}.".format(
-                c=cluster_name, r=opts.region)
+        print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
+              c=cluster_name, r=opts.region), file=sys.stderr)
         sys.exit(1)
 
     return (master_instances, slave_instances)
@@ -713,7 +717,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
 def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
     master = get_dns_name(master_nodes[0], opts.private_ips)
     if deploy_ssh_key:
-        print "Generating cluster's SSH key on master..."
+        print("Generating cluster's SSH key on master...")
         key_setup = """
           [ -f ~/.ssh/id_rsa ] ||
             (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
@@ -721,10 +725,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
         """
         ssh(master, opts, key_setup)
         dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
-        print "Transferring cluster's SSH key to slaves..."
+        print("Transferring cluster's SSH key to slaves...")
         for slave in slave_nodes:
             slave_address = get_dns_name(slave, opts.private_ips)
-            print slave_address
+            print(slave_address)
             ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
 
     modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
@@ -738,8 +742,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
 
     # NOTE: We should clone the repository before running deploy_files to
     # prevent ec2-variables.sh from being overwritten
-    print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
-        r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)
+    print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
+        r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
     ssh(
         host=master,
         opts=opts,
@@ -749,7 +753,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
                                                   b=opts.spark_ec2_git_branch)
     )
 
-    print "Deploying files to master..."
+    print("Deploying files to master...")
     deploy_files(
         conn=conn,
         root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
@@ -760,25 +764,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
     )
 
     if opts.deploy_root_dir is not None:
-        print "Deploying {s} to master...".format(s=opts.deploy_root_dir)
+        print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
         deploy_user_files(
             root_dir=opts.deploy_root_dir,
             opts=opts,
             master_nodes=master_nodes
         )
 
-    print "Running setup on master..."
+    print("Running setup on master...")
     setup_spark_cluster(master, opts)
-    print "Done!"
+    print("Done!")
 
 
 def setup_spark_cluster(master, opts):
     ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
     ssh(master, opts, "spark-ec2/setup.sh")
-    print "Spark standalone cluster started at http://%s:8080" % master
+    print("Spark standalone cluster started at http://%s:8080" % master)
 
     if opts.ganglia:
-        print "Ganglia started at http://%s:5080/ganglia" % master
+        print("Ganglia started at http://%s:5080/ganglia" % master)
 
 
 def is_ssh_available(host, opts, print_ssh_output=True):
@@ -795,7 +799,7 @@ def is_ssh_available(host, opts, print_ssh_output=True):
 
     if s.returncode != 0 and print_ssh_output:
         # extra leading newline is for spacing in wait_for_cluster_state()
-        print textwrap.dedent("""\n
+        print(textwrap.dedent("""\n
             Warning: SSH connection error. (This could be temporary.)
             Host: {h}
             SSH return code: {r}
@@ -804,7 +808,7 @@ def is_ssh_available(host, opts, print_ssh_output=True):
             h=host,
             r=s.returncode,
             o=cmd_output.strip()
-        )
+        ))
 
     return s.returncode == 0
 
@@ -865,10 +869,10 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
     sys.stdout.write("\n")
 
     end_time = datetime.now()
-    print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
+    print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
         s=cluster_state,
         t=(end_time - start_time).seconds
-    )
+    ))
 
 
 # Get number of local disks available for a given EC2 instance type.
@@ -916,8 +920,8 @@ def get_num_disks(instance_type):
     if instance_type in disks_by_instance:
         return disks_by_instance[instance_type]
     else:
-        print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1"
-                          % instance_type)
+        print("WARNING: Don't know number of disks on instance type %s; assuming 1"
+              % instance_type, file=stderr)
         return 1
 
 
@@ -951,7 +955,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
         # Spark-only custom deploy
         spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
         tachyon_v = ""
-        print "Deploying Spark via git hash; Tachyon won't be set up"
+        print("Deploying Spark via git hash; Tachyon won't be set up")
         modules = filter(lambda x: x != "tachyon", modules)
 
     master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
@@ -1067,8 +1071,8 @@ def ssh(host, opts, command):
                         "--key-pair parameters and try again.".format(host))
                 else:
                     raise e
-            print >> stderr, \
-                "Error executing remote command, retrying after 30 seconds: {0}".format(e)
+            print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
+                  file=stderr)
             time.sleep(30)
             tries = tries + 1
 
@@ -1107,8 +1111,8 @@ def ssh_write(host, opts, command, arguments):
         elif tries > 5:
             raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
         else:
-            print >> stderr, \
-                "Error {0} while executing remote command, retrying after 30 seconds".format(status)
+            print("Error {0} while executing remote command, retrying after 30 seconds".
+                  format(status), file=stderr)
             time.sleep(30)
             tries = tries + 1
 
@@ -1162,42 +1166,41 @@ def real_main():
 
     if opts.identity_file is not None:
         if not os.path.exists(opts.identity_file):
-            print >> stderr,\
-                "ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file)
+            print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
+                  file=stderr)
             sys.exit(1)
 
         file_mode = os.stat(opts.identity_file).st_mode
         if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
-            print >> stderr, "ERROR: The identity file must be accessible only by you."
-            print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file)
+            print("ERROR: The identity file must be accessible only by you.", file=stderr)
+            print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
+                  file=stderr)
             sys.exit(1)
 
     if opts.instance_type not in EC2_INSTANCE_TYPES:
-        print >> stderr, "Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
-            t=opts.instance_type)
+        print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
+              t=opts.instance_type), file=stderr)
 
     if opts.master_instance_type != "":
         if opts.master_instance_type not in EC2_INSTANCE_TYPES:
-            print >> stderr, \
-                "Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
-                    t=opts.master_instance_type)
+            print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
+                  t=opts.master_instance_type), file=stderr)
         # Since we try instance types even if we can't resolve them, we check if they resolve first
         # and, if they do, see if they resolve to the same virtualization type.
         if opts.instance_type in EC2_INSTANCE_TYPES and \
            opts.master_instance_type in EC2_INSTANCE_TYPES:
             if EC2_INSTANCE_TYPES[opts.instance_type] != \
                EC2_INSTANCE_TYPES[opts.master_instance_type]:
-                print >> stderr, \
-                    "Error: spark-ec2 currently does not support having a master and slaves " + \
-                    "with different AMI virtualization types."
-                print >> stderr, "master instance virtualization type: {t}".format(
-                    t=EC2_INSTANCE_TYPES[opts.master_instance_type])
-                print >> stderr, "slave instance virtualization type: {t}".format(
-                    t=EC2_INSTANCE_TYPES[opts.instance_type])
+                print("Error: spark-ec2 currently does not support having a master and slaves "
+                      "with different AMI virtualization types.", file=stderr)
+                print("master instance virtualization type: {t}".format(
+                      t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
+                print("slave instance virtualization type: {t}".format(
+                      t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
                 sys.exit(1)
 
     if opts.ebs_vol_num > 8:
-        print >> stderr, "ebs-vol-num cannot be greater than 8"
+        print("ebs-vol-num cannot be greater than 8", file=stderr)
         sys.exit(1)
 
     # Prevent breaking ami_prefix (/, .git and startswith checks)
@@ -1206,23 +1209,22 @@ def real_main():
             opts.spark_ec2_git_repo.endswith(".git") or \
             not opts.spark_ec2_git_repo.startswith("https://github.com") or \
             not opts.spark_ec2_git_repo.endswith("spark-ec2"):
-        print >> stderr, "spark-ec2-git-repo must be a github repo and it must not have a " \
-                         "trailing / or .git. " \
-                         "Furthermore, we currently only support forks named spark-ec2."
+        print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. "
+              "Furthermore, we currently only support forks named spark-ec2.", file=stderr)
         sys.exit(1)
 
     if not (opts.deploy_root_dir is None or
             (os.path.isabs(opts.deploy_root_dir) and
              os.path.isdir(opts.deploy_root_dir) and
              os.path.exists(opts.deploy_root_dir))):
-        print >> stderr, "--deploy-root-dir must be an absolute path to a directory that exists " \
-                         "on the local file system"
+        print("--deploy-root-dir must be an absolute path to a directory that exists "
+              "on the local file system", file=stderr)
         sys.exit(1)
 
     try:
         conn = ec2.connect_to_region(opts.region)
     except Exception as e:
-        print >> stderr, (e)
+        print((e), file=stderr)
         sys.exit(1)
 
     # Select an AZ at random if it was not specified.
@@ -1231,7 +1233,7 @@ def real_main():
 
     if action == "launch":
         if opts.slaves <= 0:
-            print >> sys.stderr, "ERROR: You have to start at least 1 slave"
+            print("ERROR: You have to start at least 1 slave", file=sys.stderr)
             sys.exit(1)
         if opts.resume:
             (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
@@ -1250,18 +1252,18 @@ def real_main():
             conn, opts, cluster_name, die_on_error=False)
 
         if any(master_nodes + slave_nodes):
-            print "The following instances will be terminated:"
+            print("The following instances will be terminated:")
             for inst in master_nodes + slave_nodes:
-                print "> %s" % get_dns_name(inst, opts.private_ips)
-            print "ALL DATA ON ALL NODES WILL BE LOST!!"
+                print("> %s" % get_dns_name(inst, opts.private_ips))
+            print("ALL DATA ON ALL NODES WILL BE LOST!!")
 
         msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
         response = raw_input(msg)
         if response == "y":
-            print "Terminating master..."
+            print("Terminating master...")
             for inst in master_nodes:
                 inst.terminate()
-            print "Terminating slaves..."
+            print("Terminating slaves...")
             for inst in slave_nodes:
                 inst.terminate()
 
@@ -1274,16 +1276,16 @@ def real_main():
                     cluster_instances=(master_nodes + slave_nodes),
                     cluster_state='terminated'
                 )
-                print "Deleting security groups (this will take some time)..."
+                print("Deleting security groups (this will take some time)...")
                 attempt = 1
                 while attempt <= 3:
-                    print "Attempt %d" % attempt
+                    print("Attempt %d" % attempt)
                     groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
                     success = True
                     # Delete individual rules in all groups before deleting groups to
                     # remove dependencies between them
                     for group in groups:
-                        print "Deleting rules in security group " + group.name
+                        print("Deleting rules in security group " + group.name)
                         for rule in group.rules:
                             for grant in rule.grants:
                                 success &= group.revoke(ip_protocol=rule.ip_protocol,
@@ -1298,10 +1300,10 @@ def real_main():
                         try:
                             # It is needed to use group_id to make it work with VPC
                             conn.delete_security_group(group_id=group.id)
-                            print "Deleted security group %s" % group.name
+                            print("Deleted security group %s" % group.name)
                         except boto.exception.EC2ResponseError:
                             success = False
-                            print "Failed to delete security group %s" % group.name
+                            print("Failed to delete security group %s" % group.name)
 
                     # Unfortunately, group.revoke() returns True even if a rule was not
                     # deleted, so this needs to be rerun if something fails
@@ -1311,17 +1313,16 @@ def real_main():
                     attempt += 1
 
                 if not success:
-                    print "Failed to delete all security groups after 3 tries."
-                    print "Try re-running in a few minutes."
+                    print("Failed to delete all security groups after 3 tries.")
+                    print("Try re-running in a few minutes.")
 
     elif action == "login":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
         if not master_nodes[0].public_dns_name and not opts.private_ips:
-            print "Master has no public DNS name.  Maybe you meant to specify " \
-                "--private-ips?"
+            print("Master has no public DNS name.  Maybe you meant to specify --private-ips?")
         else:
             master = get_dns_name(master_nodes[0], opts.private_ips)
-            print "Logging into master " + master + "..."
+            print("Logging into master " + master + "...")
             proxy_opt = []
             if opts.proxy_port is not None:
                 proxy_opt = ['-D', opts.proxy_port]
@@ -1336,19 +1337,18 @@ def real_main():
         if response == "y":
             (master_nodes, slave_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            print "Rebooting slaves..."
+            print("Rebooting slaves...")
             for inst in slave_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
-                    print "Rebooting " + inst.id
+                    print("Rebooting " + inst.id)
                     inst.reboot()
 
     elif action == "get-master":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
         if not master_nodes[0].public_dns_name and not opts.private_ips:
-            print "Master has no public DNS name.  Maybe you meant to specify " \
-                "--private-ips?"
+            print("Master has no public DNS name.  Maybe you meant to specify --private-ips?")
         else:
-            print get_dns_name(master_nodes[0], opts.private_ips)
+            print(get_dns_name(master_nodes[0], opts.private_ips))
 
     elif action == "stop":
         response = raw_input(
@@ -1361,11 +1361,11 @@ def real_main():
         if response == "y":
             (master_nodes, slave_nodes) = get_existing_cluster(
                 conn, opts, cluster_name, die_on_error=False)
-            print "Stopping master..."
+            print("Stopping master...")
             for inst in master_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     inst.stop()
-            print "Stopping slaves..."
+            print("Stopping slaves...")
             for inst in slave_nodes:
                 if inst.state not in ["shutting-down", "terminated"]:
                     if inst.spot_instance_request_id:
@@ -1375,11 +1375,11 @@ def real_main():
 
     elif action == "start":
         (master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
-        print "Starting slaves..."
+        print("Starting slaves...")
         for inst in slave_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
-        print "Starting master..."
+        print("Starting master...")
         for inst in master_nodes:
             if inst.state not in ["shutting-down", "terminated"]:
                 inst.start()
@@ -1403,15 +1403,15 @@ def real_main():
         setup_cluster(conn, master_nodes, slave_nodes, opts, False)
 
     else:
-        print >> stderr, "Invalid action: %s" % action
+        print("Invalid action: %s" % action, file=stderr)
         sys.exit(1)
 
 
 def main():
     try:
         real_main()
-    except UsageError, e:
-        print >> stderr, "\nError:\n", e
+    except UsageError as e:
+        print("\nError:\n", e, file=stderr)
         sys.exit(1)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/als.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 70b6146..1c3a787 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -21,7 +21,8 @@ ALS in pyspark.mllib.recommendation for more conventional use.
 
 This example requires numpy (http://www.numpy.org/)
 """
-from os.path import realpath
+from __future__ import print_function
+
 import sys
 
 import numpy as np
@@ -57,9 +58,9 @@ if __name__ == "__main__":
     Usage: als [M] [U] [F] [iterations] [partitions]"
     """
 
-    print >> sys.stderr, """WARN: This is a naive implementation of ALS and is given as an
+    print("""WARN: This is a naive implementation of ALS and is given as an
       example. Please use the ALS method found in pyspark.mllib.recommendation for more
-      conventional use."""
+      conventional use.""", file=sys.stderr)
 
     sc = SparkContext(appName="PythonALS")
     M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
@@ -68,8 +69,8 @@ if __name__ == "__main__":
     ITERATIONS = int(sys.argv[4]) if len(sys.argv) > 4 else 5
     partitions = int(sys.argv[5]) if len(sys.argv) > 5 else 2
 
-    print "Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" % \
-        (M, U, F, ITERATIONS, partitions)
+    print("Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" %
+          (M, U, F, ITERATIONS, partitions))
 
     R = matrix(rand(M, F)) * matrix(rand(U, F).T)
     ms = matrix(rand(M, F))
@@ -95,7 +96,7 @@ if __name__ == "__main__":
         usb = sc.broadcast(us)
 
         error = rmse(R, ms, us)
-        print "Iteration %d:" % i
-        print "\nRMSE: %5.4f\n" % error
+        print("Iteration %d:" % i)
+        print("\nRMSE: %5.4f\n" % error)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/avro_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
index 4626bbb..da368ac 100644
--- a/examples/src/main/python/avro_inputformat.py
+++ b/examples/src/main/python/avro_inputformat.py
@@ -15,9 +15,12 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 
 from pyspark import SparkContext
+from functools import reduce
 
 """
 Read data file users.avro in local Spark distro:
@@ -49,7 +52,7 @@ $ ./bin/spark-submit --driver-class-path /path/to/example/jar \
 """
 if __name__ == "__main__":
     if len(sys.argv) != 2 and len(sys.argv) != 3:
-        print >> sys.stderr, """
+        print("""
         Usage: avro_inputformat <data_file> [reader_schema_file]
 
         Run with example jar:
@@ -57,7 +60,7 @@ if __name__ == "__main__":
         /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
         Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified
         in [reader_schema_file].
-        """
+        """, file=sys.stderr)
         exit(-1)
 
     path = sys.argv[1]
@@ -77,6 +80,6 @@ if __name__ == "__main__":
         conf=conf)
     output = avro_rdd.map(lambda x: x[0]).collect()
     for k in output:
-        print k
+        print(k)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/cassandra_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
index 05f34b7..93ca0cf 100644
--- a/examples/src/main/python/cassandra_inputformat.py
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 
 from pyspark import SparkContext
@@ -47,14 +49,14 @@ cqlsh:test> SELECT * FROM users;
 """
 if __name__ == "__main__":
     if len(sys.argv) != 4:
-        print >> sys.stderr, """
+        print("""
         Usage: cassandra_inputformat <host> <keyspace> <cf>
 
         Run with example jar:
         ./bin/spark-submit --driver-class-path /path/to/example/jar \
         /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
         Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
-        """
+        """, file=sys.stderr)
         exit(-1)
 
     host = sys.argv[1]
@@ -77,6 +79,6 @@ if __name__ == "__main__":
         conf=conf)
     output = cass_rdd.collect()
     for (k, v) in output:
-        print (k, v)
+        print((k, v))
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/cassandra_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index d144539..5d643ea 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 
 from pyspark import SparkContext
@@ -46,7 +48,7 @@ cqlsh:test> SELECT * FROM users;
 """
 if __name__ == "__main__":
     if len(sys.argv) != 7:
-        print >> sys.stderr, """
+        print("""
         Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
 
         Run with example jar:
@@ -60,7 +62,7 @@ if __name__ == "__main__":
            ...   fname text,
            ...   lname text
            ... );
-        """
+        """, file=sys.stderr)
         exit(-1)
 
     host = sys.argv[1]

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/hbase_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3b16010..e17819d 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 
 from pyspark import SparkContext
@@ -47,14 +49,14 @@ ROW                           COLUMN+CELL
 """
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, """
+        print("""
         Usage: hbase_inputformat <host> <table>
 
         Run with example jar:
         ./bin/spark-submit --driver-class-path /path/to/example/jar \
         /path/to/examples/hbase_inputformat.py <host> <table>
         Assumes you have some data in HBase already, running on <host>, in <table>
-        """
+        """, file=sys.stderr)
         exit(-1)
 
     host = sys.argv[1]
@@ -74,6 +76,6 @@ if __name__ == "__main__":
         conf=conf)
     output = hbase_rdd.collect()
     for (k, v) in output:
-        print (k, v)
+        print((k, v))
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/hbase_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
index abb425b..9e56417 100644
--- a/examples/src/main/python/hbase_outputformat.py
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 import sys
 
 from pyspark import SparkContext
@@ -40,7 +42,7 @@ ROW                   COLUMN+CELL
 """
 if __name__ == "__main__":
     if len(sys.argv) != 7:
-        print >> sys.stderr, """
+        print("""
         Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
 
         Run with example jar:
@@ -48,7 +50,7 @@ if __name__ == "__main__":
         /path/to/examples/hbase_outputformat.py <args>
         Assumes you have created <table> with column family <family> in HBase
         running on <host> already
-        """
+        """, file=sys.stderr)
         exit(-1)
 
     host = sys.argv[1]

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/kmeans.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
index 86ef6f3..1939150 100755
--- a/examples/src/main/python/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -22,6 +22,7 @@ examples/src/main/python/mllib/kmeans.py.
 
 This example requires NumPy (http://www.numpy.org/).
 """
+from __future__ import print_function
 
 import sys
 
@@ -47,12 +48,12 @@ def closestPoint(p, centers):
 if __name__ == "__main__":
 
     if len(sys.argv) != 4:
-        print >> sys.stderr, "Usage: kmeans <file> <k> <convergeDist>"
+        print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
         exit(-1)
 
-    print >> sys.stderr, """WARN: This is a naive implementation of KMeans Clustering and is given
+    print("""WARN: This is a naive implementation of KMeans Clustering and is given
        as an example! Please refer to examples/src/main/python/mllib/kmeans.py for an example on
-       how to use MLlib's KMeans implementation."""
+       how to use MLlib's KMeans implementation.""", file=sys.stderr)
 
     sc = SparkContext(appName="PythonKMeans")
     lines = sc.textFile(sys.argv[1])
@@ -69,13 +70,13 @@ if __name__ == "__main__":
         pointStats = closest.reduceByKey(
             lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
         newPoints = pointStats.map(
-            lambda (x, (y, z)): (x, y / z)).collect()
+            lambda xy: (xy[0], xy[1][0] / xy[1][1])).collect()
 
         tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
 
         for (x, y) in newPoints:
             kPoints[x] = y
 
-    print "Final centers: " + str(kPoints)
+    print("Final centers: " + str(kPoints))
 
     sc.stop()
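
The lambda rewrite above is forced by PEP 3113: Python 3 removed tuple parameters, so "lambda (x, (y, z)): ..." is a syntax error, and the pair must be taken as a single argument and indexed (or unpacked inside the body). A small sketch of two equivalent rewrites, using made-up data rather than the RDD from the example:

    # Python 2 only (SyntaxError on Python 3):
    #   get_center = lambda (x, (y, z)): (x, y / z)

    # Rewrite 1: keep the lambda and index into the nested tuple, as the patch does.
    get_center = lambda xy: (xy[0], xy[1][0] / xy[1][1])

    # Rewrite 2: a named function that unpacks explicitly, often more readable.
    def get_center2(pair):
        key, (total, count) = pair
        return key, total / count

    point_stats = [(0, (6.0, 3)), (1, (4.0, 2))]
    assert [get_center(p) for p in point_stats] == [(0, 2.0), (1, 2.0)]
    assert [get_center2(p) for p in point_stats] == [(0, 2.0), (1, 2.0)]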

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/logistic_regression.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index 3aa56b0..b318b7d 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -22,10 +22,8 @@ to act on batches of input data using efficient matrix operations.
 In practice, one may prefer to use the LogisticRegression algorithm in
 MLlib, as shown in examples/src/main/python/mllib/logistic_regression.py.
 """
+from __future__ import print_function
 
-from collections import namedtuple
-from math import exp
-from os.path import realpath
 import sys
 
 import numpy as np
@@ -42,19 +40,19 @@ D = 10  # Number of dimensions
 def readPointBatch(iterator):
     strs = list(iterator)
     matrix = np.zeros((len(strs), D + 1))
-    for i in xrange(len(strs)):
-        matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
+    for i, s in enumerate(strs):
+        matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
     return [matrix]
 
 if __name__ == "__main__":
 
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
+        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
         exit(-1)
 
-    print >> sys.stderr,  """WARN: This is a naive implementation of Logistic Regression and is
+    print("""WARN: This is a naive implementation of Logistic Regression and is
       given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
-      to see how MLlib's implementation is used."""
+      to see how MLlib's implementation is used.""", file=sys.stderr)
 
     sc = SparkContext(appName="PythonLR")
     points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
@@ -62,7 +60,7 @@ if __name__ == "__main__":
 
     # Initialize w to a random value
     w = 2 * np.random.ranf(size=D) - 1
-    print "Initial w: " + str(w)
+    print("Initial w: " + str(w))
 
     # Compute logistic regression gradient for a matrix of data points
     def gradient(matrix, w):
@@ -76,9 +74,9 @@ if __name__ == "__main__":
         return x
 
     for i in range(iterations):
-        print "On iteration %i" % (i + 1)
+        print("On iteration %i" % (i + 1))
         w -= points.map(lambda m: gradient(m, w)).reduce(add)
 
-    print "Final w: " + str(w)
+    print("Final w: " + str(w))
 
     sc.stop()
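
Replacing "for i in xrange(len(strs))" with enumerate(strs) both sidesteps the removal of xrange in Python 3 and is the more idiomatic loop on either version, since it yields the index and the element together. A minimal sketch with toy input rows (the strings and dimension are illustrative):

    import numpy as np

    strs = ["1,2,3", "4,5,6"]   # made-up CSV-style rows; D + 1 values each
    D = 2
    matrix = np.zeros((len(strs), D + 1))
    for i, s in enumerate(strs):   # works identically on Python 2 and 3
        matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
    print(matrix)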

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/ml/simple_text_classification_pipeline.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
index c73edb7..fab21f0 100644
--- a/examples/src/main/python/ml/simple_text_classification_pipeline.py
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from __future__ import print_function
+
 from pyspark import SparkContext
 from pyspark.ml import Pipeline
 from pyspark.ml.classification import LogisticRegression
@@ -37,10 +39,10 @@ if __name__ == "__main__":
 
     # Prepare training documents, which are labeled.
     LabeledDocument = Row("id", "text", "label")
-    training = sc.parallelize([(0L, "a b c d e spark", 1.0),
-                               (1L, "b d", 0.0),
-                               (2L, "spark f g h", 1.0),
-                               (3L, "hadoop mapreduce", 0.0)]) \
+    training = sc.parallelize([(0, "a b c d e spark", 1.0),
+                               (1, "b d", 0.0),
+                               (2, "spark f g h", 1.0),
+                               (3, "hadoop mapreduce", 0.0)]) \
         .map(lambda x: LabeledDocument(*x)).toDF()
 
     # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
@@ -54,16 +56,16 @@ if __name__ == "__main__":
 
     # Prepare test documents, which are unlabeled.
     Document = Row("id", "text")
-    test = sc.parallelize([(4L, "spark i j k"),
-                           (5L, "l m n"),
-                           (6L, "mapreduce spark"),
-                           (7L, "apache hadoop")]) \
+    test = sc.parallelize([(4, "spark i j k"),
+                           (5, "l m n"),
+                           (6, "mapreduce spark"),
+                           (7, "apache hadoop")]) \
         .map(lambda x: Document(*x)).toDF()
 
     # Make predictions on test documents and print columns of interest.
     prediction = model.transform(test)
     selected = prediction.select("id", "text", "prediction")
     for row in selected.collect():
-        print row
+        print(row)
 
     sc.stop()
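
Dropping the L suffix from the ids works because Python 3 has a single arbitrary-precision int type and rejects the 0L spelling outright, while Python 2 promotes a plain int to long automatically on overflow, so nothing is lost on either side. A tiny illustration in plain Python (no Spark needed):

    # 0L, 1L, ... are SyntaxErrors on Python 3; plain integer literals work everywhere.
    doc_ids = [0, 1, 2, 3]
    big = 2 ** 64
    print(type(big).__name__)   # "long" on Python 2, "int" on Python 3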

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/correlations.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
index 4218eca..0e13546 100755
--- a/examples/src/main/python/mllib/correlations.py
+++ b/examples/src/main/python/mllib/correlations.py
@@ -18,6 +18,7 @@
 """
 Correlations using MLlib.
 """
+from __future__ import print_function
 
 import sys
 
@@ -29,7 +30,7 @@ from pyspark.mllib.util import MLUtils
 
 if __name__ == "__main__":
     if len(sys.argv) not in [1, 2]:
-        print >> sys.stderr, "Usage: correlations (<file>)"
+        print("Usage: correlations (<file>)", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="PythonCorrelations")
     if len(sys.argv) == 2:
@@ -41,20 +42,20 @@ if __name__ == "__main__":
     points = MLUtils.loadLibSVMFile(sc, filepath)\
         .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))
 
-    print
-    print 'Summary of data file: ' + filepath
-    print '%d data points' % points.count()
+    print()
+    print('Summary of data file: ' + filepath)
+    print('%d data points' % points.count())
 
     # Statistics (correlations)
-    print
-    print 'Correlation (%s) between label and each feature' % corrType
-    print 'Feature\tCorrelation'
+    print()
+    print('Correlation (%s) between label and each feature' % corrType)
+    print('Feature\tCorrelation')
     numFeatures = points.take(1)[0].features.size
     labelRDD = points.map(lambda lp: lp.label)
     for i in range(numFeatures):
         featureRDD = points.map(lambda lp: lp.features[i])
         corr = Statistics.corr(labelRDD, featureRDD, corrType)
-        print '%d\t%g' % (i, corr)
-    print
+        print('%d\t%g' % (i, corr))
+    print()
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/dataset_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/dataset_example.py b/examples/src/main/python/mllib/dataset_example.py
index fcbf56c..e23ecc0 100644
--- a/examples/src/main/python/mllib/dataset_example.py
+++ b/examples/src/main/python/mllib/dataset_example.py
@@ -19,6 +19,7 @@
 An example of how to use DataFrame as a dataset for ML. Run with::
     bin/spark-submit examples/src/main/python/mllib/dataset_example.py
 """
+from __future__ import print_function
 
 import os
 import sys
@@ -32,16 +33,16 @@ from pyspark.mllib.stat import Statistics
 
 
 def summarize(dataset):
-    print "schema: %s" % dataset.schema().json()
+    print("schema: %s" % dataset.schema().json())
     labels = dataset.map(lambda r: r.label)
-    print "label average: %f" % labels.mean()
+    print("label average: %f" % labels.mean())
     features = dataset.map(lambda r: r.features)
     summary = Statistics.colStats(features)
-    print "features average: %r" % summary.mean()
+    print("features average: %r" % summary.mean())
 
 if __name__ == "__main__":
     if len(sys.argv) > 2:
-        print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
+        print("Usage: dataset_example.py <libsvm file>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="DatasetExample")
     sqlContext = SQLContext(sc)
@@ -54,9 +55,9 @@ if __name__ == "__main__":
     summarize(dataset0)
     tempdir = tempfile.NamedTemporaryFile(delete=False).name
     os.unlink(tempdir)
-    print "Save dataset as a Parquet file to %s." % tempdir
+    print("Save dataset as a Parquet file to %s." % tempdir)
     dataset0.saveAsParquetFile(tempdir)
-    print "Load it back and summarize it again."
+    print("Load it back and summarize it again.")
     dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
     summarize(dataset1)
     shutil.rmtree(tempdir)

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/decision_tree_runner.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index fccabd8..513ed8f 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -20,6 +20,7 @@ Decision tree classification and regression using MLlib.
 
 This example requires NumPy (http://www.numpy.org/).
 """
+from __future__ import print_function
 
 import numpy
 import os
@@ -83,18 +84,17 @@ def reindexClassLabels(data):
     numClasses = len(classCounts)
     # origToNewLabels: class --> index in 0,...,numClasses-1
     if (numClasses < 2):
-        print >> sys.stderr, \
-            "Dataset for classification should have at least 2 classes." + \
-            " The given dataset had only %d classes." % numClasses
+        print("Dataset for classification should have at least 2 classes."
+              " The given dataset had only %d classes." % numClasses, file=sys.stderr)
         exit(1)
     origToNewLabels = dict([(sortedClasses[i], i) for i in range(0, numClasses)])
 
-    print "numClasses = %d" % numClasses
-    print "Per-class example fractions, counts:"
-    print "Class\tFrac\tCount"
+    print("numClasses = %d" % numClasses)
+    print("Per-class example fractions, counts:")
+    print("Class\tFrac\tCount")
     for c in sortedClasses:
         frac = classCounts[c] / (numExamples + 0.0)
-        print "%g\t%g\t%d" % (c, frac, classCounts[c])
+        print("%g\t%g\t%d" % (c, frac, classCounts[c]))
 
     if (sortedClasses[0] == 0 and sortedClasses[-1] == numClasses - 1):
         return (data, origToNewLabels)
@@ -105,8 +105,7 @@ def reindexClassLabels(data):
 
 
 def usage():
-    print >> sys.stderr, \
-        "Usage: decision_tree_runner [libsvm format data filepath]"
+    print("Usage: decision_tree_runner [libsvm format data filepath]", file=sys.stderr)
     exit(1)
 
 
@@ -133,13 +132,13 @@ if __name__ == "__main__":
     model = DecisionTree.trainClassifier(reindexedData, numClasses=numClasses,
                                          categoricalFeaturesInfo=categoricalFeaturesInfo)
     # Print learned tree and stats.
-    print "Trained DecisionTree for classification:"
-    print "  Model numNodes: %d" % model.numNodes()
-    print "  Model depth: %d" % model.depth()
-    print "  Training accuracy: %g" % getAccuracy(model, reindexedData)
+    print("Trained DecisionTree for classification:")
+    print("  Model numNodes: %d" % model.numNodes())
+    print("  Model depth: %d" % model.depth())
+    print("  Training accuracy: %g" % getAccuracy(model, reindexedData))
     if model.numNodes() < 20:
-        print model.toDebugString()
+        print(model.toDebugString())
     else:
-        print model
+        print(model)
 
     sc.stop()
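
The reworked error message in reindexClassLabels shows another recurring rewrite: instead of chaining strings with + across a continued "print >>" statement, the patch puts adjacent string literals inside a single print(..., file=sys.stderr) call; the parser concatenates them at compile time, and the % formatting then applies to the joined string. A minimal sketch (the triggering value is illustrative):

    from __future__ import print_function

    import sys

    numClasses = 1   # made-up value that triggers the message
    if numClasses < 2:
        # Adjacent literals become one string before %-formatting runs.
        print("Dataset for classification should have at least 2 classes."
              " The given dataset had only %d classes." % numClasses, file=sys.stderr)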

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/gaussian_mixture_model.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/gaussian_mixture_model.py b/examples/src/main/python/mllib/gaussian_mixture_model.py
index a2cd626..2cb8010 100644
--- a/examples/src/main/python/mllib/gaussian_mixture_model.py
+++ b/examples/src/main/python/mllib/gaussian_mixture_model.py
@@ -18,7 +18,8 @@
 """
 A Gaussian Mixture Model clustering program using MLlib.
 """
-import sys
+from __future__ import print_function
+
 import random
 import argparse
 import numpy as np
@@ -59,7 +60,7 @@ if __name__ == "__main__":
     model = GaussianMixture.train(data, args.k, args.convergenceTol,
                                   args.maxIterations, args.seed)
     for i in range(args.k):
-        print ("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
-               "sigma = ", model.gaussians[i].sigma.toArray())
-    print ("Cluster labels (first 100): ", model.predict(data).take(100))
+        print(("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
+               "sigma = ", model.gaussians[i].sigma.toArray()))
+    print(("Cluster labels (first 100): ", model.predict(data).take(100)))
     sc.stop()
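
The doubled parentheses here are deliberate: under Python 2, "print (a, b, c)" is the print statement applied to a tuple, so the output is the tuple's repr. Wrapping the arguments in an explicit tuple keeps that exact output once print becomes a function, rather than silently switching to space-separated values. A small sketch with made-up numbers:

    from __future__ import print_function

    weight, mu = 0.5, [1.0, 2.0]
    print(("weight = ", weight, "mu = ", mu))   # ('weight = ', 0.5, 'mu = ', [1.0, 2.0])
    print("weight = ", weight, "mu = ", mu)     # weight =  0.5 mu =  [1.0, 2.0]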

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/gradient_boosted_trees.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/gradient_boosted_trees.py b/examples/src/main/python/mllib/gradient_boosted_trees.py
index e647773..781bd61 100644
--- a/examples/src/main/python/mllib/gradient_boosted_trees.py
+++ b/examples/src/main/python/mllib/gradient_boosted_trees.py
@@ -18,6 +18,7 @@
 """
 Gradient boosted Trees classification and regression using MLlib.
 """
+from __future__ import print_function
 
 import sys
 
@@ -34,7 +35,7 @@ def testClassification(trainingData, testData):
     # Evaluate model on test instances and compute test error
     predictions = model.predict(testData.map(lambda x: x.features))
     labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
-    testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \
+    testErr = labelsAndPredictions.filter(lambda v_p: v_p[0] != v_p[1]).count() \
         / float(testData.count())
     print('Test Error = ' + str(testErr))
     print('Learned classification ensemble model:')
@@ -49,7 +50,7 @@ def testRegression(trainingData, testData):
     # Evaluate model on test instances and compute test error
     predictions = model.predict(testData.map(lambda x: x.features))
     labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
-    testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \
+    testMSE = labelsAndPredictions.map(lambda vp: (vp[0] - vp[1]) * (vp[0] - vp[1])).sum() \
         / float(testData.count())
     print('Test Mean Squared Error = ' + str(testMSE))
     print('Learned regression ensemble model:')
@@ -58,7 +59,7 @@ def testRegression(trainingData, testData):
 
 if __name__ == "__main__":
     if len(sys.argv) > 1:
-        print >> sys.stderr, "Usage: gradient_boosted_trees"
+        print("Usage: gradient_boosted_trees", file=sys.stderr)
         exit(1)
     sc = SparkContext(appName="PythonGradientBoostedTrees")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/kmeans.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index 2eeb1ab..f901a87 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -20,6 +20,7 @@ A K-means clustering program using MLlib.
 
 This example requires NumPy (http://www.numpy.org/).
 """
+from __future__ import print_function
 
 import sys
 
@@ -34,12 +35,12 @@ def parseVector(line):
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: kmeans <file> <k>"
+        print("Usage: kmeans <file> <k>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="KMeans")
     lines = sc.textFile(sys.argv[1])
     data = lines.map(parseVector)
     k = int(sys.argv[2])
     model = KMeans.train(data, k)
-    print "Final centers: " + str(model.clusterCenters)
+    print("Final centers: " + str(model.clusterCenters))
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/logistic_regression.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 8cae27f..d4f1d34 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -20,11 +20,10 @@ Logistic regression using MLlib.
 
 This example requires NumPy (http://www.numpy.org/).
 """
+from __future__ import print_function
 
-from math import exp
 import sys
 
-import numpy as np
 from pyspark import SparkContext
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.classification import LogisticRegressionWithSGD
@@ -42,12 +41,12 @@ def parsePoint(line):
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
+        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
         exit(-1)
     sc = SparkContext(appName="PythonLR")
     points = sc.textFile(sys.argv[1]).map(parsePoint)
     iterations = int(sys.argv[2])
     model = LogisticRegressionWithSGD.train(points, iterations)
-    print "Final weights: " + str(model.weights)
-    print "Final intercept: " + str(model.intercept)
+    print("Final weights: " + str(model.weights))
+    print("Final intercept: " + str(model.intercept))
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/random_forest_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/random_forest_example.py b/examples/src/main/python/mllib/random_forest_example.py
index d3c24f7..4cfdad8 100755
--- a/examples/src/main/python/mllib/random_forest_example.py
+++ b/examples/src/main/python/mllib/random_forest_example.py
@@ -22,6 +22,7 @@ Note: This example illustrates binary classification.
       For information on multiclass classification, please refer to the decision_tree_runner.py
       example.
 """
+from __future__ import print_function
 
 import sys
 
@@ -43,7 +44,7 @@ def testClassification(trainingData, testData):
     # Evaluate model on test instances and compute test error
     predictions = model.predict(testData.map(lambda x: x.features))
     labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
-    testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count()\
+    testErr = labelsAndPredictions.filter(lambda v_p: v_p[0] != v_p[1]).count()\
         / float(testData.count())
     print('Test Error = ' + str(testErr))
     print('Learned classification forest model:')
@@ -62,8 +63,8 @@ def testRegression(trainingData, testData):
     # Evaluate model on test instances and compute test error
     predictions = model.predict(testData.map(lambda x: x.features))
     labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
-    testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum()\
-        / float(testData.count())
+    testMSE = labelsAndPredictions.map(lambda v_p1: (v_p1[0] - v_p1[1]) * (v_p1[0] - v_p1[1]))\
+        .sum() / float(testData.count())
     print('Test Mean Squared Error = ' + str(testMSE))
     print('Learned regression forest model:')
     print(model.toDebugString())
@@ -71,7 +72,7 @@ def testRegression(trainingData, testData):
 
 if __name__ == "__main__":
     if len(sys.argv) > 1:
-        print >> sys.stderr, "Usage: random_forest_example"
+        print("Usage: random_forest_example", file=sys.stderr)
         exit(1)
     sc = SparkContext(appName="PythonRandomForestExample")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/random_rdd_generation.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
index 1e88927..729bae3 100755
--- a/examples/src/main/python/mllib/random_rdd_generation.py
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -18,6 +18,7 @@
 """
 Randomly generated RDDs.
 """
+from __future__ import print_function
 
 import sys
 
@@ -27,7 +28,7 @@ from pyspark.mllib.random import RandomRDDs
 
 if __name__ == "__main__":
     if len(sys.argv) not in [1, 2]:
-        print >> sys.stderr, "Usage: random_rdd_generation"
+        print("Usage: random_rdd_generation", file=sys.stderr)
         exit(-1)
 
     sc = SparkContext(appName="PythonRandomRDDGeneration")
@@ -37,19 +38,19 @@ if __name__ == "__main__":
 
     # Example: RandomRDDs.normalRDD
     normalRDD = RandomRDDs.normalRDD(sc, numExamples)
-    print 'Generated RDD of %d examples sampled from the standard normal distribution'\
-        % normalRDD.count()
-    print '  First 5 samples:'
+    print('Generated RDD of %d examples sampled from the standard normal distribution'
+          % normalRDD.count())
+    print('  First 5 samples:')
     for sample in normalRDD.take(5):
-        print '    ' + str(sample)
-    print
+        print('    ' + str(sample))
+    print()
 
     # Example: RandomRDDs.normalVectorRDD
     normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2)
-    print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
-    print '  First 5 samples:'
+    print('Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count())
+    print('  First 5 samples:')
     for sample in normalVectorRDD.take(5):
-        print '    ' + str(sample)
-    print
+        print('    ' + str(sample))
+    print()
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/sampled_rdds.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
index 92af3af..b7033ab 100755
--- a/examples/src/main/python/mllib/sampled_rdds.py
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -18,6 +18,7 @@
 """
 Randomly sampled RDDs.
 """
+from __future__ import print_function
 
 import sys
 
@@ -27,7 +28,7 @@ from pyspark.mllib.util import MLUtils
 
 if __name__ == "__main__":
     if len(sys.argv) not in [1, 2]:
-        print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
+        print("Usage: sampled_rdds <libsvm data file>", file=sys.stderr)
         exit(-1)
     if len(sys.argv) == 2:
         datapath = sys.argv[1]
@@ -41,24 +42,24 @@ if __name__ == "__main__":
     examples = MLUtils.loadLibSVMFile(sc, datapath)
     numExamples = examples.count()
     if numExamples == 0:
-        print >> sys.stderr, "Error: Data file had no samples to load."
+        print("Error: Data file had no samples to load.", file=sys.stderr)
         exit(1)
-    print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)
+    print('Loaded data with %d examples from file: %s' % (numExamples, datapath))
 
     # Example: RDD.sample() and RDD.takeSample()
     expectedSampleSize = int(numExamples * fraction)
-    print 'Sampling RDD using fraction %g.  Expected sample size = %d.' \
-        % (fraction, expectedSampleSize)
+    print('Sampling RDD using fraction %g.  Expected sample size = %d.'
+          % (fraction, expectedSampleSize))
     sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
-    print '  RDD.sample(): sample has %d examples' % sampledRDD.count()
+    print('  RDD.sample(): sample has %d examples' % sampledRDD.count())
     sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
-    print '  RDD.takeSample(): sample has %d examples' % len(sampledArray)
+    print('  RDD.takeSample(): sample has %d examples' % len(sampledArray))
 
-    print
+    print()
 
     # Example: RDD.sampleByKey()
     keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
-    print '  Keyed data using label (Int) as key ==> Orig'
+    print('  Keyed data using label (Int) as key ==> Orig')
     #  Count examples per label in original data.
     keyCountsA = keyedRDD.countByKey()
 
@@ -69,18 +70,18 @@ if __name__ == "__main__":
     sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
     keyCountsB = sampledByKeyRDD.countByKey()
     sizeB = sum(keyCountsB.values())
-    print '  Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
-        % sizeB
+    print('  Sampled %d examples using approximate stratified sampling (by label). ==> Sample'
+          % sizeB)
 
     #  Compare samples
-    print '   \tFractions of examples with key'
-    print 'Key\tOrig\tSample'
+    print('   \tFractions of examples with key')
+    print('Key\tOrig\tSample')
     for k in sorted(keyCountsA.keys()):
         fracA = keyCountsA[k] / float(numExamples)
         if sizeB != 0:
             fracB = keyCountsB.get(k, 0) / float(sizeB)
         else:
             fracB = 0
-        print '%d\t%g\t%g' % (k, fracA, fracB)
+        print('%d\t%g\t%g' % (k, fracA, fracB))
 
     sc.stop()
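
The fraction computations left untouched in this example (keyCountsA[k] / float(numExamples)) already guard against the division change: wrapping one operand in float() forces true division, so the ratios are the same whether / means Python 2's floor division on ints or Python 3's true division. A tiny illustration with made-up counts:

    counts = {0: 3, 1: 1}
    total = 4
    # Without float(), 3 / 4 is 0 on Python 2; with it, both versions give 0.75.
    fractions = {k: v / float(total) for k, v in counts.items()}
    print(fractions)   # {0: 0.75, 1: 0.25}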

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/word2vec.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/word2vec.py b/examples/src/main/python/mllib/word2vec.py
index 99fef42..40d1b88 100644
--- a/examples/src/main/python/mllib/word2vec.py
+++ b/examples/src/main/python/mllib/word2vec.py
@@ -23,6 +23,7 @@
 # grep -o -E '\w+(\W+\w+){0,15}' text8 > text8_lines
 # This was done so that the example can be run in local mode
 
+from __future__ import print_function
 
 import sys
 
@@ -34,7 +35,7 @@ USAGE = ("bin/spark-submit --driver-memory 4g "
 
 if __name__ == "__main__":
     if len(sys.argv) < 2:
-        print USAGE
+        print(USAGE)
         sys.exit("Argument for file not provided")
     file_path = sys.argv[1]
     sc = SparkContext(appName='Word2Vec')
@@ -46,5 +47,5 @@ if __name__ == "__main__":
     synonyms = model.findSynonyms('china', 40)
 
     for word, cosine_distance in synonyms:
-        print "{}: {}".format(word, cosine_distance)
+        print("{}: {}".format(word, cosine_distance))
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/pagerank.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index a5f25d7..2fdc977 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -19,6 +19,7 @@
 This is an example implementation of PageRank. For more conventional use,
 Please refer to PageRank implementation provided by graphx
 """
+from __future__ import print_function
 
 import re
 import sys
@@ -42,11 +43,12 @@ def parseNeighbors(urls):
 
 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print >> sys.stderr, "Usage: pagerank <file> <iterations>"
+        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
         exit(-1)
 
-    print >> sys.stderr,  """WARN: This is a naive implementation of PageRank and is
-          given as an example! Please refer to PageRank implementation provided by graphx"""
+    print("""WARN: This is a naive implementation of PageRank and is
+          given as an example! Please refer to PageRank implementation provided by graphx""",
+          file=sys.stderr)
 
     # Initialize the spark context.
     sc = SparkContext(appName="PythonPageRank")
@@ -62,19 +64,19 @@ if __name__ == "__main__":
     links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
 
     # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-    ranks = links.map(lambda (url, neighbors): (url, 1.0))
+    ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))
 
     # Calculates and updates URL ranks continuously using PageRank algorithm.
-    for iteration in xrange(int(sys.argv[2])):
+    for iteration in range(int(sys.argv[2])):
         # Calculates URL contributions to the rank of other URLs.
         contribs = links.join(ranks).flatMap(
-            lambda (url, (urls, rank)): computeContribs(urls, rank))
+            lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
 
         # Re-calculates URL ranks based on neighbor contributions.
         ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
 
     # Collects all URL ranks and dump them to console.
     for (link, rank) in ranks.collect():
-        print "%s has rank: %s." % (link, rank)
+        print("%s has rank: %s." % (link, rank))
 
     sc.stop()
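
Two Python 3 changes meet in this hunk: xrange is gone (Python 3's range is already a lazy sequence), and the tuple-unpacking lambda over links.join(ranks) has to index into the nested (url, (neighbors, rank)) pairs instead. A sketch of that reshaping on plain data, with a simplified computeContribs; the pair layout matches what PySpark's join produces, and the values are made up:

    # Each joined element looks like (url, (neighbors, rank)).
    joined = [("a", (["b", "c"], 1.0)), ("b", (["a"], 1.0))]

    def computeContribs(urls, rank):
        # Split a page's rank evenly among the pages it links to.
        return [(url, rank / len(urls)) for url in urls]

    contribs = []
    for url_urls_rank in joined:   # one argument, indexed instead of tuple-unpacked
        contribs.extend(computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    print(contribs)   # [('b', 0.5), ('c', 0.5), ('a', 1.0)]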

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/parquet_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py
index fa4c20a..96ddac7 100644
--- a/examples/src/main/python/parquet_inputformat.py
+++ b/examples/src/main/python/parquet_inputformat.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -35,14 +36,14 @@ $ ./bin/spark-submit --driver-class-path /path/to/example/jar \\
 """
 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print >> sys.stderr, """
+        print("""
         Usage: parquet_inputformat.py <data_file>
 
         Run with example jar:
         ./bin/spark-submit --driver-class-path /path/to/example/jar \\
                 /path/to/examples/parquet_inputformat.py <data_file>
         Assumes you have Parquet data stored in <data_file>.
-        """
+        """, file=sys.stderr)
         exit(-1)
 
     path = sys.argv[1]
@@ -56,6 +57,6 @@ if __name__ == "__main__":
         valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')
     output = parquet_rdd.map(lambda x: x[1]).collect()
     for k in output:
-        print k
+        print(k)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/pi.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index a7c74e9..92e5cf4 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -35,7 +36,7 @@ if __name__ == "__main__":
         y = random() * 2 - 1
         return 1 if x ** 2 + y ** 2 < 1 else 0
 
-    count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
-    print "Pi is roughly %f" % (4.0 * count / n)
+    count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+    print("Pi is roughly %f" % (4.0 * count / n))
 
     sc.stop()
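
Swapping xrange(1, n + 1) for range(1, n + 1) is the simplest portable fix: Python 3 has no xrange, and its range is a lazy sequence much like Python 2's xrange. On Python 2 the call now materializes a list, which is acceptable at example scale, and sc.parallelize accepts either form. A quick check of the sampling logic itself, independent of Spark (the sample size is illustrative):

    from random import random

    def f(_):
        # One dart thrown at the unit square; 1 if it lands inside the unit circle.
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    n = 100000
    count = sum(f(i) for i in range(1, n + 1))
    print("Pi is roughly %f" % (4.0 * count / n))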


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org