Posted to commits@spark.apache.org by jo...@apache.org on 2014/07/29 09:16:31 UTC

git commit: [SPARK-2580] [PySpark] keep silent in worker if JVM closes the socket

Repository: spark
Updated Branches:
  refs/heads/master 16ef4d110 -> ccd5ab5f8


[SPARK-2580] [PySpark] keep silent in worker if JVM closes the socket

During rdd.take(n), the JVM closes the socket as soon as it has received enough data, so the Python worker should stay silent in that case.

At the same time, the worker should not print the traceback to stderr if it has already sent the traceback to the JVM successfully.
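
As an illustration (not part of the patch), here is a minimal sketch of why the JVM hanging up surfaces as IOError on the Python side. It uses a plain socketpair in place of the JVM/worker connection (jvm_side and worker_side are hypothetical names), and assumes a POSIX system running Python 2, where the interpreter ignores SIGPIPE so a broken pipe shows up as an exception rather than killing the process:

    import errno
    import socket

    # A connected pair of sockets stands in for the JVM <-> worker link.
    jvm_side, worker_side = socket.socketpair()
    outfile = worker_side.makefile('w')

    # Simulate the JVM hanging up once take(n) has received enough rows.
    jvm_side.close()

    try:
        # The first write may be absorbed by kernel buffers on some
        # platforms, so keep writing until the broken pipe is reported.
        for _ in range(100):
            outfile.write('x' * 1024)
            outfile.flush()
    except IOError as e:
        # CPython sets SIGPIPE to SIG_IGN at startup, so the failed write
        # raises IOError with errno EPIPE; this is the case the worker
        # now swallows instead of logging.
        assert e.errno == errno.EPIPE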

Author: Davies Liu <da...@gmail.com>

Closes #1625 from davies/error and squashes the following commits:

4fbcc6d [Davies Liu] disable log4j during testing when an exception is expected.
cc14202 [Davies Liu] keep silent in worker if JVM closes the socket


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccd5ab5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccd5ab5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccd5ab5f

Branch: refs/heads/master
Commit: ccd5ab5f82812abc2eb518448832cc20fb903345
Parents: 16ef4d1
Author: Davies Liu <da...@gmail.com>
Authored: Tue Jul 29 00:15:45 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Tue Jul 29 00:15:45 2014 -0700

----------------------------------------------------------------------
 python/pyspark/tests.py  |  6 ++++++
 python/pyspark/worker.py | 21 +++++++++++++--------
 2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccd5ab5f/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 63cc5e9..6dee7dc 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -165,11 +165,17 @@ class TestAddFile(PySparkTestCase):
     def test_add_py_file(self):
         # To ensure that we're actually testing addPyFile's effects, check that
         # this job fails due to `userlibrary` not being on the Python path:
+        # disable logging in log4j temporarily
+        log4j = self.sc._jvm.org.apache.log4j
+        old_level = log4j.LogManager.getRootLogger().getLevel()
+        log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
         def func(x):
             from userlibrary import UserClass
             return UserClass().hello()
         self.assertRaises(Exception,
                           self.sc.parallelize(range(2)).map(func).first)
+        log4j.LogManager.getRootLogger().setLevel(old_level)
+
         # Add the file, so the job should now succeed:
         path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
         self.sc.addPyFile(path)
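
One caveat in the test change above: the root logger's level is restored only after assertRaises returns, so a failed assertion would leave log4j at FATAL for the rest of the suite. A more robust variant, sketched below with a hypothetical quiet_log4j helper (not part of this patch), restores the level in a finally block:

    from contextlib import contextmanager

    @contextmanager
    def quiet_log4j(sc):
        # Silence log4j for the duration of a block, restoring the
        # previous level even if the block raises.
        log4j = sc._jvm.org.apache.log4j
        root = log4j.LogManager.getRootLogger()
        old_level = root.getLevel()
        root.setLevel(log4j.Level.FATAL)
        try:
            yield
        finally:
            root.setLevel(old_level)

    # Usage in the test above:
    # with quiet_log4j(self.sc):
    #     self.assertRaises(Exception,
    #                       self.sc.parallelize(range(2)).map(func).first)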

http://git-wip-us.apache.org/repos/asf/spark/blob/ccd5ab5f/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 24d41b1..2770f63 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -75,14 +75,19 @@ def main(infile, outfile):
         init_time = time.time()
         iterator = deserializer.load_stream(infile)
         serializer.dump_stream(func(split_index, iterator), outfile)
-    except Exception as e:
-        # Write the error to stderr in addition to trying to pass it back to
-        # Java, in case it happened while serializing a record
-        print >> sys.stderr, "PySpark worker failed with exception:"
-        print >> sys.stderr, traceback.format_exc()
-        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
-        write_with_length(traceback.format_exc(), outfile)
-        sys.exit(-1)
+    except Exception:
+        try:
+            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
+            write_with_length(traceback.format_exc(), outfile)
+            outfile.flush()
+        except IOError:
+            # the JVM closed the socket (e.g. take(n) already got enough data)
+            pass
+        except Exception:
+            # Write the error to stderr if it happened while serializing
+            print >> sys.stderr, "PySpark worker failed with exception:"
+            print >> sys.stderr, traceback.format_exc()
+        exit(-1)
     finish_time = time.time()
     report_times(outfile, boot_time, init_time, finish_time)
     # Mark the beginning of the accumulators section of the output
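
For context, write_int and write_with_length (from pyspark/serializers.py) implement the simple framing the worker uses to report the failure: a negative "length" acts as a control marker, followed by the length-prefixed traceback. Roughly, as a sketch; the exact constant values are from memory and should be checked against the serializers module of this era:

    import struct

    class SpecialLengths(object):
        # Negative "lengths" that act as control markers in the stream.
        END_OF_DATA_SECTION = -1
        PYTHON_EXCEPTION_THROWN = -2
        TIMING_DATA = -3

    def write_int(value, stream):
        # 4-byte big-endian signed int.
        stream.write(struct.pack("!i", value))

    def write_with_length(obj, stream):
        # Length prefix followed by the raw bytes (here, the traceback text).
        write_int(len(obj), stream)
        stream.write(obj)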