Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/10/03 22:43:18 UTC

[PR] [SPARK-45402][SQL][Python] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

dtenedor opened a new pull request, #43204:
URL: https://github.com/apache/spark/pull/43204

   ### What changes were proposed in this pull request?
   
   This PR adds a Python UDTF API for 'analyze' to return a buffer to consume on each class creation.
   
   * The `AnalyzeResult` class now contains a new string field `prepare_buffer`.
   * If this field is assigned a non-empty value, the UDTF should define another method, `prepare`, that accepts the string as its argument; it is called after `__init__` when the class is created.
   * The format of the buffer is opaque and known only to the UDTF. Common use cases include serializing protocol buffers or JSON objects into the buffer in order to help organize the contents therein.
   
   For example, this UDTF accepts a constant scalar string argument, then assigns this value to the buffer.
   
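   ```
   from pyspark.sql.functions import AnalyzeResult, udtf
   from pyspark.sql.types import IntegerType, Row, StringType, StructType
   
   @udtf
   class TestUDTF:
       def __init__(self):
           self._total = 0
           self._buffer = None
   
       @staticmethod
       def analyze(argument, _):
           # Runs once per function call; the buffer carries state to each instance.
           return AnalyzeResult(
               schema=StructType().add("total", IntegerType()).add("buffer", StringType()),
               prepare_buffer=argument.value,
               with_single_partition=True)
   
       def prepare(self, buffer):
           # Called after __init__ with the string returned in prepare_buffer.
           self._buffer = buffer
           self._total = len(buffer)
   
       def eval(self, argument, row: Row):
           self._total += 1
   
       def terminate(self):
           yield self._total, self._buffer
   ```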
   
   ### Why are the changes needed?
   
   In this way, the UDTF can perform potentially expensive initialization logic in the `analyze` method just once and return the result of such initialization rather than repeating the initialization in `eval`.
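
As a rough illustration of the buffer round trip described above, the sketch below serializes a JSON object once and hands the resulting string to each new instance. It is plain Python and does not use PySpark. `expensive_setup`, `analyze_once`, and `Worker` are hypothetical names; only the `prepare` method name and the JSON use case come from the description.

```python
import json


def expensive_setup(pattern: str) -> dict:
    # Hypothetical stand-in for costly work that should run only once.
    return {"pattern": pattern, "initial_count": len(pattern)}


def analyze_once(pattern: str) -> str:
    # Serialize the computed state into an opaque string buffer.
    return json.dumps(expensive_setup(pattern))


class Worker:
    """Hypothetical stand-in for one UDTF class instance."""

    def __init__(self):
        self._count = 0
        self._pattern = None

    def prepare(self, buffer: str) -> None:
        # Decode the buffer instead of repeating the expensive setup.
        state = json.loads(buffer)
        self._count = state["initial_count"]
        self._pattern = state["pattern"]

    def eval(self, row) -> None:
        self._count += 1

    def terminate(self):
        yield self._count, self._pattern


buffer = analyze_once("abc")
workers = [Worker() for _ in range(2)]
for w in workers:
    w.prepare(buffer)
workers[0].eval("row")
results = [next(w.terminate()) for w in workers]
```

Each instance starts from the same decoded state, so only the first worker's count reflects the extra row it consumed.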
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, see above.
   
   ### How was this patch tested?
   
   This PR adds new unit test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1353474035


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -524,27 +524,42 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val name: String = "UDTFWithSinglePartition"
     val pythonScript: String =
       s"""
+        |import json
+        |from dataclasses import dataclass
         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
         |from pyspark.sql.types import IntegerType, Row, StructType
+        |
+        |@dataclass
+        |class AnalyzeResultWithBuffer(AnalyzeResult):
+        |    buffer: str = ""
+        |
         |class $name:
         |    def __init__(self):
         |        self._count = 0
+        |        # self._count = json.loads(buffer)["initial_count"]

Review Comment:
   Shall we remove this?



##########
python/pyspark/worker.py:
##########
@@ -693,6 +699,21 @@ def read_udtf(pickleSer, infile, eval_type):
             f"The return type of a UDTF must be a struct type, but got {type(return_type)}."
         )
 
+    # Update the handler that creates a new UDTF instance to first try calling the UDTF constructor
+    # with one argument containing the previous AnalyzeResult. If that fails, then try a constructor
+    # with no arguments. In this way each UDTF class instance can decide if it wants to inspect the
+    # AnalyzeResult.
+    if has_pickled_analyze_result:
+        prev_handler = handler
+
+        def construct_udtf():
+            try:
+                return prev_handler(dataclasses.replace(pickled_analyze_result))
+            except TypeError:
+                return prev_handler()

Review Comment:
   This means the UDTF handler does not accept an `AnalyzeResult` object in its `__init__` method? The try...except block will always be invoked when we call `construct_udtf`, which can be expensive. I wonder if we can do this check only once.
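
One possible way to make the decision only once is to inspect the constructor signature up front and build the factory accordingly. This is a sketch, not what the PR ended up doing; `make_constructor`, `WithResult`, and `WithoutResult` are hypothetical names.

```python
import inspect


def make_constructor(udtf_class, analyze_result):
    """Return a zero-argument factory, deciding once how to call __init__."""
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.VAR_POSITIONAL,
    )
    # inspect.signature on a class describes __init__ without `self`.
    params = inspect.signature(udtf_class).parameters.values()
    if any(p.kind in positional_kinds for p in params):
        return lambda: udtf_class(analyze_result)
    return lambda: udtf_class()


class WithResult:
    def __init__(self, analyze_result):
        self.analyze_result = analyze_result


class WithoutResult:
    def __init__(self):
        self.analyze_result = None


factory_a = make_constructor(WithResult, {"buffer": "abc"})
factory_b = make_constructor(WithoutResult, {"buffer": "abc"})
```

Compared with catching `TypeError` on every construction, this also avoids hiding a `TypeError` raised inside `__init__` for an unrelated reason.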



##########
python/pyspark/sql/udtf.py:
##########
@@ -85,7 +85,7 @@ class OrderingColumn:
     overrideNullsFirst: Optional[bool] = None
 
 
-@dataclass(frozen=True)
+@dataclass

Review Comment:
   It would be really good to add this as a comment :) 



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #43204:
URL: https://github.com/apache/spark/pull/43204#issuecomment-1753741579

   Hi @allisonwang-db @ueshin thanks for your reviews, these were good comments, please look again! I think the new API is better now.


Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1353211521


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,16 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Option[Array[Byte]] = dataIn.readInt() match {
+        case 0 =>
+          None

Review Comment:
   Right, good point. I changed this to make the `pickledAnalyzeResult: Array[Byte]` required in the `PythonUDTFAnalyzeResult`, and optional in the `PythonUDTF` class itself. In the latter case, the field will be `None` if the UDTF was defined with a static output schema instead of implementing an `analyze` method.



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350811414


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -241,12 +248,17 @@ case class UnresolvedPolymorphicPythonUDTF(
  * @param orderByExpressions if non-empty, this contains the list of ordering items that the
  *                           'analyze' method explicitly indicated that the UDTF call should consume
  *                           the input table rows by
+ * @param pickledAnalyzeResult this is the pickled 'AnalyzeResult' instance from the UDTF, which
+ *                              contains all metadata returned by the Python UDTF 'analyze' method
+ *                              including the result schema of the function call as well as optional
+ *                              other information
  */
 case class PythonUDTFAnalyzeResult(
     schema: StructType,
     withSinglePartition: Boolean,
     partitionByExpressions: Seq[Expression],
-    orderByExpressions: Seq[SortOrder]) {
+    orderByExpressions: Seq[SortOrder],
+    pickledAnalyzeResult: Option[Array[Byte]]) {

Review Comment:
   We don't need `Option` here as it will always be set?



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1355452243


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala:
##########
@@ -132,7 +135,15 @@ object PythonUDTFRunner {
       case None =>
         dataOut.writeInt(0)
     }
+    // Write the pickled AnalyzeResult buffer from the UDTF "analyze" method, if any.
+    dataOut.writeBoolean(udtf.pickledAnalyzeResult.nonEmpty)
+    udtf.pickledAnalyzeResult.map { p =>

Review Comment:
   nit: `foreach` instead of `map`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,14 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Array[Byte] = dataIn.readInt() match {
+        case length =>
+          val obj = new Array[Byte](length)
+          dataIn.readFully(obj)
+          obj
+      }

Review Comment:
   Shall we use `PythonWorkerUtils.readBytes` as https://github.com/apache/spark/pull/43321 is merged?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala:
##########
@@ -132,7 +135,15 @@ object PythonUDTFRunner {
       case None =>
         dataOut.writeInt(0)
     }
+    // Write the pickled AnalyzeResult buffer from the UDTF "analyze" method, if any.
+    dataOut.writeBoolean(udtf.pickledAnalyzeResult.nonEmpty)
+    udtf.pickledAnalyzeResult.map { p =>
+      dataOut.writeInt(p.length)
+      dataOut.write(p)

Review Comment:
   ditto, `PythonWorkerUtils.writeBytes`?
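
The framing used in these hunks (a 4-byte length followed by the payload) can be sketched from the Python side as below. `write_bytes`, `read_exact`, and `read_bytes` here are hypothetical helpers written for illustration; they are not the signatures of `PythonWorkerUtils.readBytes` or `PythonWorkerUtils.writeBytes`. The big-endian format matches what Java's `DataOutputStream.writeInt` emits.

```python
import io
import struct


def write_bytes(data: bytes, out) -> None:
    # 4-byte big-endian signed length, then the raw payload.
    out.write(struct.pack(">i", len(data)))
    out.write(data)


def read_exact(stream, n: int) -> bytes:
    # Fail loudly on a short read instead of returning truncated data.
    chunk = stream.read(n)
    if len(chunk) != n:
        raise EOFError(f"expected {n} bytes, got {len(chunk)}")
    return chunk


def read_bytes(stream) -> bytes:
    (length,) = struct.unpack(">i", read_exact(stream, 4))
    return read_exact(stream, length)


channel = io.BytesIO()
write_bytes(b"pickled-analyze-result", channel)
write_bytes(b"", channel)
channel.seek(0)
first = read_bytes(channel)
second = read_bytes(channel)
```

Keeping the length prefix and payload handling in one pair of helpers avoids repeating the `readInt` / `readFully` sequence at each call site.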



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350991334


##########
python/pyspark/worker.py:
##########
@@ -693,6 +698,21 @@ def read_udtf(pickleSer, infile, eval_type):
             f"The return type of a UDTF must be a struct type, but got {type(return_type)}."
         )
 
+    # Update the handler that creates a new UDTF instance to first try calling the UDTF constructor
+    # with one argument containing the previous AnalyzeResult. If that fails, then try a constructor
+    # with no arguments. In this way each UDTF class instance can decide if it wants to inspect the
+    # AnalyzeResult.
+    if has_pickled_analyze_result:
+        prev_handler = handler
+
+        def construct_udtf():
+            try:
+                return prev_handler(pickled_analyze_result)

Review Comment:
   Sounds good, done.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,16 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Option[Array[Byte]] = dataIn.readInt() match {
+        case 0 =>
+          None

Review Comment:
   This should not happen unless the UDTF does not include an `analyze` method, in which case there is no pickled `AnalyzeResult` buffer.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -241,12 +248,17 @@ case class UnresolvedPolymorphicPythonUDTF(
  * @param orderByExpressions if non-empty, this contains the list of ordering items that the
  *                           'analyze' method explicitly indicated that the UDTF call should consume
  *                           the input table rows by
+ * @param pickledAnalyzeResult this is the pickled 'AnalyzeResult' instance from the UDTF, which
+ *                              contains all metadata returned by the Python UDTF 'analyze' method
+ *                              including the result schema of the function call as well as optional
+ *                              other information
  */
 case class PythonUDTFAnalyzeResult(
     schema: StructType,
     withSinglePartition: Boolean,
     partitionByExpressions: Seq[Expression],
-    orderByExpressions: Seq[SortOrder]) {
+    orderByExpressions: Seq[SortOrder],
+    pickledAnalyzeResult: Option[Array[Byte]]) {

Review Comment:
   I find we need this since if the UDTF does not include an `analyze` method, this buffer is empty, and corresponding tests using these functions fail unless we check that this buffer is non-empty before using it.



##########
python/pyspark/sql/udtf.py:
##########
@@ -85,7 +85,7 @@ class OrderingColumn:
     overrideNullsFirst: Optional[bool] = None
 
 
-@dataclass(frozen=True)
+@dataclass

Review Comment:
   I found it was necessary to allow UDTFs to create subclasses of `AnalyzeResult`. This is the way that we plan for these functions to compute custom state in the `analyze` method just once per function call and then propagate this information to future class instances to be consumed by the `eval` and `terminate` methods.
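
The comment does not say why `frozen=True` blocked subclassing, but one plausible reason is standard `dataclasses` behavior: a non-frozen dataclass cannot inherit from a frozen one. The sketch below shows that rule with hypothetical stand-in classes (`FrozenResult`, `OpenResult`, `ResultWithBuffer`) rather than the real `AnalyzeResult`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FrozenResult:
    schema: str


@dataclass
class OpenResult:
    schema: str


# Subclassing the non-frozen base with a plain @dataclass works.
@dataclass
class ResultWithBuffer(OpenResult):
    buffer: str = ""


# The same declaration against the frozen base is rejected by dataclasses.
frozen_error = None
try:

    @dataclass
    class FrozenResultWithBuffer(FrozenResult):
        buffer: str = ""

except TypeError as e:
    frozen_error = e

custom = ResultWithBuffer(schema="total INT", buffer="abc")
```

A subclass that is itself declared with `@dataclass(frozen=True)` would also be accepted by `dataclasses`; the thread does not discuss that alternative.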



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1355605235


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,14 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Array[Byte] = dataIn.readInt() match {
+        case length =>
+          val obj = new Array[Byte](length)
+          dataIn.readFully(obj)
+          obj
+      }

Review Comment:
   Sure, this is done.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala:
##########
@@ -132,7 +135,15 @@ object PythonUDTFRunner {
       case None =>
         dataOut.writeInt(0)
     }
+    // Write the pickled AnalyzeResult buffer from the UDTF "analyze" method, if any.
+    dataOut.writeBoolean(udtf.pickledAnalyzeResult.nonEmpty)
+    udtf.pickledAnalyzeResult.map { p =>
+      dataOut.writeInt(p.length)
+      dataOut.write(p)

Review Comment:
   Done.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonUDTFExec.scala:
##########
@@ -132,7 +135,15 @@ object PythonUDTFRunner {
       case None =>
         dataOut.writeInt(0)
     }
+    // Write the pickled AnalyzeResult buffer from the UDTF "analyze" method, if any.
+    dataOut.writeBoolean(udtf.pickledAnalyzeResult.nonEmpty)
+    udtf.pickledAnalyzeResult.map { p =>

Review Comment:
   Done.



Re: [PR] [SPARK-45402][SQL][Python] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #43204:
URL: https://github.com/apache/spark/pull/43204#issuecomment-1745840699

   cc @ueshin @allisonwang-db @HyukjinKwon 


Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43204:
URL: https://github.com/apache/spark/pull/43204#issuecomment-1746074413

   Implementation seems fine from a cursory look, but let me defer to @allisonwang-db and @ueshin for the design.


Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350763041


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -167,22 +169,26 @@ abstract class UnevaluableGenerator extends Generator {
  * @param udfDeterministic true if this function is deterministic wherein it returns the same result
  *                         rows for every call with the same input arguments
  * @param resultId unique expression ID for this function invocation
- * @param pythonUDTFPartitionColumnIndexes holds the indexes of the TABLE argument to the Python
- *                                         UDTF call, if applicable
+ * @param pythonUDTFPartitionColumnIndexes holds the zero-based indexes of the projected results of
+ *                                         all PARTITION BY expressions within the TABLE argument of
+ *                                         the Python UDTF call, if applicable
  * @param analyzeResult holds the result of the polymorphic Python UDTF 'analze' method, if the UDTF
  *                      defined one
  */
 case class PythonUDTF(
     name: String,
     func: PythonFunction,
-    elementSchema: StructType,
+    analyzeResult: PythonUDTFAnalyzeResult,

Review Comment:
   Update, I switched it back to set the schema separately per your request.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -290,6 +295,20 @@ object UserDefinedPythonTableFunction {
           val msg = new String(obj, StandardCharsets.UTF_8)
           throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
       }
+      // Receive the "prepare_buffer" string, if any.
+      val prepareBuffer: String = dataIn.readInt() match {
+        case length: Int if length >= 0 =>
+          val obj = new Array[Byte](length)
+          dataIn.readFully(obj)
+          new String(obj, StandardCharsets.UTF_8)
+
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)

Review Comment:
   I removed this.



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1355450724


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,14 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Array[Byte] = dataIn.readInt() match {
+        case length =>
+          val obj = new Array[Byte](length)
+          dataIn.readFully(obj)
+          obj
+      }

Review Comment:
   Shall we use `PythonWorkerUtils.readBytes` as https://github.com/apache/spark/pull/43321 was merged?



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #43204:
URL: https://github.com/apache/spark/pull/43204#issuecomment-1758789225

   The failed tests seem unrelated to this PR. Let me merge this now.


Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1346427159


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -290,6 +295,20 @@ object UserDefinedPythonTableFunction {
           val msg = new String(obj, StandardCharsets.UTF_8)
           throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)
       }
+      // Receive the "prepare_buffer" string, if any.
+      val prepareBuffer: String = dataIn.readInt() match {
+        case length: Int if length >= 0 =>
+          val obj = new Array[Byte](length)
+          dataIn.readFully(obj)
+          new String(obj, StandardCharsets.UTF_8)
+
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw QueryCompilationErrors.tableValuedFunctionFailedToAnalyseInPythonError(msg)

Review Comment:
   Why do we need this part?



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str
+        If non-empty, this string represents state computed once within the 'analyze' method to be
+        propagated to each instance of the UDTF class at the time of its creation, using its
+        'prepare' method. The format this buffer is opaque and known only to the data source. Common
+        use cases include serializing protocol buffers or JSON configurations into this buffer so
+        that potentially expensive initialization work done at 'analyze' time does not need to be
+        recomputed later.
     """
 
     schema: StructType
     with_single_partition: bool = False
     partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
     order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+    prepare_buffer: str = ""

Review Comment:
   I guess we should distinguish between an empty string and `None`.
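
A minimal sketch of that distinction, assuming the field were typed as `Optional[str]` with a `None` default. `ResultSketch` and `should_call_prepare` are hypothetical names, not part of the PR.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResultSketch:
    # None means "no buffer was provided"; "" is a valid, empty buffer.
    prepare_buffer: Optional[str] = None


def should_call_prepare(result: ResultSketch) -> bool:
    # An identity check keeps the empty string distinct from None,
    # unlike a truthiness or len(...) > 0 check.
    return result.prepare_buffer is not None


no_buffer = should_call_prepare(ResultSketch())
empty_buffer = should_call_prepare(ResultSketch(prepare_buffer=""))
full_buffer = should_call_prepare(ResultSketch(prepare_buffer="abc"))
```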



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -167,22 +169,26 @@ abstract class UnevaluableGenerator extends Generator {
  * @param udfDeterministic true if this function is deterministic wherein it returns the same result
  *                         rows for every call with the same input arguments
  * @param resultId unique expression ID for this function invocation
- * @param pythonUDTFPartitionColumnIndexes holds the indexes of the TABLE argument to the Python
- *                                         UDTF call, if applicable
+ * @param pythonUDTFPartitionColumnIndexes holds the zero-based indexes of the projected results of
+ *                                         all PARTITION BY expressions within the TABLE argument of
+ *                                         the Python UDTF call, if applicable
  * @param analyzeResult holds the result of the polymorphic Python UDTF 'analze' method, if the UDTF
  *                      defined one
  */
 case class PythonUDTF(
     name: String,
     func: PythonFunction,
-    elementSchema: StructType,
+    analyzeResult: PythonUDTFAnalyzeResult,

Review Comment:
   I'm not sure if we should have the whole `PythonUDTFAnalyzeResult` here..



##########
python/pyspark/worker.py:
##########
@@ -786,6 +787,24 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             else:
                 return arg
 
+    # Wrap the UDTF handler to call the "prepare" method if there was a non-empty "prepare_buffer"
+    # string returned from the "analyze" method earlier.
+    if len(prepare_buffer) > 0:
+        prev_handler = handler
+
+        def handler_with_prepare():
+            new_udtf = prev_handler()
+            if new_udtf.prepare is None:
+                raise PySparkRuntimeError(
+                    "The 'analyze' method returned a non-empty 'prepare_buffer' string, but the "

Review Comment:
   We should use some error class here?
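
Separately from the error-class question, note that in the hunk above `new_udtf.prepare` raises `AttributeError` when the class defines no `prepare` method, so the `is None` branch would not be reached in that case. A hedged sketch of a check that does reach the intended error path follows; `MissingPrepareError` and `instantiate_with_prepare` are hypothetical names, and a real change would use the project's own error classes.

```python
class MissingPrepareError(RuntimeError):
    """Hypothetical error type used only for this illustration."""


def instantiate_with_prepare(udtf_class, prepare_buffer: str):
    instance = udtf_class()
    # getattr with a default returns None instead of raising AttributeError.
    prepare = getattr(instance, "prepare", None)
    if not callable(prepare):
        raise MissingPrepareError(
            f"{udtf_class.__name__} received a prepare buffer "
            "but does not define a 'prepare' method"
        )
    prepare(prepare_buffer)
    return instance


class HasPrepare:
    def prepare(self, buffer):
        self.buffer = buffer


class NoPrepare:
    pass


prepared = instantiate_with_prepare(HasPrepare, "abc")
try:
    instantiate_with_prepare(NoPrepare, "abc")
    missing_error = None
except MissingPrepareError as e:
    missing_error = e
```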



Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin closed pull request #43204: [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result
URL: https://github.com/apache/spark/pull/43204


Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350763173


##########
python/pyspark/worker.py:
##########
@@ -786,6 +787,24 @@ def _remove_partition_by_exprs(self, arg: Any) -> Any:
             else:
                 return arg
 
+    # Wrap the UDTF handler to call the "prepare" method if there was a non-empty "prepare_buffer"
+    # string returned from the "analyze" method earlier.
+    if len(prepare_buffer) > 0:
+        prev_handler = handler
+
+        def handler_with_prepare():
+            new_udtf = prev_handler()
+            if new_udtf.prepare is None:
+                raise PySparkRuntimeError(
+                    "The 'analyze' method returned a non-empty 'prepare_buffer' string, but the "

Review Comment:
   This part is gone now.





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350775454


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,16 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Option[Array[Byte]] = dataIn.readInt() match {
+        case 0 =>
+          None

Review Comment:
   I guess this will never happen?



##########
python/pyspark/worker.py:
##########
@@ -693,6 +698,21 @@ def read_udtf(pickleSer, infile, eval_type):
             f"The return type of a UDTF must be a struct type, but got {type(return_type)}."
         )
 
+    # Update the handler that creates a new UDTF instance to first try calling the UDTF constructor
+    # with one argument containing the previous AnalyzeResult. If that fails, then try a constructor
+    # with no arguments. In this way each UDTF class instance can decide if it wants to inspect the
+    # AnalyzeResult.
+    if has_pickled_analyze_result:
+        prev_handler = handler
+
+        def construct_udtf():
+            try:
+                return prev_handler(pickled_analyze_result)

Review Comment:
   Now that the dataclass is no longer `frozen=True`, should we pass a clone of the argument, created with `dataclasses.replace(pickled_analyze_result)`?
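
The cloning suggested above can be sketched as follows. This is only an illustration: `FakeAnalyzeResult` is a hypothetical stand-in dataclass, not the real `AnalyzeResult`, and its fields are invented for the example. Calling `dataclasses.replace` with no overrides builds a new instance from the same field values, so rebinding a field on the clone does not affect the original. The copy is shallow, though, so mutable field values remain shared.

```python
import dataclasses
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeAnalyzeResult:
    # Hypothetical stand-in for pyspark's AnalyzeResult; not the real class.
    schema: str
    buffer: str = ""
    tags: List[str] = field(default_factory=list)


original = FakeAnalyzeResult(schema="total INT", buffer="abc")

# With no field overrides, replace() constructs a new instance from the same values.
clone = dataclasses.replace(original)

# Rebinding a field on the clone leaves the original untouched.
clone.buffer = "changed"

# The copy is shallow: the list object is shared by both instances.
clone.tags.append("shared")
```

If a UDTF instance could mutate nested state, a deep copy (for example `copy.deepcopy`) would be the safer choice; whether that matters here depends on what the UDTF stores in the result.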



##########
python/pyspark/sql/udtf.py:
##########
@@ -85,7 +85,7 @@ class OrderingColumn:
     overrideNullsFirst: Optional[bool] = None
 
 
-@dataclass(frozen=True)
+@dataclass

Review Comment:
   Do we need this change?





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350999129


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -284,6 +285,16 @@ object UserDefinedPythonTableFunction {
       val schema = DataType.fromJson(
         PythonWorkerUtils.readUTF(length, dataIn)).asInstanceOf[StructType]
 
+      // Receive the pickled AnalyzeResult buffer, if any.
+      val pickledAnalyzeResult: Option[Array[Byte]] = dataIn.readInt() match {
+        case 0 =>
+          None

Review Comment:
   So it won't happen here, because this code path runs the `analyze` method.
   I think `PythonUDTFAnalyzeResult.pickledAnalyzeResult` should be an `Array[Byte]`, and it should be wrapped with `Some` in `Analyzer`.





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1347981218


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2309,6 +2309,55 @@ def terminate(self):
             + [Row(partition_col=42, count=3, total=3, last=None)],
         )
 
+    def test_udtf_with_prepare_string_from_analyze(self):
+        @udtf
+        class TestUDTF:
+            def __init__(self):
+                self._total = 0
+                self._buffer = None
+
+            @staticmethod
+            def analyze(argument, _):
+                if (
+                    argument.value is None
+                    or argument.is_table
+                    or not isinstance(argument.value, str)
+                    or len(argument.value) == 0
+                ):
+                    raise Exception("The first argument must be non-empty string")
+                assert argument.data_type == StringType()
+                assert not argument.is_table
+                return AnalyzeResult(
+                    schema=StructType().add("total", IntegerType()).add("buffer", StringType()),
+                    prepare_buffer=argument.value,
+                    with_single_partition=True,
+                )
+
+            def prepare(self, buffer):

Review Comment:
   We want to pass in some string values returned from the `analyze` method when instantiating the UDTF. Should this be part of the UDTF's `__init__` method instead?



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str
+        If non-empty, this string represents state computed once within the 'analyze' method to be
+        propagated to each instance of the UDTF class at the time of its creation, using its
+        'prepare' method. The format of this buffer is opaque and known only to the data source. Common
+        use cases include serializing protocol buffers or JSON configurations into this buffer so
+        that potentially expensive initialization work done at 'analyze' time does not need to be
+        recomputed later.
     """
 
     schema: StructType
     with_single_partition: bool = False
     partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
     order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+    prepare_buffer: str = ""

Review Comment:
   Hmm, I am not sure where this should be introduced. Currently, `AnalyzeResult` represents logical planning information such as the output schema, ordering, and partitioning, and it's a bit confusing to me how this buffer should be used (maybe we should change its name).



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str

Review Comment:
   I wonder why it's a string type. Anything that can be pickled should be fine, right?





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1353486102


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -524,27 +524,42 @@ object IntegratedUDFTestUtils extends SQLHelper {
     val name: String = "UDTFWithSinglePartition"
     val pythonScript: String =
       s"""
+        |import json
+        |from dataclasses import dataclass
         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
         |from pyspark.sql.types import IntegerType, Row, StructType
+        |
+        |@dataclass
+        |class AnalyzeResultWithBuffer(AnalyzeResult):
+        |    buffer: str = ""
+        |
         |class $name:
         |    def __init__(self):
         |        self._count = 0
+        |        # self._count = json.loads(buffer)["initial_count"]

Review Comment:
   Yes, sorry for missing this.



##########
python/pyspark/sql/udtf.py:
##########
@@ -85,7 +85,7 @@ class OrderingColumn:
     overrideNullsFirst: Optional[bool] = None
 
 
-@dataclass(frozen=True)
+@dataclass

Review Comment:
   Sounds good, done.



##########
python/pyspark/worker.py:
##########
@@ -693,6 +699,21 @@ def read_udtf(pickleSer, infile, eval_type):
             f"The return type of a UDTF must be a struct type, but got {type(return_type)}."
         )
 
+    # Update the handler that creates a new UDTF instance to first try calling the UDTF constructor
+    # with one argument containing the previous AnalyzeResult. If that fails, then try a constructor
+    # with no arguments. In this way each UDTF class instance can decide if it wants to inspect the
+    # AnalyzeResult.
+    if has_pickled_analyze_result:
+        prev_handler = handler
+
+        def construct_udtf():
+            try:
+                return prev_handler(dataclasses.replace(pickled_analyze_result))
+            except TypeError:
+                return prev_handler()

Review Comment:
   Yes, this is what it means.
   Good point. I added a variable to record this; we set it to false in this code path so that future calls no longer try to pass the `AnalyzeResult`.
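
The fallback described above can be sketched roughly as follows. This is not the actual `pyspark/worker.py` code: `make_constructor`, `WithResult`, and `WithoutResult` are hypothetical names used only for illustration. The constructor is first tried with the `AnalyzeResult`; on `TypeError` a flag is cleared so that later instantiations skip the attempt.

```python
from typing import Any, Callable, Optional


def make_constructor(udtf_class: type, analyze_result: Optional[Any]) -> Callable[[], Any]:
    # Hypothetical helper; records whether passing the AnalyzeResult is still worth trying.
    state = {"pass_analyze_result": analyze_result is not None}

    def construct_udtf() -> Any:
        if state["pass_analyze_result"]:
            try:
                return udtf_class(analyze_result)
            except TypeError:
                # __init__ does not accept the argument; skip this attempt on later calls.
                state["pass_analyze_result"] = False
        return udtf_class()

    return construct_udtf


class WithResult:
    def __init__(self, analyze_result):
        self.analyze_result = analyze_result


class WithoutResult:
    def __init__(self):
        self.analyze_result = None


build_with = make_constructor(WithResult, {"buffer": "x"})
build_without = make_constructor(WithoutResult, {"buffer": "x"})
first = build_with()
second = build_without()
third = build_without()  # goes straight to the no-argument constructor
```

One caveat with this approach: a `TypeError` raised inside the body of a one-argument `__init__` would also trigger the fallback, so inspecting the signature up front may be a more precise alternative.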





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1348084412


##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str
+        If non-empty, this string represents state computed once within the 'analyze' method to be
+        propagated to each instance of the UDTF class at the time of its creation, using its
+        'prepare' method. The format of this buffer is opaque and known only to the data source. Common
+        use cases include serializing protocol buffers or JSON configurations into this buffer so
+        that potentially expensive initialization work done at 'analyze' time does not need to be
+        recomputed later.
     """
 
     schema: StructType
     with_single_partition: bool = False
     partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
     order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+    prepare_buffer: str = ""

Review Comment:
   We decided to remove this and just pickle the AnalyzeResult instance itself. So this AnalyzeResult class doesn't need to change.



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str

Review Comment:
   We decided to remove this and just pickle the `AnalyzeResult` instance itself. So this `AnalyzeResult` class doesn't need to change.
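
A minimal sketch of the idea, under stated assumptions: `FakeAnalyzeResult` is a hypothetical stand-in for `AnalyzeResult` (the real class lives in PySpark and has more fields), and the subclass mirrors the `AnalyzeResultWithBuffer` pattern used in the test utilities in this PR. The UDTF author adds custom fields to a subclass, and the whole instance survives a pickle round trip, so no dedicated buffer field is needed on the base class.

```python
import json
import pickle
from dataclasses import dataclass


@dataclass
class FakeAnalyzeResult:
    # Hypothetical stand-in for pyspark's AnalyzeResult; not the real class.
    schema: str
    with_single_partition: bool = False


@dataclass
class AnalyzeResultWithBuffer(FakeAnalyzeResult):
    # Custom state computed once at analyze time.
    buffer: str = ""


result = AnalyzeResultWithBuffer(
    schema="count INT",
    with_single_partition=True,
    buffer=json.dumps({"initial_count": 7}),
)

# Serialize the whole instance, as would happen between analysis and execution.
payload = pickle.dumps(result)

# The receiving side gets back the subclass instance, custom field included.
restored = pickle.loads(payload)
initial_count = json.loads(restored.buffer)["initial_count"]
```

Because the entire object is pickled, the extra state is not limited to strings; any picklable field type would work the same way.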



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -167,22 +169,26 @@ abstract class UnevaluableGenerator extends Generator {
  * @param udfDeterministic true if this function is deterministic wherein it returns the same result
  *                         rows for every call with the same input arguments
  * @param resultId unique expression ID for this function invocation
- * @param pythonUDTFPartitionColumnIndexes holds the indexes of the TABLE argument to the Python
- *                                         UDTF call, if applicable
+ * @param pythonUDTFPartitionColumnIndexes holds the zero-based indexes of the projected results of
+ *                                         all PARTITION BY expressions within the TABLE argument of
+ *                                         the Python UDTF call, if applicable
  * @param analyzeResult holds the result of the polymorphic Python UDTF 'analyze' method, if the UDTF
  *                      defined one
  */
 case class PythonUDTF(
     name: String,
     func: PythonFunction,
-    elementSchema: StructType,
+    analyzeResult: PythonUDTFAnalyzeResult,

Review Comment:
   We talked offline: we now need it here because we pickle it and send it back to the UDTF.





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'analyze' to return a buffer to consume on each class creation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1350762771


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2309,6 +2309,55 @@ def terminate(self):
             + [Row(partition_col=42, count=3, total=3, last=None)],
         )
 
+    def test_udtf_with_prepare_string_from_analyze(self):
+        @udtf
+        class TestUDTF:
+            def __init__(self):
+                self._total = 0
+                self._buffer = None
+
+            @staticmethod
+            def analyze(argument, _):
+                if (
+                    argument.value is None
+                    or argument.is_table
+                    or not isinstance(argument.value, str)
+                    or len(argument.value) == 0
+                ):
+                    raise Exception("The first argument must be non-empty string")
+                assert argument.data_type == StringType()
+                assert not argument.is_table
+                return AnalyzeResult(
+                    schema=StructType().add("total", IntegerType()).add("buffer", StringType()),
+                    prepare_buffer=argument.value,
+                    with_single_partition=True,
+                )
+
+            def prepare(self, buffer):

Review Comment:
   This is done.





Re: [PR] [SPARK-45402][SQL][PYTHON] Add UDTF API for 'eval' and 'terminate' methods to consume previous 'analyze' result [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #43204:
URL: https://github.com/apache/spark/pull/43204#issuecomment-1758789340

   Thanks! merging to master.

