You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2024/01/11 01:07:17 UTC

[PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for 'eval' and 'terminate' methods [spark]

dtenedor opened a new pull request, #44678:
URL: https://github.com/apache/spark/pull/44678

   ### What changes were proposed in this pull request?
   
   This PR creates a Python UDTF API to acquire execution memory for 'eval' and 'terminate' methods.
   
   For example, this UDTF accepts an argument representing the requested amount of memory to allocate (of which it will receive some subset), and a second argument containing the minimum memory allowed. It sets a status column to indicate an "out of memory" condition if the latter exceeds the memory allocated by Spark:
   
   ```
   from dataclasses import dataclass
   from pyspark.sql.functions import AnalyzeResult
   from pyspark.sql.types import IntegerType, LongType, StringType, StructType
   
   @dataclass
   class CustomAnalyzeResult(AnalyzeResult):
       minMemoryMb: int = 0
   
   class $name:
       def __init__(self, analyze_result):
           self._analyze_result = analyze_result
   
       @staticmethod
       def analyze(**kwargs):
           argument = kwargs.get("argument")
           min_memory_mb = kwargs.get("min_memory_mb").value
           if argument is not None:
               assert(argument.dataType == IntegerType() or argument.dataType == LongType())
               argument_value = argument.value
           else:
               argument_value = None
           return CustomAnalyzeResult(
               schema=StructType()
                   .add("initial_request", LongType())
                   .add("acquired_memory", LongType())
                   .add("min_memory", LongType())
                   .add("status", StringType()),
               acquireExecutionMemoryMbRequested=argument_value,
               minMemoryMb=min_memory_mb)
   
       def eval(self, **kwargs):
           pass
   
       def terminate(self):
             yield (
                 self._analyze_result.acquireExecutionMemoryMbRequested,
                 self._analyze_result.acquireExecutionMemoryMbActual,
                 self._analyze_result.minMemoryMb,
                 "OK" if self._analyze_result.acquireExecutionMemoryMbActual >=
                     self._analyze_result.minMemoryMb else "Insufficient memory")
   ```
   
   Invoking it yields the following:
   
   ```
   SELECT * FROM UDTFAcquireExecutionMemory(argument => 4, min_memory_mb => 0)
   > 4	4	0	OK
   
   SELECT * FROM UDTFAcquireExecutionMemory(argument => 4, min_memory_mb => 10)
   > 4	4	10	Insufficient memory
   ```
   
   ### Why are the changes needed?
   
   Python UDTFs that import large libraries or otherwise use up a lot of memory from storing many input rows in memory need to register this memory usage with Spark executors in order to protect against OOM crashes.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, see above.
   
   ### How was this patch tested?
   
   This PR adds test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1456402360


##########
python/pyspark/sql/worker/analyze_udtf.py:
##########
@@ -225,6 +226,19 @@ def format_error(msg: str) -> str:
                 write_int(1, outfile)
             else:
                 write_int(2, outfile)
+        # Return the requested amount of execution memory to acquire, if any.
+        write_long(
+            0
+            if result.acquireExecutionMemoryMbRequested is None
+            else result.acquireExecutionMemoryMbRequested,
+            outfile,
+        )
+        write_long(
+            0
+            if result.acquireExecutionMemoryMbActual is None
+            else result.acquireExecutionMemoryMbActual,
+            outfile,
+        )

Review Comment:
   Good catch, it appears not. I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on PR #44678:
URL: https://github.com/apache/spark/pull/44678#issuecomment-1900503295

   @dtenedor I just had another thought about making something like this more usable for python users. It would be awesome to also provide a utility for python users to inspect their current instantaneous memory usage.
   
   They could use this during `analyze()` to get the size of their in-scope libraries and, assuming things are comparable on the executor, could request that amount + X MB for any additional data they would collect. Additionally they could use this inside `eval()` and `terminate()` and possibly execute some conditional logic if they are close to their allocated memory limit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1450991957


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long

Review Comment:
   Good question; this should just indicate the max memory that the Python UDTF could consume, regardless of the Spark cluster size. Then the future call to `TaskMemoryManager.acquireExecutionMemory` will fail if the cluster does not have enough memory (possibly after asking existing operators to shrink their own memory usage by means such as spilling to disk).
   
   Best practices: In general, the Spark cluster owner should endeavor to provide enough executor memory to accommodate requests of this size so to support these functions. By the same token, UDTF developers should aim to keep requested (and actual) memory usage low enough to fit in common Spark cluster configurations. For most cases, this means at most in the low hundreds of megabytes for a single function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1450999974


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   The `TaskMemoryManager.acquireExecutionMemory` API is a memory reservation system. The idea is that an operator that will consume memory in the future should call this method beforehand with the peak expected future memory usage on the executor. This comprises a memory reservation such that the sum of all such reservations may not reach or exceed the total memory available on the executor (less some fixed overhead for the executor's own data structures).
   
   For the case of Python UDTFs, the function should set `AnalyzeResult. acquireExecutionMemoryMbRequested` to the max expected memory use of the python process, including memory allocated by the function itself as well as any imported libraries, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44678:
URL: https://github.com/apache/spark/pull/44678#issuecomment-1900726776

   @nickstanishadb how would this be different from calling Python's `resource.getrusage` within the `analyze` method, and then adding the resulting memory number into a subclass of `AnalyzeResult` to be propagated to subsequent `__init__` and `eval`/`terminate` methods? https://github.com/apache/spark/blob/master/python/docs/source/user_guide/sql/python_udtf.rst


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1449007899


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   @dtenedor I did some dumb memory profiling running the following UDTF
   
   ```python
   from pyspark.sql.functions import udtf
   import resource
   
   @udtf(returnType="step: int, memory: int")
   class SimpleUDTF:
       def __init__(self, *args, **kwargs):
           self.step_id = 0
       
       @staticmethod
       def get_peak_memory_usage_kb() -> int:
           return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
   
       def eval(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
           self.step_id += 1
   
       def terminate(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
   
   spark.udtf.register("pyUdtfMemProfile", SimpleUDTF)
   ```
   
   I'm not entirely confident in the results because I noticed running a high memory UDTF before the profiling UDTF would increase the peak memory usage (I guess they share a process?). But on a fresh 14.2 cluster I ran this and got max memory usage across 100 pyUdtfMemProfile invocations of `45440 KB`, which seems reasonable.
   
   Is this memory already accounted for / pre-allocated on the executor? It's not clear to me if a user should request `processSize + additionalClassMemory` or just `additionalClassMemory` when they're specifying `acquireExecutionMemoryMbRequested` and `minMemoryMb`. If it's the total then it seems like `minMemoryMb` should always be `>=1`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1453648020


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   Makes sense! What do you think about this being difficult to set, especially for UDTF developers? If you think the test I did with `pyUdtfMemProfile` is a reasonable estimate, what do we think of setting a global `minMemoryMb` to something like 100MB? I think that could make the manual memory assignment less prone to user-error. It's probably also good to have a floor on some level that way the number of UDTFs simultaneously running on an executor has a hard ceiling



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1454267605


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in MB that the UDTF should request
+        from each Spark executor that it runs on. Then the UDTF takes responsibility to use at most
+        this much memory, including all allocated objects. The purpose of this functionality is to
+        prevent executors from crashing by running out of memory due to the extra memory consumption
+        invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will then call
+        'TaskMemoryManager.acquireExecutionMemory' with the requested number of MB.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of MB
+        returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as consumed
+        by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup' methods
+        will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """
 
     schema: StructType
     withSinglePartition: bool = False
     partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
     orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+    acquireExecutionMemoryMbRequested: Optional[int] = 100

Review Comment:
   Why is the default value `100`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, context)
+      memoryConsumer.foreach { consumer =>
+        val acquireMemoryMbActual: Long = consumer.acquireMemory()
+        udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+      }
+      val outputRowIterator = try {
+        evaluate(argMetas, projectedRowIter, schema, context)
+      } finally {
+        if (TaskContext.get() != null) {
+          memoryConsumer.foreach { consumer =>
+            consumer.freeMemory()
+          }
+        }
+      }

Review Comment:
   This acquires memory while only creating the iterator. The actually execution won't be affected.



##########
python/pyspark/sql/worker/analyze_udtf.py:
##########
@@ -225,6 +226,19 @@ def format_error(msg: str) -> str:
                 write_int(1, outfile)
             else:
                 write_int(2, outfile)
+        # Return the requested amount of execution memory to acquire, if any.
+        write_long(
+            0
+            if result.acquireExecutionMemoryMbRequested is None
+            else result.acquireExecutionMemoryMbRequested,
+            outfile,
+        )
+        write_long(
+            0
+            if result.acquireExecutionMemoryMbActual is None
+            else result.acquireExecutionMemoryMbActual,
+            outfile,
+        )

Review Comment:
   Do we need to send this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {

Review Comment:
   Why do we need this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, context)
+      memoryConsumer.foreach { consumer =>
+        val acquireMemoryMbActual: Long = consumer.acquireMemory()
+        udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+      }
+      val outputRowIterator = try {
+        evaluate(argMetas, projectedRowIter, schema, context)
+      } finally {
+        if (TaskContext.get() != null) {

Review Comment:
   Why do we need this check?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+    if (TaskContext.get() != null) {
+      Some(PythonUDTFMemoryConsumer(udtf))
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * This class takes responsibility to allocate execution memory for UDTF evaluation before it begins
+ * and free the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of memory in MB that
+ * the UDTF should request from each Spark executor that it runs on. Then the UDTF takes
+ * responsibility to use at most this much memory, including all allocated objects. The purpose of
+ * this functionality is to prevent executors from crashing by running out of memory due to the
+ * extra memory consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with the requested number
+ * of MB, and when Spark calls __init__  of the UDTF later, it updates the acquiredExecutionMemory
+ * integer passed into the UDTF constructor to the actual number returned from
+ * 'TaskMemoryManager.acquireExecutionMemory', so the 'eval' and 'terminate' and 'cleanup' methods
+ * know it and can ensure to bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), MemoryMode.ON_HEAP) {

Review Comment:
   I'm not sure about this and how this helps to avoid OOM.
   IIUC, `MemoryConsumer` is supposed to manage the JVM memory, and configured as `ON_HEAP`. The on-heap memory should be reserved by JVM and won't be reduced once it's reserved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1456374420


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in MB that the UDTF should request
+        from each Spark executor that it runs on. Then the UDTF takes responsibility to use at most
+        this much memory, including all allocated objects. The purpose of this functionality is to
+        prevent executors from crashing by running out of memory due to the extra memory consumption
+        invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will then call
+        'TaskMemoryManager.acquireExecutionMemory' with the requested number of MB.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of MB
+        returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as consumed
+        by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup' methods
+        will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """
 
     schema: StructType
     withSinglePartition: bool = False
     partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
     orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+    acquireExecutionMemoryMbRequested: Optional[int] = 100

Review Comment:
   I put a short comment that we set a default of 100 MB here, which the UDTF may override to a more accurate number.
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, context)
+      memoryConsumer.foreach { consumer =>
+        val acquireMemoryMbActual: Long = consumer.acquireMemory()
+        udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+      }
+      val outputRowIterator = try {
+        evaluate(argMetas, projectedRowIter, schema, context)
+      } finally {
+        if (TaskContext.get() != null) {
+          memoryConsumer.foreach { consumer =>
+            consumer.freeMemory()
+          }
+        }
+      }

Review Comment:
   Good catch, I didn't see that `evaluate` just returned an iterator instead of actually invoking it :) I moved the memory free step later to after the function actually evaluates.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {

Review Comment:
   This object serves the purpose of acquiring execution memory before evaluating the UDTF. After the evaluation is complete, we call its 'freeMemory' method to release the memory. I added a comment here.
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -99,7 +100,19 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
         projection(inputRow)
       }
 
-      val outputRowIterator = evaluate(argMetas, projectedRowIter, schema, context)
+      memoryConsumer.foreach { consumer =>
+        val acquireMemoryMbActual: Long = consumer.acquireMemory()
+        udtf.acquireMemoryMbActual = Some(acquireMemoryMbActual)
+      }
+      val outputRowIterator = try {
+        evaluate(argMetas, projectedRowIter, schema, context)
+      } finally {
+        if (TaskContext.get() != null) {

Review Comment:
   This was left-over from before I moved the check into the `memoryConsumer` object itself:
   
   ```
     lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
       if (TaskContext.get() != null) {
         Some(PythonUDTFMemoryConsumer(udtf))
       } else {
         None
       }
     }
   ```
   
   I removed this check here now.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+    if (TaskContext.get() != null) {
+      Some(PythonUDTFMemoryConsumer(udtf))
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * This class takes responsibility to allocate execution memory for UDTF evaluation before it begins
+ * and free the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of memory in MB that
+ * the UDTF should request from each Spark executor that it runs on. Then the UDTF takes
+ * responsibility to use at most this much memory, including all allocated objects. The purpose of
+ * this functionality is to prevent executors from crashing by running out of memory due to the
+ * extra memory consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with the requested number
+ * of MB, and when Spark calls __init__  of the UDTF later, it updates the acquiredExecutionMemory
+ * integer passed into the UDTF constructor to the actual number returned from
+ * 'TaskMemoryManager.acquireExecutionMemory', so the 'eval' and 'terminate' and 'cleanup' methods
+ * know it and can ensure to bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), MemoryMode.ON_HEAP) {

Review Comment:
   My mistake, I should have written `MemoryMode.OFF_HEAP` here to indicate that the memory consumed by the Python subprocess is outside the JVM. Fixed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for 'eval' and 'terminate' methods [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44678:
URL: https://github.com/apache/spark/pull/44678#issuecomment-1886024403

   cc @ueshin @allisonwang-db 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #44678:
URL: https://github.com/apache/spark/pull/44678#issuecomment-1899314211

   After offline discussion, I am going to close this PR since the memory PySpark uses is off-heap which as @ueshin points out is not the same memory as the on-heap memory generally tracked by the `TaskMemoryManager`. We can always reopen it later if we want to revive the approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on PR #44678:
URL: https://github.com/apache/spark/pull/44678#issuecomment-1887363721

   @dtenedor I did some dumb memory profiling running the following UDTF
   
   ```python
   from pyspark.sql.functions import udtf
   import resource
   
   @udtf(returnType="step: int, memory: int")
   class SimpleUDTF:
       def __init__(self, *args, **kwargs):
           self.step_id = 0
       
       @staticmethod
       def get_peak_memory_usage_kb() -> int:
           return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
   
       def eval(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
           self.step_id += 1
   
       def terminate(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
   
   spark.udtf.register("pyUdtfMemProfile", SimpleUDTF)
   ```
   
   I'm not entirely confident in the results because I noticed running a high memory UDTF before the profiling UDTF would increase the peak memory usage (I guess they share a process?). But on a fresh 14.2 cluster I ran this and got max memory usage across 100 UDTFs of `45440 KB`, which seems reasonable. Peak memory usage of my vanilla conda distribution on my mac is 16400 KB and it makes sense that UDTF framework would need extra stuff loaded into memory.
   
   What do you think about designing UDTFs so that we take a save overestimate of this (say 50000 KB) as the memory floor handled entirely by Scala (i.e. the UDTF framework will always retry if it can't get at least this amount of memory). Then in the user-provided part of the code, users can specify how much _**additional memory**_ they would like to request just for holding their class instance attributes (e.g. for AI_FORECAST, just the size of the training data).
   
   If this sounds reasonable to you, we could also add some unit tests making sure that the no-modifications Py UDTF memory requirements don't exceed 50KB


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1449007899


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   @dtenedor I did some dumb memory profiling running the following UDTF
   
   ```python
   from pyspark.sql.functions import udtf
   import resource
   
   @udtf(returnType="step: int, memory: int")
   class SimpleUDTF:
       def __init__(self, *args, **kwargs):
           self.step_id = 0
       
       @staticmethod
       def get_peak_memory_usage_kb() -> int:
           return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
   
       def eval(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
           self.step_id += 1
   
       def terminate(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
   
   spark.udtf.register("pyUdtfMemProfile", SimpleUDTF)
   ```
   
   I'm not entirely confident in the results because I noticed running a high memory UDTF before the profiling UDTF would increase the peak memory usage (I guess they share a process?). But on a fresh 14.2 cluster I ran this and got max memory usage across 100 pyUdtfMemProfile invocations of `45440 KB`, which seems reasonable.
   
   Is this memory already accounted for / pre-allocated on the executor? It's not clear to me if a user should request `processSize + additionalClassMemory` or just `additionalClassMemory` when they're specifying `acquireExecutionMemoryMbRequested` and `minMemoryMb`. If it's the total then it seems like `minMemoryMb` should always be greater than this threshold



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor closed pull request #44678: [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation
URL: https://github.com/apache/spark/pull/44678


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1449007899


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   @dtenedor I did some dumb memory profiling running the following UDTF
   
   ```python
   from pyspark.sql.functions import udtf
   import resource
   
   @udtf(returnType="step: int, memory: int")
   class SimpleUDTF:
       def __init__(self, *args, **kwargs):
           self.step_id = 0
       
       @staticmethod
       def get_peak_memory_usage_kb() -> int:
           return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
   
       def eval(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
           self.step_id += 1
   
       def terminate(self, *args, **kwargs):
           yield self.step_id, self.get_peak_memory_usage_kb()
   
   spark.udtf.register("pyUdtfMemProfile", SimpleUDTF)
   ```
   
   I'm not entirely confident in the results because I noticed running a high memory UDTF before the profiling UDTF would increase the peak memory usage (I guess they share a process?). But on a fresh 14.2 cluster I ran this and got max memory usage across 100 pyUdtfMemProfile invocations of `45440 KB`, which seems reasonable.
   
   Is this memory already accounted for / pre-allocated on the executor? It's not clear to me if a user should request `processSize + additionalClassMemory` or just `additionalClassMemory` when they're specifying `acquireExecutionMemoryMbRequested` and `minMemoryMb`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1454199865


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+    if (TaskContext.get() != null) {
+      Some(PythonUDTFMemoryConsumer(udtf))
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * This class takes responsibility to allocate execution memory for UDTF evaluation before it begins
+ * and free the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of memory in megabytes that
+ * the UDTF should request from each Spark executor that it runs on. Then the UDTF takes
+ * responsibility to use at most this much memory, including all allocated objects. The purpose of
+ * this functionality is to prevent executors from crashing by running out of memory due to the
+ * extra memory consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with the requested number
+ * of megabytes, and when Spark calls __init__  of the UDTF later, it updates the
+ * acquiredExecutionMemory integer passed into the UDTF constructor to the actual number returned
+ * from 'TaskMemoryManager.acquireExecutionMemory', so the 'eval' and 'terminate' and 'cleanup'
+ * methods know it and can ensure to bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), MemoryMode.ON_HEAP) {
+  private val BYTES_PER_MEGABYTE = 1024 * 1024

Review Comment:
   Done.



##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   Sounds good, I added a default of 100MB, which any UDTF can (and probably should) override to a more specific number.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1456395139


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+    if (TaskContext.get() != null) {
+      Some(PythonUDTFMemoryConsumer(udtf))
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * This class takes responsibility to allocate execution memory for UDTF evaluation before it begins
+ * and free the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of memory in MB that
+ * the UDTF should request from each Spark executor that it runs on. Then the UDTF takes
+ * responsibility to use at most this much memory, including all allocated objects. The purpose of
+ * this functionality is to prevent executors from crashing by running out of memory due to the
+ * extra memory consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with the requested number
+ * of MB, and when Spark calls __init__  of the UDTF later, it updates the acquiredExecutionMemory
+ * integer passed into the UDTF constructor to the actual number returned from
+ * 'TaskMemoryManager.acquireExecutionMemory', so the 'eval' and 'terminate' and 'cleanup' methods
+ * know it and can ensure to bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), MemoryMode.ON_HEAP) {

Review Comment:
   Good question. The idea is that while the memory consumed by Python is not on the JVM, making such a reservation nevertheless prevents the executor from accepting new workload while the Python is running, since both the on-heap and off-heap memory contribute to the total memory consumed. Let me know what you think about this. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "benhurdelhey (via GitHub)" <gi...@apache.org>.
benhurdelhey commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1452538115


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonUDTFExec.scala:
##########
@@ -137,4 +150,46 @@ trait EvalPythonUDTFExec extends UnaryExecNode {
       }
     }
   }
+
+  lazy val memoryConsumer: Option[PythonUDTFMemoryConsumer] = {
+    if (TaskContext.get() != null) {
+      Some(PythonUDTFMemoryConsumer(udtf))
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * This class takes responsibility to allocate execution memory for UDTF evaluation before it begins
+ * and free the memory after the evaluation is over.
+ *
+ * Background: If the UDTF's 'analyze' method returns an 'AnalyzeResult' with a non-empty
+ * 'acquireExecutionMemoryMb' value, this value represents the amount of memory in megabytes that
+ * the UDTF should request from each Spark executor that it runs on. Then the UDTF takes
+ * responsibility to use at most this much memory, including all allocated objects. The purpose of
+ * this functionality is to prevent executors from crashing by running out of memory due to the
+ * extra memory consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods.
+ *
+ * In this class, Spark calls 'TaskMemoryManager.acquireExecutionMemory' with the requested number
+ * of megabytes, and when Spark calls __init__  of the UDTF later, it updates the
+ * acquiredExecutionMemory integer passed into the UDTF constructor to the actual number returned
+ * from 'TaskMemoryManager.acquireExecutionMemory', so the 'eval' and 'terminate' and 'cleanup'
+ * methods know it and can ensure to bound memory usage to at most this number.
+ */
+case class PythonUDTFMemoryConsumer(udtf: PythonUDTF)
+  extends MemoryConsumer(TaskContext.get().taskMemoryManager(), MemoryMode.ON_HEAP) {
+  private val BYTES_PER_MEGABYTE = 1024 * 1024

Review Comment:
   let's decide for either megabyte (then use 1000 * 1000 here) or call it mebibyte and change it to MiB everywhere :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1456357135


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in MB that the UDTF should request
+        from each Spark executor that it runs on. Then the UDTF takes responsibility to use at most
+        this much memory, including all allocated objects. The purpose of this functionality is to
+        prevent executors from crashing by running out of memory due to the extra memory consumption
+        invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will then call
+        'TaskMemoryManager.acquireExecutionMemory' with the requested number of MB.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of MB
+        returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as consumed
+        by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup' methods
+        will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """
 
     schema: StructType
     withSinglePartition: bool = False
     partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
     orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+    acquireExecutionMemoryMbRequested: Optional[int] = 100

Review Comment:
   ![image](https://github.com/apache/spark/assets/99207096/10e67a8a-04d4-4465-955f-3bee7fbad2ba)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1448491901


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long

Review Comment:
   Hmm how should user determine this without knowing the spark cluster size?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46638][Python] Create Python UDTF API to acquire execution memory for function evaluation [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #44678:
URL: https://github.com/apache/spark/pull/44678#discussion_r1453648020


##########
python/pyspark/sql/udtf.py:
##########
@@ -133,12 +133,28 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    acquireExecutionMemoryMbRequested: long
+        If this is not None, this represents the amount of memory in megabytes that the UDTF should
+        request from each Spark executor that it runs on. Then the UDTF takes responsibility to use
+        at most this much memory, including all allocated objects. The purpose of this functionality
+        is to prevent executors from crashing by running out of memory due to the extra memory
+        consumption invoked by the UDTF's 'eval' and 'terminate' and 'cleanup' methods. Spark will
+        then call 'TaskMemoryManager.acquireExecutionMemory' with the requested number of megabytes.
+    acquireExecutionMemoryMbActual: long
+        If there is a task context available, Spark will assign this field to the number of
+        megabytes returned from the call to the TaskMemoryManager.acquireExecutionMemory' method, as
+        consumed by the UDTF's'__init__' method. Therefore, its 'eval' and 'terminate' and 'cleanup'
+        methods will know it thereafter and can ensure to bound memory usage to at most this number.
+        Note that there is no effect if the UDTF's 'analyze' method assigns a value to this; it will
+        be overwritten.
     """

Review Comment:
   Makes sense! What do you think about this being difficult to set, especially for UDTF developers? If you think the test I did with `pyUdtfMemProfile` is a reasonable estimate, what do we think of setting a global `minMemoryMb` to something like 100MB? I think that could make the manual memory assignment less prone to user error. It's probably also good to have a floor on some level that way the number of UDTFs simultaneously running on an executor has a hard ceiling



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org