Posted to commits@spark.apache.org by do...@apache.org on 2019/05/26 00:25:33 UTC

[spark] branch branch-2.4 updated: [SPARK-27711][CORE] Unset InputFileBlockHolder at the end of tasks

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a287110  [SPARK-27711][CORE] Unset InputFileBlockHolder at the end of tasks
a287110 is described below

commit a28711043f6a48da72cc2fefc6b07145da6387ab
Author: Jose Torres <to...@gmail.com>
AuthorDate: Sat May 25 17:25:08 2019 -0700

    [SPARK-27711][CORE] Unset InputFileBlockHolder at the end of tasks
    
    ## What changes were proposed in this pull request?
    
    Unset InputFileBlockHolder at the end of each task so that the input file name cannot leak from one task to other tasks that later run on the same thread. The leak shows up in particular in PySpark because of its complex threading model.
    
    Backport to 2.4.
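    
    Conceptually, InputFileBlockHolder is a per-thread slot that the file
    read path fills with the current input file's name and block, and that
    expressions such as input_file_name() read back. Below is a minimal
    sketch of that pattern (hypothetical names, assuming an
    InheritableThreadLocal-style slot; not the actual Spark internals),
    showing where the new unset() call lands in the task lifecycle:
    
        object FileBlockHolder {
          // InheritableThreadLocal so child threads (e.g. PySpark's runner
          // threads) inherit the parent's value; this is also how a stale
          // value can survive into later tasks that reuse the same thread.
          private val file = new InheritableThreadLocal[String] {
            override def initialValue(): String = ""  // "" means "no input file"
          }
          def set(name: String): Unit = file.set(name)
          def get: String = file.get
          def unset(): Unit = file.remove()
        }
    
        object TaskSketch {
          def runTask(readFile: Option[String]): Unit = {
            try {
              readFile.foreach(FileBlockHolder.set)  // file-based tasks set the slot
              // ... task body runs; input_file_name() observes FileBlockHolder.get
            } finally {
              // Before this patch only TaskContext was cleared here; the fix
              // also clears the holder, so the next task on this thread
              // starts from "" instead of the previous task's file name.
              FileBlockHolder.unset()
            }
          }
        }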
    
    ## How was this patch tested?
    
    New PySpark test (test_input_file_name_reset_for_rdd; see the diff below).
    
    Closes #24690 from jose-torres/fix24.
    
    Authored-by: Jose Torres <to...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/scheduler/Task.scala    |  2 ++
 python/pyspark/sql/tests.py                             | 17 ++++++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index eb059f1..182f479 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config.APP_CALLER_CONTEXT
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.util._
 
 /**
@@ -154,6 +155,7 @@ private[spark] abstract class Task[T](
           // Though we unset the ThreadLocal here, the context member variable itself is still
           // queried directly in the TaskRunner to check for FetchFailedExceptions.
           TaskContext.unset()
+          InputFileBlockHolder.unset()
         }
       }
     }
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 2e6d015..642d5e0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -88,7 +88,7 @@ from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_
 from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.sql.types import _merge_type
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, PySparkTestCase, SparkSubmitTests
-from pyspark.sql.functions import UserDefinedFunction, sha2, lit
+from pyspark.sql.functions import UserDefinedFunction, sha2, lit, input_file_name
 from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
 
@@ -832,6 +832,21 @@ class SQLTests(ReusedSQLTestCase):
         row2 = df2.select(sameText(df2['file'])).first()
         self.assertTrue(row2[0].find("people.json") != -1)
 
+    def test_input_file_name_reset_for_rdd(self):
+        rdd = self.sc.textFile('python/test_support/hello/hello.txt').map(lambda x: {'data': x})
+        df = self.spark.createDataFrame(rdd, "data STRING")
+        df.select(input_file_name().alias('file')).collect()
+
+        non_file_df = self.spark.range(100).select(input_file_name())
+
+        results = non_file_df.collect()
+        self.assertTrue(len(results) == 100)
+
+        # [SPARK-24605]: if everything was properly reset after the last job, this should return
+        # empty string rather than the file read in the last job.
+        for result in results:
+            self.assertEqual(result[0], '')
+
     def test_udf_defers_judf_initialization(self):
         # This is separate of  UDFInitializationTests
         # to avoid context initialization
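
The symptom the new test guards against can also be sketched as a
spark-shell session (hypothetical path; the two queries are analogous to
the PySpark test above). Before this fix, the second query could report a
stale file name, particularly under PySpark's threading model:

    import org.apache.spark.sql.functions.input_file_name

    // A file-backed query fills the per-thread input file block.
    spark.read.text("/tmp/people.txt").select(input_file_name()).collect()

    // range() reads no files, so input_file_name() should yield "".
    // Without the fix, a task reusing the previous task's thread could
    // still see "/tmp/people.txt" here; with it, the result is always "".
    spark.range(100).select(input_file_name()).collect()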


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org