Posted to commits@spark.apache.org by we...@apache.org on 2017/01/18 15:06:54 UTC

spark git commit: [SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD

Repository: spark
Updated Branches:
  refs/heads/master f85f29608 -> d06172b88


[SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD

## What changes were proposed in this pull request?

For datasources based on HadoopRDD or NewHadoopRDD, such as spark-xml, `InputFileBlockHolder` doesn't work with Python UDFs.

To reproduce it, run the following code with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:

    from pyspark.sql.functions import udf, input_file_name
    from pyspark.sql.types import StringType
    from pyspark.sql import SparkSession

    # Identity UDF: returns the column value it receives unchanged.
    def filename(path):
        return path

    session = SparkSession.builder.appName('APP').getOrCreate()

    session.udf.register('sameText', filename)
    sameText = udf(filename, StringType())

    # 'a.xml' is any XML file whose rows are delimited by a <root> tag.
    df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
    df.select('file').show()                 # works: shows the input file name
    df.select(sameText(df['file'])).show()   # bug: returns empty content

The issue is that `HadoopRDD` and `NewHadoopRDD` set the file block's info in `InputFileBlockHolder` when the iterator is created, before it is consumed. `InputFileBlockHolder` records this info in a thread-local variable. When running Python UDFs in batch, a separate thread consumes the iterator from the child plan's output RDD, so the info cannot be read back from that thread.

To fix this, the thread-local variable in `InputFileBlockHolder` is changed to an `InheritableThreadLocal`, so the info set on the task thread is inherited by the child thread that consumes the iterator and can be read there.
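
For illustration, the following minimal, Spark-independent Scala sketch (object and variable names are hypothetical) contrasts the two thread-local flavors; it mirrors the parent-sets/child-reads pattern the Python UDF path relies on:

    object ThreadLocalDemo {
      // Plain ThreadLocal: each thread starts from initialValue and never sees
      // values set by other threads.
      private val plain = new ThreadLocal[String] {
        override protected def initialValue(): String = ""
      }
      // InheritableThreadLocal: a child thread copies the parent's current value
      // at the moment the child is constructed.
      private val inheritable = new InheritableThreadLocal[String] {
        override protected def initialValue(): String = ""
      }

      def main(args: Array[String]): Unit = {
        // The parent (task) thread records the input file, as HadoopRDD does.
        plain.set("file:/a.xml")
        inheritable.set("file:/a.xml")

        val child = new Thread(new Runnable {
          override def run(): Unit = {
            println(s"plain       = '${plain.get()}'")        // '' : the info is lost
            println(s"inheritable = '${inheritable.get()}'")  // 'file:/a.xml'
          }
        })
        child.start()
        child.join()
      }
    }

Note that the inherited copy is taken when the child thread is constructed; the exact timing is sketched after the `InputFileBlockHolder` hunk below.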

## How was this patch tested?

Manually tested with the above example code for the spark-xml package on PySpark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`.

Added a PySpark test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #16585 from viirya/fix-inputfileblock-hadooprdd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d06172b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d06172b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d06172b8

Branch: refs/heads/master
Commit: d06172b88e61c0f79e3dea5703a17c6ae590f248
Parents: f85f296
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Jan 18 23:06:44 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jan 18 23:06:44 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/rdd/InputFileBlockHolder.scala |  7 +++---
 python/pyspark/sql/tests.py                     | 24 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d06172b8/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
index 9ba476d..ff2f58d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
@@ -41,9 +41,10 @@ private[spark] object InputFileBlockHolder {
    * The thread variable for the name of the current file being read. This is used by
    * the InputFileName function in Spark SQL.
    */
-  private[this] val inputBlock: ThreadLocal[FileBlock] = new ThreadLocal[FileBlock] {
-    override protected def initialValue(): FileBlock = new FileBlock
-  }
+  private[this] val inputBlock: InheritableThreadLocal[FileBlock] =
+    new InheritableThreadLocal[FileBlock] {
+      override protected def initialValue(): FileBlock = new FileBlock
+    }
 
   /**
    * Returns the holding file name or empty string if it is unknown.

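One subtlety of `InheritableThreadLocal`, which the hunk above relies on: the child thread snapshots the parent's value when the child is constructed, not when it starts running. A minimal, Spark-independent sketch (hypothetical names) of that timing:

    object InheritanceTimingDemo {
      private val holder = new InheritableThreadLocal[String] {
        override protected def initialValue(): String = ""
      }

      def main(args: Array[String]): Unit = {
        holder.set("first.xml")
        // The child copies the parent's inheritable values in the Thread constructor.
        val child = new Thread(new Runnable {
          override def run(): Unit = println(holder.get()) // prints 'first.xml'
        })
        holder.set("second.xml") // too late: the child already took its snapshot
        child.start()
        child.join()
      }
    }

This suffices for the Python UDF case, where the consuming thread is created after `HadoopRDD` has set the block info, but it would need revisiting if the consumer were ever a pre-created pool thread.
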
http://git-wip-us.apache.org/repos/asf/spark/blob/d06172b8/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a825028..73a5df6 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -435,6 +435,30 @@ class SQLTests(ReusedPySparkTestCase):
         row = self.spark.read.json(filePath).select(sourceFile(input_file_name())).first()
         self.assertTrue(row[0].find("people1.json") != -1)
 
+    def test_udf_with_input_file_name_for_hadooprdd(self):
+        from pyspark.sql.functions import udf, input_file_name
+        from pyspark.sql.types import StringType
+
+        def filename(path):
+            return path
+
+        sameText = udf(filename, StringType())
+
+        rdd = self.sc.textFile('python/test_support/sql/people.json')
+        df = self.spark.read.json(rdd).select(input_file_name().alias('file'))
+        row = df.select(sameText(df['file'])).first()
+        self.assertTrue(row[0].find("people.json") != -1)
+
+        rdd2 = self.sc.newAPIHadoopFile(
+            'python/test_support/sql/people.json',
+            'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
+            'org.apache.hadoop.io.LongWritable',
+            'org.apache.hadoop.io.Text')
+
+        df2 = self.spark.read.json(rdd2).select(input_file_name().alias('file'))
+        row2 = df2.select(sameText(df2['file'])).first()
+        self.assertTrue(row2[0].find("people.json") != -1)
+
     def test_basic_functions(self):
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
         df = self.spark.read.json(rdd)

