Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/17 01:21:17 UTC
[6/6] spark git commit: [SPARK-4897] [PySpark] Python 3 support
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).
Known issue: unpickling arrays from Pyrolite is broken in Python 3, so those tests are skipped.
TODO: ec2/spark_ec2.py is not fully tested with Python 3.
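Most of the diff applies a handful of recurring Python 2/3 compatibility idioms: __future__ print_function, conditional imports for renamed modules, xrange/range, and "except E as e". A minimal, self-contained sketch of these idioms (mirroring the ec2/spark_ec2.py changes below; plain Python, not Spark API code):

from __future__ import print_function  # print() is a function on both Python 2 and 3

import sys

if sys.version < "3":
    from urllib2 import urlopen, HTTPError   # Python 2 module names
    range = xrange                            # use the lazy range on Python 2
else:
    from urllib.request import urlopen        # Python 3 locations of the same APIs
    from urllib.error import HTTPError

def head(url, n=3):
    # print(..., file=...) replaces the Python 2 form "print >> sys.stderr, ..."
    print("fetching %s" % url, file=sys.stderr)
    try:
        resp = urlopen(url)
    except HTTPError as e:                    # "except HTTPError, e:" is a syntax error in Python 3
        print("HTTP error %d" % e.code, file=sys.stderr)
        return []
    return [resp.readline() for _ in range(n)]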
Author: Davies Liu <da...@databricks.com>
Author: twneale <tw...@gmail.com>
Author: Josh Rosen <jo...@databricks.com>
Closes #5173 from davies/python3 and squashes the following commits:
d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code:
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04e44b37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04e44b37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04e44b37
Branch: refs/heads/master
Commit: 04e44b37cc04f62fbf9e08c7076349e0a4d12ea8
Parents: 55f553a
Author: Davies Liu <da...@databricks.com>
Authored: Thu Apr 16 16:20:57 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Apr 16 16:20:57 2015 -0700
----------------------------------------------------------------------
bin/pyspark | 1 +
bin/spark-submit | 3 +
bin/spark-submit2.cmd | 3 +
dev/run-tests | 2 +
dev/run-tests-jenkins | 2 +-
ec2/spark_ec2.py | 262 ++--
examples/src/main/python/als.py | 15 +-
examples/src/main/python/avro_inputformat.py | 9 +-
.../src/main/python/cassandra_inputformat.py | 8 +-
.../src/main/python/cassandra_outputformat.py | 6 +-
examples/src/main/python/hbase_inputformat.py | 8 +-
examples/src/main/python/hbase_outputformat.py | 6 +-
examples/src/main/python/kmeans.py | 11 +-
examples/src/main/python/logistic_regression.py | 20 +-
.../ml/simple_text_classification_pipeline.py | 20 +-
examples/src/main/python/mllib/correlations.py | 19 +-
.../src/main/python/mllib/dataset_example.py | 13 +-
.../main/python/mllib/decision_tree_runner.py | 29 +-
.../main/python/mllib/gaussian_mixture_model.py | 9 +-
.../main/python/mllib/gradient_boosted_trees.py | 7 +-
examples/src/main/python/mllib/kmeans.py | 5 +-
.../main/python/mllib/logistic_regression.py | 9 +-
.../main/python/mllib/random_forest_example.py | 9 +-
.../main/python/mllib/random_rdd_generation.py | 21 +-
examples/src/main/python/mllib/sampled_rdds.py | 29 +-
examples/src/main/python/mllib/word2vec.py | 5 +-
examples/src/main/python/pagerank.py | 16 +-
examples/src/main/python/parquet_inputformat.py | 7 +-
examples/src/main/python/pi.py | 5 +-
examples/src/main/python/sort.py | 6 +-
examples/src/main/python/sql.py | 4 +-
examples/src/main/python/status_api_demo.py | 10 +-
.../src/main/python/streaming/hdfs_wordcount.py | 3 +-
.../main/python/streaming/kafka_wordcount.py | 3 +-
.../main/python/streaming/network_wordcount.py | 3 +-
.../streaming/recoverable_network_wordcount.py | 11 +-
.../python/streaming/sql_network_wordcount.py | 5 +-
.../streaming/stateful_network_wordcount.py | 3 +-
examples/src/main/python/transitive_closure.py | 10 +-
examples/src/main/python/wordcount.py | 6 +-
.../MatrixFactorizationModelWrapper.scala | 9 +-
.../spark/mllib/api/python/PythonMLLibAPI.scala | 39 +-
python/pyspark/accumulators.py | 9 +-
python/pyspark/broadcast.py | 37 +-
python/pyspark/cloudpickle.py | 577 +++-----
python/pyspark/conf.py | 9 +-
python/pyspark/context.py | 42 +-
python/pyspark/daemon.py | 36 +-
python/pyspark/heapq3.py | 24 +-
python/pyspark/java_gateway.py | 2 +-
python/pyspark/join.py | 1 +
python/pyspark/ml/classification.py | 4 +-
python/pyspark/ml/feature.py | 22 +-
python/pyspark/ml/param/__init__.py | 8 +-
.../pyspark/ml/param/_shared_params_code_gen.py | 10 +-
python/pyspark/mllib/__init__.py | 11 +-
python/pyspark/mllib/classification.py | 7 +-
python/pyspark/mllib/clustering.py | 18 +-
python/pyspark/mllib/common.py | 19 +-
python/pyspark/mllib/feature.py | 18 +-
python/pyspark/mllib/fpm.py | 2 +
python/pyspark/mllib/linalg.py | 48 +-
python/pyspark/mllib/rand.py | 33 +-
python/pyspark/mllib/recommendation.py | 7 +-
python/pyspark/mllib/stat/_statistics.py | 25 +-
python/pyspark/mllib/tests.py | 20 +-
python/pyspark/mllib/tree.py | 15 +-
python/pyspark/mllib/util.py | 26 +-
python/pyspark/profiler.py | 10 +-
python/pyspark/rdd.py | 189 +--
python/pyspark/rddsampler.py | 4 +-
python/pyspark/serializers.py | 101 +-
python/pyspark/shell.py | 16 +-
python/pyspark/shuffle.py | 126 +-
python/pyspark/sql/__init__.py | 15 +-
python/pyspark/sql/_types.py | 1261 ++++++++++++++++++
python/pyspark/sql/context.py | 32 +-
python/pyspark/sql/dataframe.py | 63 +-
python/pyspark/sql/functions.py | 6 +-
python/pyspark/sql/tests.py | 11 +-
python/pyspark/sql/types.py | 1252 -----------------
python/pyspark/statcounter.py | 4 +-
python/pyspark/streaming/context.py | 5 +-
python/pyspark/streaming/dstream.py | 51 +-
python/pyspark/streaming/kafka.py | 8 +-
python/pyspark/streaming/tests.py | 39 +-
python/pyspark/streaming/util.py | 6 +-
python/pyspark/tests.py | 327 ++---
python/pyspark/worker.py | 16 +-
python/run-tests | 15 +-
python/test_support/userlib-0.1-py2.7.egg | Bin 1945 -> 0 bytes
python/test_support/userlib-0.1.zip | Bin 0 -> 668 bytes
92 files changed, 2630 insertions(+), 2628 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 776b28d..8acad61 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -89,6 +89,7 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
+ export PYTHONHASHSEED=0
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/bin/spark-submit
----------------------------------------------------------------------
diff --git a/bin/spark-submit b/bin/spark-submit
index bcff78e..0e0afe7 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -19,6 +19,9 @@
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+# disable randomized hash for string in Python 3.3+
+export PYTHONHASHSEED=0
+
# Only define a usage function if an upstream script hasn't done so.
if ! type -t usage >/dev/null 2>&1; then
usage() {
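The PYTHONHASHSEED=0 export above (and in bin/pyspark and spark-submit2.cmd) pins Python's string hashing, which Python 3.3+ randomizes per interpreter process; without it, driver and worker processes can disagree on hash(key) and route the same key to different partitions. A small stand-alone illustration (plain Python, not Spark code; the 8-way modulo is just a hypothetical partition count):

import os
import subprocess
import sys

snippet = 'print(hash("spark") % 8)'   # which of 8 hypothetical partitions a key would map to

randomized = [subprocess.check_output([sys.executable, "-c", snippet]).decode().strip()
              for _ in range(3)]
pinned_env = dict(os.environ, PYTHONHASHSEED="0")
pinned = [subprocess.check_output([sys.executable, "-c", snippet], env=pinned_env).decode().strip()
          for _ in range(3)]

print("randomized:", randomized)  # typically not all equal on Python 3.3+ (unless the seed is already set)
print("pinned:    ", pinned)      # always identical, so every process agrees on the partition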
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/bin/spark-submit2.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-submit2.cmd b/bin/spark-submit2.cmd
index 08ddb18..d3fc4a5 100644
--- a/bin/spark-submit2.cmd
+++ b/bin/spark-submit2.cmd
@@ -20,6 +20,9 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.
+rem disable randomized hash for string in Python 3.3+
+set PYTHONHASHSEED=0
+
set CLASS=org.apache.spark.deploy.SparkSubmit
call %~dp0spark-class2.cmd %CLASS% %*
set SPARK_ERROR_LEVEL=%ERRORLEVEL%
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index bb21ab6..861d167 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -235,6 +235,8 @@ echo "========================================================================="
CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS
+# add path for python 3 in jenkins
+export PATH="${PATH}:/home/anaonda/envs/py3k/bin"
./python/run-tests
echo ""
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/dev/run-tests-jenkins
----------------------------------------------------------------------
diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins
index 3c1c91a..030f2cd 100755
--- a/dev/run-tests-jenkins
+++ b/dev/run-tests-jenkins
@@ -47,7 +47,7 @@ COMMIT_URL="https://github.com/apache/spark/commit/${ghprbActualCommit}"
# GitHub doesn't auto-link short hashes when submitted via the API, unfortunately. :(
SHORT_COMMIT_HASH="${ghprbActualCommit:0:7}"
-TESTS_TIMEOUT="120m" # format: http://linux.die.net/man/1/timeout
+TESTS_TIMEOUT="150m" # format: http://linux.die.net/man/1/timeout
# Array to capture all tests to run on the pull request. These tests are held under the
#+ dev/tests/ directory.
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/ec2/spark_ec2.py
----------------------------------------------------------------------
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 0c1f247..87c0818 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -19,7 +19,7 @@
# limitations under the License.
#
-from __future__ import with_statement
+from __future__ import with_statement, print_function
import hashlib
import itertools
@@ -37,12 +37,17 @@ import tarfile
import tempfile
import textwrap
import time
-import urllib2
import warnings
from datetime import datetime
from optparse import OptionParser
from sys import stderr
+if sys.version < "3":
+ from urllib2 import urlopen, Request, HTTPError
+else:
+ from urllib.request import urlopen, Request
+ from urllib.error import HTTPError
+
SPARK_EC2_VERSION = "1.2.1"
SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -88,10 +93,10 @@ def setup_external_libs(libs):
SPARK_EC2_LIB_DIR = os.path.join(SPARK_EC2_DIR, "lib")
if not os.path.exists(SPARK_EC2_LIB_DIR):
- print "Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
+ print("Downloading external libraries that spark-ec2 needs from PyPI to {path}...".format(
path=SPARK_EC2_LIB_DIR
- )
- print "This should be a one-time operation."
+ ))
+ print("This should be a one-time operation.")
os.mkdir(SPARK_EC2_LIB_DIR)
for lib in libs:
@@ -100,8 +105,8 @@ def setup_external_libs(libs):
if not os.path.isdir(lib_dir):
tgz_file_path = os.path.join(SPARK_EC2_LIB_DIR, versioned_lib_name + ".tar.gz")
- print " - Downloading {lib}...".format(lib=lib["name"])
- download_stream = urllib2.urlopen(
+ print(" - Downloading {lib}...".format(lib=lib["name"]))
+ download_stream = urlopen(
"{prefix}/{first_letter}/{lib_name}/{lib_name}-{lib_version}.tar.gz".format(
prefix=PYPI_URL_PREFIX,
first_letter=lib["name"][:1],
@@ -113,13 +118,13 @@ def setup_external_libs(libs):
tgz_file.write(download_stream.read())
with open(tgz_file_path) as tar:
if hashlib.md5(tar.read()).hexdigest() != lib["md5"]:
- print >> stderr, "ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"])
+ print("ERROR: Got wrong md5sum for {lib}.".format(lib=lib["name"]), file=stderr)
sys.exit(1)
tar = tarfile.open(tgz_file_path)
tar.extractall(path=SPARK_EC2_LIB_DIR)
tar.close()
os.remove(tgz_file_path)
- print " - Finished downloading {lib}.".format(lib=lib["name"])
+ print(" - Finished downloading {lib}.".format(lib=lib["name"]))
sys.path.insert(1, lib_dir)
@@ -299,12 +304,12 @@ def parse_args():
if home_dir is None or not os.path.isfile(home_dir + '/.boto'):
if not os.path.isfile('/etc/boto.cfg'):
if os.getenv('AWS_ACCESS_KEY_ID') is None:
- print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " +
- "must be set")
+ print("ERROR: The environment variable AWS_ACCESS_KEY_ID must be set",
+ file=stderr)
sys.exit(1)
if os.getenv('AWS_SECRET_ACCESS_KEY') is None:
- print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " +
- "must be set")
+ print("ERROR: The environment variable AWS_SECRET_ACCESS_KEY must be set",
+ file=stderr)
sys.exit(1)
return (opts, action, cluster_name)
@@ -316,7 +321,7 @@ def get_or_make_group(conn, name, vpc_id):
if len(group) > 0:
return group[0]
else:
- print "Creating security group " + name
+ print("Creating security group " + name)
return conn.create_security_group(name, "Spark EC2 group", vpc_id)
@@ -324,18 +329,19 @@ def get_validate_spark_version(version, repo):
if "." in version:
version = version.replace("v", "")
if version not in VALID_SPARK_VERSIONS:
- print >> stderr, "Don't know about Spark version: {v}".format(v=version)
+ print("Don't know about Spark version: {v}".format(v=version), file=stderr)
sys.exit(1)
return version
else:
github_commit_url = "{repo}/commit/{commit_hash}".format(repo=repo, commit_hash=version)
- request = urllib2.Request(github_commit_url)
+ request = Request(github_commit_url)
request.get_method = lambda: 'HEAD'
try:
- response = urllib2.urlopen(request)
- except urllib2.HTTPError, e:
- print >> stderr, "Couldn't validate Spark commit: {url}".format(url=github_commit_url)
- print >> stderr, "Received HTTP response code of {code}.".format(code=e.code)
+ response = urlopen(request)
+ except HTTPError as e:
+ print("Couldn't validate Spark commit: {url}".format(url=github_commit_url),
+ file=stderr)
+ print("Received HTTP response code of {code}.".format(code=e.code), file=stderr)
sys.exit(1)
return version
@@ -394,8 +400,7 @@ def get_spark_ami(opts):
instance_type = EC2_INSTANCE_TYPES[opts.instance_type]
else:
instance_type = "pvm"
- print >> stderr,\
- "Don't recognize %s, assuming type is pvm" % opts.instance_type
+ print("Don't recognize %s, assuming type is pvm" % opts.instance_type, file=stderr)
# URL prefix from which to fetch AMI information
ami_prefix = "{r}/{b}/ami-list".format(
@@ -404,10 +409,10 @@ def get_spark_ami(opts):
ami_path = "%s/%s/%s" % (ami_prefix, opts.region, instance_type)
try:
- ami = urllib2.urlopen(ami_path).read().strip()
- print "Spark AMI: " + ami
+ ami = urlopen(ami_path).read().strip()
+ print("Spark AMI: " + ami)
except:
- print >> stderr, "Could not resolve AMI at: " + ami_path
+ print("Could not resolve AMI at: " + ami_path, file=stderr)
sys.exit(1)
return ami
@@ -419,11 +424,11 @@ def get_spark_ami(opts):
# Fails if there already instances running in the cluster's groups.
def launch_cluster(conn, opts, cluster_name):
if opts.identity_file is None:
- print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections."
+ print("ERROR: Must provide an identity file (-i) for ssh connections.", file=stderr)
sys.exit(1)
if opts.key_pair is None:
- print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
+ print("ERROR: Must provide a key pair name (-k) to use on instances.", file=stderr)
sys.exit(1)
user_data_content = None
@@ -431,7 +436,7 @@ def launch_cluster(conn, opts, cluster_name):
with open(opts.user_data) as user_data_file:
user_data_content = user_data_file.read()
- print "Setting up security groups..."
+ print("Setting up security groups...")
master_group = get_or_make_group(conn, cluster_name + "-master", opts.vpc_id)
slave_group = get_or_make_group(conn, cluster_name + "-slaves", opts.vpc_id)
authorized_address = opts.authorized_address
@@ -497,8 +502,8 @@ def launch_cluster(conn, opts, cluster_name):
existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False)
if existing_slaves or (existing_masters and not opts.use_existing_master):
- print >> stderr, ("ERROR: There are already instances running in " +
- "group %s or %s" % (master_group.name, slave_group.name))
+ print("ERROR: There are already instances running in group %s or %s" %
+ (master_group.name, slave_group.name), file=stderr)
sys.exit(1)
# Figure out Spark AMI
@@ -511,12 +516,12 @@ def launch_cluster(conn, opts, cluster_name):
additional_group_ids = [sg.id
for sg in conn.get_all_security_groups()
if opts.additional_security_group in (sg.name, sg.id)]
- print "Launching instances..."
+ print("Launching instances...")
try:
image = conn.get_all_images(image_ids=[opts.ami])[0]
except:
- print >> stderr, "Could not find AMI " + opts.ami
+ print("Could not find AMI " + opts.ami, file=stderr)
sys.exit(1)
# Create block device mapping so that we can add EBS volumes if asked to.
@@ -542,8 +547,8 @@ def launch_cluster(conn, opts, cluster_name):
# Launch slaves
if opts.spot_price is not None:
# Launch spot instances with the requested price
- print ("Requesting %d slaves as spot instances with price $%.3f" %
- (opts.slaves, opts.spot_price))
+ print("Requesting %d slaves as spot instances with price $%.3f" %
+ (opts.slaves, opts.spot_price))
zones = get_zones(conn, opts)
num_zones = len(zones)
i = 0
@@ -566,7 +571,7 @@ def launch_cluster(conn, opts, cluster_name):
my_req_ids += [req.id for req in slave_reqs]
i += 1
- print "Waiting for spot instances to be granted..."
+ print("Waiting for spot instances to be granted...")
try:
while True:
time.sleep(10)
@@ -579,24 +584,24 @@ def launch_cluster(conn, opts, cluster_name):
if i in id_to_req and id_to_req[i].state == "active":
active_instance_ids.append(id_to_req[i].instance_id)
if len(active_instance_ids) == opts.slaves:
- print "All %d slaves granted" % opts.slaves
+ print("All %d slaves granted" % opts.slaves)
reservations = conn.get_all_reservations(active_instance_ids)
slave_nodes = []
for r in reservations:
slave_nodes += r.instances
break
else:
- print "%d of %d slaves granted, waiting longer" % (
- len(active_instance_ids), opts.slaves)
+ print("%d of %d slaves granted, waiting longer" % (
+ len(active_instance_ids), opts.slaves))
except:
- print "Canceling spot instance requests"
+ print("Canceling spot instance requests")
conn.cancel_spot_instance_requests(my_req_ids)
# Log a warning if any of these requests actually launched instances:
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
running = len(master_nodes) + len(slave_nodes)
if running:
- print >> stderr, ("WARNING: %d instances are still running" % running)
+ print(("WARNING: %d instances are still running" % running), file=stderr)
sys.exit(0)
else:
# Launch non-spot instances
@@ -618,16 +623,16 @@ def launch_cluster(conn, opts, cluster_name):
placement_group=opts.placement_group,
user_data=user_data_content)
slave_nodes += slave_res.instances
- print "Launched {s} slave{plural_s} in {z}, regid = {r}".format(
- s=num_slaves_this_zone,
- plural_s=('' if num_slaves_this_zone == 1 else 's'),
- z=zone,
- r=slave_res.id)
+ print("Launched {s} slave{plural_s} in {z}, regid = {r}".format(
+ s=num_slaves_this_zone,
+ plural_s=('' if num_slaves_this_zone == 1 else 's'),
+ z=zone,
+ r=slave_res.id))
i += 1
# Launch or resume masters
if existing_masters:
- print "Starting master..."
+ print("Starting master...")
for inst in existing_masters:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
@@ -650,10 +655,10 @@ def launch_cluster(conn, opts, cluster_name):
user_data=user_data_content)
master_nodes = master_res.instances
- print "Launched master in %s, regid = %s" % (zone, master_res.id)
+ print("Launched master in %s, regid = %s" % (zone, master_res.id))
# This wait time corresponds to SPARK-4983
- print "Waiting for AWS to propagate instance metadata..."
+ print("Waiting for AWS to propagate instance metadata...")
time.sleep(5)
# Give the instances descriptive names
for master in master_nodes:
@@ -674,8 +679,8 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
Get the EC2 instances in an existing cluster if available.
Returns a tuple of lists of EC2 instance objects for the masters and slaves.
"""
- print "Searching for existing cluster {c} in region {r}...".format(
- c=cluster_name, r=opts.region)
+ print("Searching for existing cluster {c} in region {r}...".format(
+ c=cluster_name, r=opts.region))
def get_instances(group_names):
"""
@@ -693,16 +698,15 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
slave_instances = get_instances([cluster_name + "-slaves"])
if any((master_instances, slave_instances)):
- print "Found {m} master{plural_m}, {s} slave{plural_s}.".format(
- m=len(master_instances),
- plural_m=('' if len(master_instances) == 1 else 's'),
- s=len(slave_instances),
- plural_s=('' if len(slave_instances) == 1 else 's'))
+ print("Found {m} master{plural_m}, {s} slave{plural_s}.".format(
+ m=len(master_instances),
+ plural_m=('' if len(master_instances) == 1 else 's'),
+ s=len(slave_instances),
+ plural_s=('' if len(slave_instances) == 1 else 's')))
if not master_instances and die_on_error:
- print >> sys.stderr, \
- "ERROR: Could not find a master for cluster {c} in region {r}.".format(
- c=cluster_name, r=opts.region)
+ print("ERROR: Could not find a master for cluster {c} in region {r}.".format(
+ c=cluster_name, r=opts.region), file=sys.stderr)
sys.exit(1)
return (master_instances, slave_instances)
@@ -713,7 +717,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = get_dns_name(master_nodes[0], opts.private_ips)
if deploy_ssh_key:
- print "Generating cluster's SSH key on master..."
+ print("Generating cluster's SSH key on master...")
key_setup = """
[ -f ~/.ssh/id_rsa ] ||
(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
@@ -721,10 +725,10 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
"""
ssh(master, opts, key_setup)
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
- print "Transferring cluster's SSH key to slaves..."
+ print("Transferring cluster's SSH key to slaves...")
for slave in slave_nodes:
slave_address = get_dns_name(slave, opts.private_ips)
- print slave_address
+ print(slave_address)
ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
@@ -738,8 +742,8 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
- print "Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
- r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch)
+ print("Cloning spark-ec2 scripts from {r}/tree/{b} on master...".format(
+ r=opts.spark_ec2_git_repo, b=opts.spark_ec2_git_branch))
ssh(
host=master,
opts=opts,
@@ -749,7 +753,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
b=opts.spark_ec2_git_branch)
)
- print "Deploying files to master..."
+ print("Deploying files to master...")
deploy_files(
conn=conn,
root_dir=SPARK_EC2_DIR + "/" + "deploy.generic",
@@ -760,25 +764,25 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
)
if opts.deploy_root_dir is not None:
- print "Deploying {s} to master...".format(s=opts.deploy_root_dir)
+ print("Deploying {s} to master...".format(s=opts.deploy_root_dir))
deploy_user_files(
root_dir=opts.deploy_root_dir,
opts=opts,
master_nodes=master_nodes
)
- print "Running setup on master..."
+ print("Running setup on master...")
setup_spark_cluster(master, opts)
- print "Done!"
+ print("Done!")
def setup_spark_cluster(master, opts):
ssh(master, opts, "chmod u+x spark-ec2/setup.sh")
ssh(master, opts, "spark-ec2/setup.sh")
- print "Spark standalone cluster started at http://%s:8080" % master
+ print("Spark standalone cluster started at http://%s:8080" % master)
if opts.ganglia:
- print "Ganglia started at http://%s:5080/ganglia" % master
+ print("Ganglia started at http://%s:5080/ganglia" % master)
def is_ssh_available(host, opts, print_ssh_output=True):
@@ -795,7 +799,7 @@ def is_ssh_available(host, opts, print_ssh_output=True):
if s.returncode != 0 and print_ssh_output:
# extra leading newline is for spacing in wait_for_cluster_state()
- print textwrap.dedent("""\n
+ print(textwrap.dedent("""\n
Warning: SSH connection error. (This could be temporary.)
Host: {h}
SSH return code: {r}
@@ -804,7 +808,7 @@ def is_ssh_available(host, opts, print_ssh_output=True):
h=host,
r=s.returncode,
o=cmd_output.strip()
- )
+ ))
return s.returncode == 0
@@ -865,10 +869,10 @@ def wait_for_cluster_state(conn, opts, cluster_instances, cluster_state):
sys.stdout.write("\n")
end_time = datetime.now()
- print "Cluster is now in '{s}' state. Waited {t} seconds.".format(
+ print("Cluster is now in '{s}' state. Waited {t} seconds.".format(
s=cluster_state,
t=(end_time - start_time).seconds
- )
+ ))
# Get number of local disks available for a given EC2 instance type.
@@ -916,8 +920,8 @@ def get_num_disks(instance_type):
if instance_type in disks_by_instance:
return disks_by_instance[instance_type]
else:
- print >> stderr, ("WARNING: Don't know number of disks on instance type %s; assuming 1"
- % instance_type)
+ print("WARNING: Don't know number of disks on instance type %s; assuming 1"
+ % instance_type, file=stderr)
return 1
@@ -951,7 +955,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
# Spark-only custom deploy
spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
tachyon_v = ""
- print "Deploying Spark via git hash; Tachyon won't be set up"
+ print("Deploying Spark via git hash; Tachyon won't be set up")
modules = filter(lambda x: x != "tachyon", modules)
master_addresses = [get_dns_name(i, opts.private_ips) for i in master_nodes]
@@ -1067,8 +1071,8 @@ def ssh(host, opts, command):
"--key-pair parameters and try again.".format(host))
else:
raise e
- print >> stderr, \
- "Error executing remote command, retrying after 30 seconds: {0}".format(e)
+ print("Error executing remote command, retrying after 30 seconds: {0}".format(e),
+ file=stderr)
time.sleep(30)
tries = tries + 1
@@ -1107,8 +1111,8 @@ def ssh_write(host, opts, command, arguments):
elif tries > 5:
raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
else:
- print >> stderr, \
- "Error {0} while executing remote command, retrying after 30 seconds".format(status)
+ print("Error {0} while executing remote command, retrying after 30 seconds".
+ format(status), file=stderr)
time.sleep(30)
tries = tries + 1
@@ -1162,42 +1166,41 @@ def real_main():
if opts.identity_file is not None:
if not os.path.exists(opts.identity_file):
- print >> stderr,\
- "ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file)
+ print("ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file),
+ file=stderr)
sys.exit(1)
file_mode = os.stat(opts.identity_file).st_mode
if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
- print >> stderr, "ERROR: The identity file must be accessible only by you."
- print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file)
+ print("ERROR: The identity file must be accessible only by you.", file=stderr)
+ print('You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file),
+ file=stderr)
sys.exit(1)
if opts.instance_type not in EC2_INSTANCE_TYPES:
- print >> stderr, "Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
- t=opts.instance_type)
+ print("Warning: Unrecognized EC2 instance type for instance-type: {t}".format(
+ t=opts.instance_type), file=stderr)
if opts.master_instance_type != "":
if opts.master_instance_type not in EC2_INSTANCE_TYPES:
- print >> stderr, \
- "Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
- t=opts.master_instance_type)
+ print("Warning: Unrecognized EC2 instance type for master-instance-type: {t}".format(
+ t=opts.master_instance_type), file=stderr)
# Since we try instance types even if we can't resolve them, we check if they resolve first
# and, if they do, see if they resolve to the same virtualization type.
if opts.instance_type in EC2_INSTANCE_TYPES and \
opts.master_instance_type in EC2_INSTANCE_TYPES:
if EC2_INSTANCE_TYPES[opts.instance_type] != \
EC2_INSTANCE_TYPES[opts.master_instance_type]:
- print >> stderr, \
- "Error: spark-ec2 currently does not support having a master and slaves " + \
- "with different AMI virtualization types."
- print >> stderr, "master instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.master_instance_type])
- print >> stderr, "slave instance virtualization type: {t}".format(
- t=EC2_INSTANCE_TYPES[opts.instance_type])
+ print("Error: spark-ec2 currently does not support having a master and slaves "
+ "with different AMI virtualization types.", file=stderr)
+ print("master instance virtualization type: {t}".format(
+ t=EC2_INSTANCE_TYPES[opts.master_instance_type]), file=stderr)
+ print("slave instance virtualization type: {t}".format(
+ t=EC2_INSTANCE_TYPES[opts.instance_type]), file=stderr)
sys.exit(1)
if opts.ebs_vol_num > 8:
- print >> stderr, "ebs-vol-num cannot be greater than 8"
+ print("ebs-vol-num cannot be greater than 8", file=stderr)
sys.exit(1)
# Prevent breaking ami_prefix (/, .git and startswith checks)
@@ -1206,23 +1209,22 @@ def real_main():
opts.spark_ec2_git_repo.endswith(".git") or \
not opts.spark_ec2_git_repo.startswith("https://github.com") or \
not opts.spark_ec2_git_repo.endswith("spark-ec2"):
- print >> stderr, "spark-ec2-git-repo must be a github repo and it must not have a " \
- "trailing / or .git. " \
- "Furthermore, we currently only support forks named spark-ec2."
+ print("spark-ec2-git-repo must be a github repo and it must not have a trailing / or .git. "
+ "Furthermore, we currently only support forks named spark-ec2.", file=stderr)
sys.exit(1)
if not (opts.deploy_root_dir is None or
(os.path.isabs(opts.deploy_root_dir) and
os.path.isdir(opts.deploy_root_dir) and
os.path.exists(opts.deploy_root_dir))):
- print >> stderr, "--deploy-root-dir must be an absolute path to a directory that exists " \
- "on the local file system"
+ print("--deploy-root-dir must be an absolute path to a directory that exists "
+ "on the local file system", file=stderr)
sys.exit(1)
try:
conn = ec2.connect_to_region(opts.region)
except Exception as e:
- print >> stderr, (e)
+ print((e), file=stderr)
sys.exit(1)
# Select an AZ at random if it was not specified.
@@ -1231,7 +1233,7 @@ def real_main():
if action == "launch":
if opts.slaves <= 0:
- print >> sys.stderr, "ERROR: You have to start at least 1 slave"
+ print("ERROR: You have to start at least 1 slave", file=sys.stderr)
sys.exit(1)
if opts.resume:
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
@@ -1250,18 +1252,18 @@ def real_main():
conn, opts, cluster_name, die_on_error=False)
if any(master_nodes + slave_nodes):
- print "The following instances will be terminated:"
+ print("The following instances will be terminated:")
for inst in master_nodes + slave_nodes:
- print "> %s" % get_dns_name(inst, opts.private_ips)
- print "ALL DATA ON ALL NODES WILL BE LOST!!"
+ print("> %s" % get_dns_name(inst, opts.private_ips))
+ print("ALL DATA ON ALL NODES WILL BE LOST!!")
msg = "Are you sure you want to destroy the cluster {c}? (y/N) ".format(c=cluster_name)
response = raw_input(msg)
if response == "y":
- print "Terminating master..."
+ print("Terminating master...")
for inst in master_nodes:
inst.terminate()
- print "Terminating slaves..."
+ print("Terminating slaves...")
for inst in slave_nodes:
inst.terminate()
@@ -1274,16 +1276,16 @@ def real_main():
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated'
)
- print "Deleting security groups (this will take some time)..."
+ print("Deleting security groups (this will take some time)...")
attempt = 1
while attempt <= 3:
- print "Attempt %d" % attempt
+ print("Attempt %d" % attempt)
groups = [g for g in conn.get_all_security_groups() if g.name in group_names]
success = True
# Delete individual rules in all groups before deleting groups to
# remove dependencies between them
for group in groups:
- print "Deleting rules in security group " + group.name
+ print("Deleting rules in security group " + group.name)
for rule in group.rules:
for grant in rule.grants:
success &= group.revoke(ip_protocol=rule.ip_protocol,
@@ -1298,10 +1300,10 @@ def real_main():
try:
# It is needed to use group_id to make it work with VPC
conn.delete_security_group(group_id=group.id)
- print "Deleted security group %s" % group.name
+ print("Deleted security group %s" % group.name)
except boto.exception.EC2ResponseError:
success = False
- print "Failed to delete security group %s" % group.name
+ print("Failed to delete security group %s" % group.name)
# Unfortunately, group.revoke() returns True even if a rule was not
# deleted, so this needs to be rerun if something fails
@@ -1311,17 +1313,16 @@ def real_main():
attempt += 1
if not success:
- print "Failed to delete all security groups after 3 tries."
- print "Try re-running in a few minutes."
+ print("Failed to delete all security groups after 3 tries.")
+ print("Try re-running in a few minutes.")
elif action == "login":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
if not master_nodes[0].public_dns_name and not opts.private_ips:
- print "Master has no public DNS name. Maybe you meant to specify " \
- "--private-ips?"
+ print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
else:
master = get_dns_name(master_nodes[0], opts.private_ips)
- print "Logging into master " + master + "..."
+ print("Logging into master " + master + "...")
proxy_opt = []
if opts.proxy_port is not None:
proxy_opt = ['-D', opts.proxy_port]
@@ -1336,19 +1337,18 @@ def real_main():
if response == "y":
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- print "Rebooting slaves..."
+ print("Rebooting slaves...")
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
- print "Rebooting " + inst.id
+ print("Rebooting " + inst.id)
inst.reboot()
elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
if not master_nodes[0].public_dns_name and not opts.private_ips:
- print "Master has no public DNS name. Maybe you meant to specify " \
- "--private-ips?"
+ print("Master has no public DNS name. Maybe you meant to specify --private-ips?")
else:
- print get_dns_name(master_nodes[0], opts.private_ips)
+ print(get_dns_name(master_nodes[0], opts.private_ips))
elif action == "stop":
response = raw_input(
@@ -1361,11 +1361,11 @@ def real_main():
if response == "y":
(master_nodes, slave_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
- print "Stopping master..."
+ print("Stopping master...")
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.stop()
- print "Stopping slaves..."
+ print("Stopping slaves...")
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
if inst.spot_instance_request_id:
@@ -1375,11 +1375,11 @@ def real_main():
elif action == "start":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
- print "Starting slaves..."
+ print("Starting slaves...")
for inst in slave_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
- print "Starting master..."
+ print("Starting master...")
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
@@ -1403,15 +1403,15 @@ def real_main():
setup_cluster(conn, master_nodes, slave_nodes, opts, False)
else:
- print >> stderr, "Invalid action: %s" % action
+ print("Invalid action: %s" % action, file=stderr)
sys.exit(1)
def main():
try:
real_main()
- except UsageError, e:
- print >> stderr, "\nError:\n", e
+ except UsageError as e:
+ print("\nError:\n", e, file=stderr)
sys.exit(1)
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/als.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 70b6146..1c3a787 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -21,7 +21,8 @@ ALS in pyspark.mllib.recommendation for more conventional use.
This example requires numpy (http://www.numpy.org/)
"""
-from os.path import realpath
+from __future__ import print_function
+
import sys
import numpy as np
@@ -57,9 +58,9 @@ if __name__ == "__main__":
Usage: als [M] [U] [F] [iterations] [partitions]"
"""
- print >> sys.stderr, """WARN: This is a naive implementation of ALS and is given as an
+ print("""WARN: This is a naive implementation of ALS and is given as an
example. Please use the ALS method found in pyspark.mllib.recommendation for more
- conventional use."""
+ conventional use.""", file=sys.stderr)
sc = SparkContext(appName="PythonALS")
M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
@@ -68,8 +69,8 @@ if __name__ == "__main__":
ITERATIONS = int(sys.argv[4]) if len(sys.argv) > 4 else 5
partitions = int(sys.argv[5]) if len(sys.argv) > 5 else 2
- print "Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" % \
- (M, U, F, ITERATIONS, partitions)
+ print("Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" %
+ (M, U, F, ITERATIONS, partitions))
R = matrix(rand(M, F)) * matrix(rand(U, F).T)
ms = matrix(rand(M, F))
@@ -95,7 +96,7 @@ if __name__ == "__main__":
usb = sc.broadcast(us)
error = rmse(R, ms, us)
- print "Iteration %d:" % i
- print "\nRMSE: %5.4f\n" % error
+ print("Iteration %d:" % i)
+ print("\nRMSE: %5.4f\n" % error)
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/avro_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
index 4626bbb..da368ac 100644
--- a/examples/src/main/python/avro_inputformat.py
+++ b/examples/src/main/python/avro_inputformat.py
@@ -15,9 +15,12 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
+from functools import reduce
"""
Read data file users.avro in local Spark distro:
@@ -49,7 +52,7 @@ $ ./bin/spark-submit --driver-class-path /path/to/example/jar \
"""
if __name__ == "__main__":
if len(sys.argv) != 2 and len(sys.argv) != 3:
- print >> sys.stderr, """
+ print("""
Usage: avro_inputformat <data_file> [reader_schema_file]
Run with example jar:
@@ -57,7 +60,7 @@ if __name__ == "__main__":
/path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified
in [reader_schema_file].
- """
+ """, file=sys.stderr)
exit(-1)
path = sys.argv[1]
@@ -77,6 +80,6 @@ if __name__ == "__main__":
conf=conf)
output = avro_rdd.map(lambda x: x[0]).collect()
for k in output:
- print k
+ print(k)
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/cassandra_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
index 05f34b7..93ca0cf 100644
--- a/examples/src/main/python/cassandra_inputformat.py
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -47,14 +49,14 @@ cqlsh:test> SELECT * FROM users;
"""
if __name__ == "__main__":
if len(sys.argv) != 4:
- print >> sys.stderr, """
+ print("""
Usage: cassandra_inputformat <host> <keyspace> <cf>
Run with example jar:
./bin/spark-submit --driver-class-path /path/to/example/jar \
/path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
@@ -77,6 +79,6 @@ if __name__ == "__main__":
conf=conf)
output = cass_rdd.collect()
for (k, v) in output:
- print (k, v)
+ print((k, v))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/cassandra_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
index d144539..5d643ea 100644
--- a/examples/src/main/python/cassandra_outputformat.py
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -46,7 +48,7 @@ cqlsh:test> SELECT * FROM users;
"""
if __name__ == "__main__":
if len(sys.argv) != 7:
- print >> sys.stderr, """
+ print("""
Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
Run with example jar:
@@ -60,7 +62,7 @@ if __name__ == "__main__":
... fname text,
... lname text
... );
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/hbase_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3b16010..e17819d 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -47,14 +49,14 @@ ROW COLUMN+CELL
"""
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, """
+ print("""
Usage: hbase_inputformat <host> <table>
Run with example jar:
./bin/spark-submit --driver-class-path /path/to/example/jar \
/path/to/examples/hbase_inputformat.py <host> <table>
Assumes you have some data in HBase already, running on <host>, in <table>
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
@@ -74,6 +76,6 @@ if __name__ == "__main__":
conf=conf)
output = hbase_rdd.collect()
for (k, v) in output:
- print (k, v)
+ print((k, v))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/hbase_outputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
index abb425b..9e56417 100644
--- a/examples/src/main/python/hbase_outputformat.py
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import sys
from pyspark import SparkContext
@@ -40,7 +42,7 @@ ROW COLUMN+CELL
"""
if __name__ == "__main__":
if len(sys.argv) != 7:
- print >> sys.stderr, """
+ print("""
Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
Run with example jar:
@@ -48,7 +50,7 @@ if __name__ == "__main__":
/path/to/examples/hbase_outputformat.py <args>
Assumes you have created <table> with column family <family> in HBase
running on <host> already
- """
+ """, file=sys.stderr)
exit(-1)
host = sys.argv[1]
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/kmeans.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
index 86ef6f3..1939150 100755
--- a/examples/src/main/python/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -22,6 +22,7 @@ examples/src/main/python/mllib/kmeans.py.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
import sys
@@ -47,12 +48,12 @@ def closestPoint(p, centers):
if __name__ == "__main__":
if len(sys.argv) != 4:
- print >> sys.stderr, "Usage: kmeans <file> <k> <convergeDist>"
+ print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
exit(-1)
- print >> sys.stderr, """WARN: This is a naive implementation of KMeans Clustering and is given
+ print("""WARN: This is a naive implementation of KMeans Clustering and is given
as an example! Please refer to examples/src/main/python/mllib/kmeans.py for an example on
- how to use MLlib's KMeans implementation."""
+ how to use MLlib's KMeans implementation.""", file=sys.stderr)
sc = SparkContext(appName="PythonKMeans")
lines = sc.textFile(sys.argv[1])
@@ -69,13 +70,13 @@ if __name__ == "__main__":
pointStats = closest.reduceByKey(
lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
newPoints = pointStats.map(
- lambda (x, (y, z)): (x, y / z)).collect()
+ lambda xy: (xy[0], xy[1][0] / xy[1][1])).collect()
tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
for (x, y) in newPoints:
kPoints[x] = y
- print "Final centers: " + str(kPoints)
+ print("Final centers: " + str(kPoints))
sc.stop()
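The newPoints rewrite above is needed because Python 3 removed tuple parameters in function and lambda signatures (PEP 3113), so "lambda (x, (y, z)): ..." no longer parses. A stand-alone sketch of the rewrite pattern on plain lists (not RDDs):

# Python 2 accepted: lambda (x, (y, z)): (x, y / z)   -- a syntax error in Python 3
pairs = [(0, (10.0, 2.0)), (1, (9.0, 3.0))]           # (cluster id, (coordinate sum, count))
centers = list(map(lambda xy: (xy[0], xy[1][0] / xy[1][1]), pairs))
print(centers)  # [(0, 5.0), (1, 3.0)]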
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/logistic_regression.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index 3aa56b0..b318b7d 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -22,10 +22,8 @@ to act on batches of input data using efficient matrix operations.
In practice, one may prefer to use the LogisticRegression algorithm in
MLlib, as shown in examples/src/main/python/mllib/logistic_regression.py.
"""
+from __future__ import print_function
-from collections import namedtuple
-from math import exp
-from os.path import realpath
import sys
import numpy as np
@@ -42,19 +40,19 @@ D = 10 # Number of dimensions
def readPointBatch(iterator):
strs = list(iterator)
matrix = np.zeros((len(strs), D + 1))
- for i in xrange(len(strs)):
- matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
+ for i, s in enumerate(strs):
+ matrix[i] = np.fromstring(s.replace(',', ' '), dtype=np.float32, sep=' ')
return [matrix]
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
+ print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
exit(-1)
- print >> sys.stderr, """WARN: This is a naive implementation of Logistic Regression and is
+ print("""WARN: This is a naive implementation of Logistic Regression and is
given as an example! Please refer to examples/src/main/python/mllib/logistic_regression.py
- to see how MLlib's implementation is used."""
+ to see how MLlib's implementation is used.""", file=sys.stderr)
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
@@ -62,7 +60,7 @@ if __name__ == "__main__":
# Initialize w to a random value
w = 2 * np.random.ranf(size=D) - 1
- print "Initial w: " + str(w)
+ print("Initial w: " + str(w))
# Compute logistic regression gradient for a matrix of data points
def gradient(matrix, w):
@@ -76,9 +74,9 @@ if __name__ == "__main__":
return x
for i in range(iterations):
- print "On iteration %i" % (i + 1)
+ print("On iteration %i" % (i + 1))
w -= points.map(lambda m: gradient(m, w)).reduce(add)
- print "Final w: " + str(w)
+ print("Final w: " + str(w))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/ml/simple_text_classification_pipeline.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
index c73edb7..fab21f0 100644
--- a/examples/src/main/python/ml/simple_text_classification_pipeline.py
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
@@ -37,10 +39,10 @@ if __name__ == "__main__":
# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
- training = sc.parallelize([(0L, "a b c d e spark", 1.0),
- (1L, "b d", 0.0),
- (2L, "spark f g h", 1.0),
- (3L, "hadoop mapreduce", 0.0)]) \
+ training = sc.parallelize([(0, "a b c d e spark", 1.0),
+ (1, "b d", 0.0),
+ (2, "spark f g h", 1.0),
+ (3, "hadoop mapreduce", 0.0)]) \
.map(lambda x: LabeledDocument(*x)).toDF()
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
@@ -54,16 +56,16 @@ if __name__ == "__main__":
# Prepare test documents, which are unlabeled.
Document = Row("id", "text")
- test = sc.parallelize([(4L, "spark i j k"),
- (5L, "l m n"),
- (6L, "mapreduce spark"),
- (7L, "apache hadoop")]) \
+ test = sc.parallelize([(4, "spark i j k"),
+ (5, "l m n"),
+ (6, "mapreduce spark"),
+ (7, "apache hadoop")]) \
.map(lambda x: Document(*x)).toDF()
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
- print row
+ print(row)
sc.stop()
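The 0L/1L literals above become plain ints because Python 3 has a single integer type and the L suffix is a syntax error there. A one-line illustration (plain Python):

big = 10 ** 30              # still type int on Python 3; no long/L suffix needed or allowed
print(type(big).__name__)   # "int"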
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/correlations.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py
index 4218eca..0e13546 100755
--- a/examples/src/main/python/mllib/correlations.py
+++ b/examples/src/main/python/mllib/correlations.py
@@ -18,6 +18,7 @@
"""
Correlations using MLlib.
"""
+from __future__ import print_function
import sys
@@ -29,7 +30,7 @@ from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
- print >> sys.stderr, "Usage: correlations (<file>)"
+ print("Usage: correlations (<file>)", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonCorrelations")
if len(sys.argv) == 2:
@@ -41,20 +42,20 @@ if __name__ == "__main__":
points = MLUtils.loadLibSVMFile(sc, filepath)\
.map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))
- print
- print 'Summary of data file: ' + filepath
- print '%d data points' % points.count()
+ print()
+ print('Summary of data file: ' + filepath)
+ print('%d data points' % points.count())
# Statistics (correlations)
- print
- print 'Correlation (%s) between label and each feature' % corrType
- print 'Feature\tCorrelation'
+ print()
+ print('Correlation (%s) between label and each feature' % corrType)
+ print('Feature\tCorrelation')
numFeatures = points.take(1)[0].features.size
labelRDD = points.map(lambda lp: lp.label)
for i in range(numFeatures):
featureRDD = points.map(lambda lp: lp.features[i])
corr = Statistics.corr(labelRDD, featureRDD, corrType)
- print '%d\t%g' % (i, corr)
- print
+ print('%d\t%g' % (i, corr))
+ print()
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/dataset_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/dataset_example.py b/examples/src/main/python/mllib/dataset_example.py
index fcbf56c..e23ecc0 100644
--- a/examples/src/main/python/mllib/dataset_example.py
+++ b/examples/src/main/python/mllib/dataset_example.py
@@ -19,6 +19,7 @@
An example of how to use DataFrame as a dataset for ML. Run with::
bin/spark-submit examples/src/main/python/mllib/dataset_example.py
"""
+from __future__ import print_function
import os
import sys
@@ -32,16 +33,16 @@ from pyspark.mllib.stat import Statistics
def summarize(dataset):
- print "schema: %s" % dataset.schema().json()
+ print("schema: %s" % dataset.schema().json())
labels = dataset.map(lambda r: r.label)
- print "label average: %f" % labels.mean()
+ print("label average: %f" % labels.mean())
features = dataset.map(lambda r: r.features)
summary = Statistics.colStats(features)
- print "features average: %r" % summary.mean()
+ print("features average: %r" % summary.mean())
if __name__ == "__main__":
if len(sys.argv) > 2:
- print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
+ print("Usage: dataset_example.py <libsvm file>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="DatasetExample")
sqlContext = SQLContext(sc)
@@ -54,9 +55,9 @@ if __name__ == "__main__":
summarize(dataset0)
tempdir = tempfile.NamedTemporaryFile(delete=False).name
os.unlink(tempdir)
- print "Save dataset as a Parquet file to %s." % tempdir
+ print("Save dataset as a Parquet file to %s." % tempdir)
dataset0.saveAsParquetFile(tempdir)
- print "Load it back and summarize it again."
+ print("Load it back and summarize it again.")
dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
summarize(dataset1)
shutil.rmtree(tempdir)
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/decision_tree_runner.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
index fccabd8..513ed8f 100755
--- a/examples/src/main/python/mllib/decision_tree_runner.py
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -20,6 +20,7 @@ Decision tree classification and regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
import numpy
import os
@@ -83,18 +84,17 @@ def reindexClassLabels(data):
numClasses = len(classCounts)
# origToNewLabels: class --> index in 0,...,numClasses-1
if (numClasses < 2):
- print >> sys.stderr, \
- "Dataset for classification should have at least 2 classes." + \
- " The given dataset had only %d classes." % numClasses
+ print("Dataset for classification should have at least 2 classes."
+ " The given dataset had only %d classes." % numClasses, file=sys.stderr)
exit(1)
origToNewLabels = dict([(sortedClasses[i], i) for i in range(0, numClasses)])
- print "numClasses = %d" % numClasses
- print "Per-class example fractions, counts:"
- print "Class\tFrac\tCount"
+ print("numClasses = %d" % numClasses)
+ print("Per-class example fractions, counts:")
+ print("Class\tFrac\tCount")
for c in sortedClasses:
frac = classCounts[c] / (numExamples + 0.0)
- print "%g\t%g\t%d" % (c, frac, classCounts[c])
+ print("%g\t%g\t%d" % (c, frac, classCounts[c]))
if (sortedClasses[0] == 0 and sortedClasses[-1] == numClasses - 1):
return (data, origToNewLabels)
@@ -105,8 +105,7 @@ def reindexClassLabels(data):
def usage():
- print >> sys.stderr, \
- "Usage: decision_tree_runner [libsvm format data filepath]"
+ print("Usage: decision_tree_runner [libsvm format data filepath]", file=sys.stderr)
exit(1)
@@ -133,13 +132,13 @@ if __name__ == "__main__":
model = DecisionTree.trainClassifier(reindexedData, numClasses=numClasses,
categoricalFeaturesInfo=categoricalFeaturesInfo)
# Print learned tree and stats.
- print "Trained DecisionTree for classification:"
- print " Model numNodes: %d" % model.numNodes()
- print " Model depth: %d" % model.depth()
- print " Training accuracy: %g" % getAccuracy(model, reindexedData)
+ print("Trained DecisionTree for classification:")
+ print(" Model numNodes: %d" % model.numNodes())
+ print(" Model depth: %d" % model.depth())
+ print(" Training accuracy: %g" % getAccuracy(model, reindexedData))
if model.numNodes() < 20:
- print model.toDebugString()
+ print(model.toDebugString())
else:
- print model
+ print(model)
sc.stop()
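
Longer messages such as the one in reindexClassLabels are split with implicit concatenation of adjacent string literals inside the print() call, replacing the old backslash-continued `print >>` statement; the literals are joined at compile time before the % formatting is applied. A short sketch with an illustrative value:

    from __future__ import print_function
    import sys

    numClasses = 1   # hypothetical count
    print("Dataset for classification should have at least 2 classes."
          " The given dataset had only %d classes." % numClasses, file=sys.stderr)
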
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/gaussian_mixture_model.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/gaussian_mixture_model.py b/examples/src/main/python/mllib/gaussian_mixture_model.py
index a2cd626..2cb8010 100644
--- a/examples/src/main/python/mllib/gaussian_mixture_model.py
+++ b/examples/src/main/python/mllib/gaussian_mixture_model.py
@@ -18,7 +18,8 @@
"""
A Gaussian Mixture Model clustering program using MLlib.
"""
-import sys
+from __future__ import print_function
+
import random
import argparse
import numpy as np
@@ -59,7 +60,7 @@ if __name__ == "__main__":
model = GaussianMixture.train(data, args.k, args.convergenceTol,
args.maxIterations, args.seed)
for i in range(args.k):
- print ("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
- "sigma = ", model.gaussians[i].sigma.toArray())
- print ("Cluster labels (first 100): ", model.predict(data).take(100))
+ print(("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
+ "sigma = ", model.gaussians[i].sigma.toArray()))
+ print(("Cluster labels (first 100): ", model.predict(data).take(100)))
sc.stop()
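
The extra pair of parentheses in the Gaussian mixture example is what keeps the output unchanged: under the Python 2 print statement, `print ("weight = ", w)` printed a tuple, so the Python 3 form passes one tuple argument instead of separate positional arguments. A small illustration with made-up values:

    from __future__ import print_function

    weight, mu = 0.5, [1.0, 2.0]                 # hypothetical values
    print(("weight = ", weight, "mu = ", mu))    # ('weight = ', 0.5, 'mu = ', [1.0, 2.0]) on 2 and 3
    print("weight = ", weight, "mu = ", mu)      # would print space-separated fields instead
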
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/gradient_boosted_trees.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/gradient_boosted_trees.py b/examples/src/main/python/mllib/gradient_boosted_trees.py
index e647773..781bd61 100644
--- a/examples/src/main/python/mllib/gradient_boosted_trees.py
+++ b/examples/src/main/python/mllib/gradient_boosted_trees.py
@@ -18,6 +18,7 @@
"""
Gradient boosted Trees classification and regression using MLlib.
"""
+from __future__ import print_function
import sys
@@ -34,7 +35,7 @@ def testClassification(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() \
+ testErr = labelsAndPredictions.filter(lambda v_p: v_p[0] != v_p[1]).count() \
/ float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification ensemble model:')
@@ -49,7 +50,7 @@ def testRegression(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() \
+ testMSE = labelsAndPredictions.map(lambda vp: (vp[0] - vp[1]) * (vp[0] - vp[1])).sum() \
/ float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression ensemble model:')
@@ -58,7 +59,7 @@ def testRegression(trainingData, testData):
if __name__ == "__main__":
if len(sys.argv) > 1:
- print >> sys.stderr, "Usage: gradient_boosted_trees"
+ print("Usage: gradient_boosted_trees", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonGradientBoostedTrees")
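
The lambda rewrites in this file are forced by PEP 3113: Python 3 removed tuple parameter unpacking, so `lambda (v, p): v != p` is a syntax error and the pair has to be taken as a single argument and indexed. A minimal sketch of the equivalent logic on a plain list (the values are made up):

    pairs = [(1.0, 1.0), (0.0, 1.0), (1.0, 0.0)]   # hypothetical (label, prediction) pairs
    # Python 2 only:     filter(lambda (v, p): v != p, pairs)
    # Python 2 and 3:    index into the single tuple argument
    wrong = [vp for vp in pairs if vp[0] != vp[1]]
    print(len(wrong) / float(len(pairs)))          # 0.666... test error
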
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/kmeans.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index 2eeb1ab..f901a87 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -20,6 +20,7 @@ A K-means clustering program using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
import sys
@@ -34,12 +35,12 @@ def parseVector(line):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: kmeans <file> <k>"
+ print("Usage: kmeans <file> <k>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="KMeans")
lines = sc.textFile(sys.argv[1])
data = lines.map(parseVector)
k = int(sys.argv[2])
model = KMeans.train(data, k)
- print "Final centers: " + str(model.clusterCenters)
+ print("Final centers: " + str(model.clusterCenters))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/logistic_regression.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 8cae27f..d4f1d34 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -20,11 +20,10 @@ Logistic regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
+from __future__ import print_function
-from math import exp
import sys
-import numpy as np
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
@@ -42,12 +41,12 @@ def parsePoint(line):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
+ print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonLR")
points = sc.textFile(sys.argv[1]).map(parsePoint)
iterations = int(sys.argv[2])
model = LogisticRegressionWithSGD.train(points, iterations)
- print "Final weights: " + str(model.weights)
- print "Final intercept: " + str(model.intercept)
+ print("Final weights: " + str(model.weights))
+ print("Final intercept: " + str(model.intercept))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/random_forest_example.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/random_forest_example.py b/examples/src/main/python/mllib/random_forest_example.py
index d3c24f7..4cfdad8 100755
--- a/examples/src/main/python/mllib/random_forest_example.py
+++ b/examples/src/main/python/mllib/random_forest_example.py
@@ -22,6 +22,7 @@ Note: This example illustrates binary classification.
For information on multiclass classification, please refer to the decision_tree_runner.py
example.
"""
+from __future__ import print_function
import sys
@@ -43,7 +44,7 @@ def testClassification(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count()\
+ testErr = labelsAndPredictions.filter(lambda v_p: v_p[0] != v_p[1]).count()\
/ float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
@@ -62,8 +63,8 @@ def testRegression(trainingData, testData):
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
- testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum()\
- / float(testData.count())
+ testMSE = labelsAndPredictions.map(lambda v_p1: (v_p1[0] - v_p1[1]) * (v_p1[0] - v_p1[1]))\
+ .sum() / float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())
@@ -71,7 +72,7 @@ def testRegression(trainingData, testData):
if __name__ == "__main__":
if len(sys.argv) > 1:
- print >> sys.stderr, "Usage: random_forest_example"
+ print("Usage: random_forest_example", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonRandomForestExample")
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/random_rdd_generation.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/random_rdd_generation.py b/examples/src/main/python/mllib/random_rdd_generation.py
index 1e88927..729bae3 100755
--- a/examples/src/main/python/mllib/random_rdd_generation.py
+++ b/examples/src/main/python/mllib/random_rdd_generation.py
@@ -18,6 +18,7 @@
"""
Randomly generated RDDs.
"""
+from __future__ import print_function
import sys
@@ -27,7 +28,7 @@ from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
- print >> sys.stderr, "Usage: random_rdd_generation"
+ print("Usage: random_rdd_generation", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonRandomRDDGeneration")
@@ -37,19 +38,19 @@ if __name__ == "__main__":
# Example: RandomRDDs.normalRDD
normalRDD = RandomRDDs.normalRDD(sc, numExamples)
- print 'Generated RDD of %d examples sampled from the standard normal distribution'\
- % normalRDD.count()
- print ' First 5 samples:'
+ print('Generated RDD of %d examples sampled from the standard normal distribution'
+ % normalRDD.count())
+ print(' First 5 samples:')
for sample in normalRDD.take(5):
- print ' ' + str(sample)
- print
+ print(' ' + str(sample))
+ print()
# Example: RandomRDDs.normalVectorRDD
normalVectorRDD = RandomRDDs.normalVectorRDD(sc, numRows=numExamples, numCols=2)
- print 'Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count()
- print ' First 5 samples:'
+ print('Generated RDD of %d examples of length-2 vectors.' % normalVectorRDD.count())
+ print(' First 5 samples:')
for sample in normalVectorRDD.take(5):
- print ' ' + str(sample)
- print
+ print(' ' + str(sample))
+ print()
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/sampled_rdds.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/sampled_rdds.py b/examples/src/main/python/mllib/sampled_rdds.py
index 92af3af..b7033ab 100755
--- a/examples/src/main/python/mllib/sampled_rdds.py
+++ b/examples/src/main/python/mllib/sampled_rdds.py
@@ -18,6 +18,7 @@
"""
Randomly sampled RDDs.
"""
+from __future__ import print_function
import sys
@@ -27,7 +28,7 @@ from pyspark.mllib.util import MLUtils
if __name__ == "__main__":
if len(sys.argv) not in [1, 2]:
- print >> sys.stderr, "Usage: sampled_rdds <libsvm data file>"
+ print("Usage: sampled_rdds <libsvm data file>", file=sys.stderr)
exit(-1)
if len(sys.argv) == 2:
datapath = sys.argv[1]
@@ -41,24 +42,24 @@ if __name__ == "__main__":
examples = MLUtils.loadLibSVMFile(sc, datapath)
numExamples = examples.count()
if numExamples == 0:
- print >> sys.stderr, "Error: Data file had no samples to load."
+ print("Error: Data file had no samples to load.", file=sys.stderr)
exit(1)
- print 'Loaded data with %d examples from file: %s' % (numExamples, datapath)
+ print('Loaded data with %d examples from file: %s' % (numExamples, datapath))
# Example: RDD.sample() and RDD.takeSample()
expectedSampleSize = int(numExamples * fraction)
- print 'Sampling RDD using fraction %g. Expected sample size = %d.' \
- % (fraction, expectedSampleSize)
+ print('Sampling RDD using fraction %g. Expected sample size = %d.'
+ % (fraction, expectedSampleSize))
sampledRDD = examples.sample(withReplacement=True, fraction=fraction)
- print ' RDD.sample(): sample has %d examples' % sampledRDD.count()
+ print(' RDD.sample(): sample has %d examples' % sampledRDD.count())
sampledArray = examples.takeSample(withReplacement=True, num=expectedSampleSize)
- print ' RDD.takeSample(): sample has %d examples' % len(sampledArray)
+ print(' RDD.takeSample(): sample has %d examples' % len(sampledArray))
- print
+ print()
# Example: RDD.sampleByKey()
keyedRDD = examples.map(lambda lp: (int(lp.label), lp.features))
- print ' Keyed data using label (Int) as key ==> Orig'
+ print(' Keyed data using label (Int) as key ==> Orig')
# Count examples per label in original data.
keyCountsA = keyedRDD.countByKey()
@@ -69,18 +70,18 @@ if __name__ == "__main__":
sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement=True, fractions=fractions)
keyCountsB = sampledByKeyRDD.countByKey()
sizeB = sum(keyCountsB.values())
- print ' Sampled %d examples using approximate stratified sampling (by label). ==> Sample' \
- % sizeB
+ print(' Sampled %d examples using approximate stratified sampling (by label). ==> Sample'
+ % sizeB)
# Compare samples
- print ' \tFractions of examples with key'
- print 'Key\tOrig\tSample'
+ print(' \tFractions of examples with key')
+ print('Key\tOrig\tSample')
for k in sorted(keyCountsA.keys()):
fracA = keyCountsA[k] / float(numExamples)
if sizeB != 0:
fracB = keyCountsB.get(k, 0) / float(sizeB)
else:
fracB = 0
- print '%d\t%g\t%g' % (k, fracA, fracB)
+ print('%d\t%g\t%g' % (k, fracA, fracB))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/mllib/word2vec.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/mllib/word2vec.py b/examples/src/main/python/mllib/word2vec.py
index 99fef42..40d1b88 100644
--- a/examples/src/main/python/mllib/word2vec.py
+++ b/examples/src/main/python/mllib/word2vec.py
@@ -23,6 +23,7 @@
# grep -o -E '\w+(\W+\w+){0,15}' text8 > text8_lines
# This was done so that the example can be run in local mode
+from __future__ import print_function
import sys
@@ -34,7 +35,7 @@ USAGE = ("bin/spark-submit --driver-memory 4g "
if __name__ == "__main__":
if len(sys.argv) < 2:
- print USAGE
+ print(USAGE)
sys.exit("Argument for file not provided")
file_path = sys.argv[1]
sc = SparkContext(appName='Word2Vec')
@@ -46,5 +47,5 @@ if __name__ == "__main__":
synonyms = model.findSynonyms('china', 40)
for word, cosine_distance in synonyms:
- print "{}: {}".format(word, cosine_distance)
+ print("{}: {}".format(word, cosine_distance))
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/pagerank.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index a5f25d7..2fdc977 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -19,6 +19,7 @@
This is an example implementation of PageRank. For more conventional use,
Please refer to PageRank implementation provided by graphx
"""
+from __future__ import print_function
import re
import sys
@@ -42,11 +43,12 @@ def parseNeighbors(urls):
if __name__ == "__main__":
if len(sys.argv) != 3:
- print >> sys.stderr, "Usage: pagerank <file> <iterations>"
+ print("Usage: pagerank <file> <iterations>", file=sys.stderr)
exit(-1)
- print >> sys.stderr, """WARN: This is a naive implementation of PageRank and is
- given as an example! Please refer to PageRank implementation provided by graphx"""
+ print("""WARN: This is a naive implementation of PageRank and is
+ given as an example! Please refer to PageRank implementation provided by graphx""",
+ file=sys.stderr)
# Initialize the spark context.
sc = SparkContext(appName="PythonPageRank")
@@ -62,19 +64,19 @@ if __name__ == "__main__":
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- ranks = links.map(lambda (url, neighbors): (url, 1.0))
+ ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))
# Calculates and updates URL ranks continuously using PageRank algorithm.
- for iteration in xrange(int(sys.argv[2])):
+ for iteration in range(int(sys.argv[2])):
# Calculates URL contributions to the rank of other URLs.
contribs = links.join(ranks).flatMap(
- lambda (url, (urls, rank)): computeContribs(urls, rank))
+ lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
# Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
# Collects all URL ranks and dump them to console.
for (link, rank) in ranks.collect():
- print "%s has rank: %s." % (link, rank)
+ print("%s has rank: %s." % (link, rank))
sc.stop()
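
Two incompatibilities meet in pagerank.py: xrange no longer exists (range works on both, and is already lazy on Python 3), and the nested unpacking `lambda (url, (urls, rank)):` is gone, so the join output has to be indexed explicitly. A sketch of that indexing on an ordinary tuple, with made-up data:

    record = ('a.html', (['b.html', 'c.html'], 1.0))   # hypothetical join output: (url, (neighbors, rank))
    url = record[0]
    urls, rank = record[1]                             # record[1][0] and record[1][1] in the lambda form
    for n in urls:
        print('%s contributes %s to %s' % (url, rank / len(urls), n))
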
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/parquet_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py
index fa4c20a..96ddac7 100644
--- a/examples/src/main/python/parquet_inputformat.py
+++ b/examples/src/main/python/parquet_inputformat.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -35,14 +36,14 @@ $ ./bin/spark-submit --driver-class-path /path/to/example/jar \\
"""
if __name__ == "__main__":
if len(sys.argv) != 2:
- print >> sys.stderr, """
+ print("""
Usage: parquet_inputformat.py <data_file>
Run with example jar:
./bin/spark-submit --driver-class-path /path/to/example/jar \\
/path/to/examples/parquet_inputformat.py <data_file>
Assumes you have Parquet data stored in <data_file>.
- """
+ """, file=sys.stderr)
exit(-1)
path = sys.argv[1]
@@ -56,6 +57,6 @@ if __name__ == "__main__":
valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter')
output = parquet_rdd.map(lambda x: x[1]).collect()
for k in output:
- print k
+ print(k)
sc.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/examples/src/main/python/pi.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index a7c74e9..92e5cf4 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -1,3 +1,4 @@
+from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -35,7 +36,7 @@ if __name__ == "__main__":
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
- count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
- print "Pi is roughly %f" % (4.0 * count / n)
+ count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+ print("Pi is roughly %f" % (4.0 * count / n))
sc.stop()
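
The division style matters as much as the print calls: `/` between two ints floors on Python 2 but returns a float on Python 3, which is why the examples force float arithmetic explicitly (4.0 * count / n here, / float(...) elsewhere) rather than relying on the interpreter. A tiny illustration with made-up numbers:

    count, n = 3141, 4000
    print(count / n)        # 0 on Python 2, 0.78525 on Python 3
    print(4.0 * count / n)  # 3.141 on both
    # `from __future__ import division` is the other way to unify the behaviour
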