You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/07 05:00:25 UTC
spark git commit: [SPARK-24126][PYSPARK] Use build-specific temp directory for pyspark tests.
Repository: spark
Updated Branches:
refs/heads/master f38ea00e8 -> a634d66ce
[SPARK-24126][PYSPARK] Use build-specific temp directory for pyspark tests.
This avoids polluting and leaving garbage behind in /tmp, and allows the
usual build tools to clean up any leftover files.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #21198 from vanzin/SPARK-24126.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a634d66c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a634d66c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a634d66c
Branch: refs/heads/master
Commit: a634d66ce767bd5e1d8553d1a2c32e2b1a80f642
Parents: f38ea00
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Mon May 7 13:00:18 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Mon May 7 13:00:18 2018 +0800
----------------------------------------------------------------------
python/pyspark/sql/tests.py | 4 ++--
python/pyspark/streaming/tests.py | 6 ++++--
python/pyspark/tests.py | 33 +++++++++++++++++++++------------
python/run-tests.py | 29 +++++++++++++++++++++++++++--
4 files changed, 54 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cc6acfd..16aa937 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3092,8 +3092,8 @@ class HiveSparkSubmitTests(SparkSubmitTests):
|print(hive_context.sql("show databases").collect())
""")
proc = subprocess.Popen(
- [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
- "--driver-class-path", hive_site_dir, script],
+ self.sparkSubmit + ["--master", "local-cluster[1,1,1024]",
+ "--driver-class-path", hive_site_dir, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index d77f1ba..e4a428a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -63,7 +63,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
class_name = cls.__name__
conf = SparkConf().set("spark.default.parallelism", 1)
cls.sc = SparkContext(appName=class_name, conf=conf)
- cls.sc.setCheckpointDir("/tmp")
+ cls.sc.setCheckpointDir(tempfile.mkdtemp())
@classmethod
def tearDownClass(cls):
@@ -1549,7 +1549,9 @@ if __name__ == "__main__":
kinesis_jar_present = True
jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar)
- os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
+ existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
+ jars_args = "--jars %s" % jars
+ os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([jars_args, existing_args])
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
StreamingListenerTests]
http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 8392d7f..7b8ce2c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1951,7 +1951,12 @@ class SparkSubmitTests(unittest.TestCase):
def setUp(self):
self.programDir = tempfile.mkdtemp()
- self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit")
+ tmp_dir = tempfile.gettempdir()
+ self.sparkSubmit = [
+ os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit"),
+ "--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+ "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+ ]
def tearDown(self):
shutil.rmtree(self.programDir)
@@ -2017,7 +2022,7 @@ class SparkSubmitTests(unittest.TestCase):
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect())
""")
- proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
@@ -2033,7 +2038,7 @@ class SparkSubmitTests(unittest.TestCase):
|sc = SparkContext()
|print(sc.parallelize([1, 2, 3]).map(foo).collect())
""")
- proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
+ proc = subprocess.Popen(self.sparkSubmit + [script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[3, 6, 9]", out.decode('utf-8'))
@@ -2051,7 +2056,7 @@ class SparkSubmitTests(unittest.TestCase):
|def myfunc(x):
| return x + 1
""")
- proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script],
+ proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
@@ -2070,7 +2075,7 @@ class SparkSubmitTests(unittest.TestCase):
|def myfunc(x):
| return x + 1
""")
- proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, "--master",
+ proc = subprocess.Popen(self.sparkSubmit + ["--py-files", zip, "--master",
"local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
@@ -2087,8 +2092,10 @@ class SparkSubmitTests(unittest.TestCase):
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
- proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
- "file:" + self.programDir, script], stdout=subprocess.PIPE)
+ proc = subprocess.Popen(
+ self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
+ "file:" + self.programDir, script],
+ stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -2103,9 +2110,11 @@ class SparkSubmitTests(unittest.TestCase):
|print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
- proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
- "file:" + self.programDir, "--master",
- "local-cluster[1,1,1024]", script], stdout=subprocess.PIPE)
+ proc = subprocess.Popen(
+ self.sparkSubmit + ["--packages", "a:mylib:0.1", "--repositories",
+ "file:" + self.programDir, "--master", "local-cluster[1,1,1024]",
+ script],
+ stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 3, 4]", out.decode('utf-8'))
@@ -2124,7 +2133,7 @@ class SparkSubmitTests(unittest.TestCase):
# this will fail if you have different spark.executor.memory
# in conf/spark-defaults.conf
proc = subprocess.Popen(
- [self.sparkSubmit, "--master", "local-cluster[1,1,1024]", script],
+ self.sparkSubmit + ["--master", "local-cluster[1,1,1024]", script],
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
@@ -2144,7 +2153,7 @@ class SparkSubmitTests(unittest.TestCase):
| sc.stop()
""")
proc = subprocess.Popen(
- [self.sparkSubmit, "--master", "local", script],
+ self.sparkSubmit + ["--master", "local", script],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
out, err = proc.communicate()
http://git-wip-us.apache.org/repos/asf/spark/blob/a634d66c/python/run-tests.py
----------------------------------------------------------------------
diff --git a/python/run-tests.py b/python/run-tests.py
index f408fc5..4c90926 100755
--- a/python/run-tests.py
+++ b/python/run-tests.py
@@ -22,11 +22,13 @@ import logging
from optparse import OptionParser
import os
import re
+import shutil
import subprocess
import sys
import tempfile
from threading import Thread, Lock
import time
+import uuid
if sys.version < '3':
import Queue
else:
@@ -68,7 +70,7 @@ else:
raise Exception("Cannot find assembly build directory, please build Spark first.")
-def run_individual_python_test(test_name, pyspark_python):
+def run_individual_python_test(target_dir, test_name, pyspark_python):
env = dict(os.environ)
env.update({
'SPARK_DIST_CLASSPATH': SPARK_DIST_CLASSPATH,
@@ -77,6 +79,23 @@ def run_individual_python_test(test_name, pyspark_python):
'PYSPARK_PYTHON': which(pyspark_python),
'PYSPARK_DRIVER_PYTHON': which(pyspark_python)
})
+
+ # Create a unique temp directory under 'target/' for each run. The TMPDIR variable is
+ # recognized by the tempfile module to override the default system temp directory.
+ tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
+ while os.path.isdir(tmp_dir):
+ tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
+ os.mkdir(tmp_dir)
+ env["TMPDIR"] = tmp_dir
+
+ # Also override the JVM's temp directory by setting driver and executor options.
+ spark_args = [
+ "--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+ "--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
+ "pyspark-shell"
+ ]
+ env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
+
LOGGER.info("Starting test(%s): %s", pyspark_python, test_name)
start_time = time.time()
try:
@@ -84,6 +103,7 @@ def run_individual_python_test(test_name, pyspark_python):
retcode = subprocess.Popen(
[os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
stderr=per_test_output, stdout=per_test_output, env=env).wait()
+ shutil.rmtree(tmp_dir, ignore_errors=True)
except:
LOGGER.exception("Got exception while running %s with %s", test_name, pyspark_python)
# Here, we use os._exit() instead of sys.exit() in order to force Python to exit even if
@@ -238,6 +258,11 @@ def main():
priority = 100
task_queue.put((priority, (python_exec, test_goal)))
+ # Create the target directory before starting tasks to avoid races.
+ target_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'target'))
+ if not os.path.isdir(target_dir):
+ os.mkdir(target_dir)
+
def process_queue(task_queue):
while True:
try:
@@ -245,7 +270,7 @@ def main():
except Queue.Empty:
break
try:
- run_individual_python_test(test_goal, python_exec)
+ run_individual_python_test(target_dir, test_goal, python_exec)
finally:
task_queue.task_done()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org