Posted to reviews@spark.apache.org by "xieshuaihu (via GitHub)" <gi...@apache.org> on 2023/11/09 08:34:37 UTC

[PR] [SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak [spark]

xieshuaihu opened a new pull request, #43728:
URL: https://github.com/apache/spark/pull/43728

   ### What changes were proposed in this pull request?
   
   Make `ArrowBatchIterator` implement `AutoCloseable` and have `ArrowConverters.createEmptyArrowBatch()` call `close()` to avoid a memory leak.
   
   ### Why are the changes needed?
   
   `ArrowConverters.createEmptyArrowBatch` doesn't call `super.hasNext`, so if `TaskContext.get` returns `None`, the memory allocated in `ArrowBatchIterator` is leaked.
   
   In Spark Connect, `createEmptyArrowBatch` is called in [SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and [SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to consume all of the off-heap memory specified by `-XX:MaxDirectMemorySize`.
   
   This is the exception stack:
   ```
   org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
   	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
   	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
   	at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
   	at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
   	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
   	at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
   	at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
   	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
   	at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
   	at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
   	at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
   	at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
   	at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
   	at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
   	at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
   	at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
   	at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
   	at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
   	at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
   	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
   	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
   	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
   	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
   	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
   	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
   	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
   	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
   	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
   	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
   	at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
   	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
   	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
   	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84)
   	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
   Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 4194304 byte(s) of direct memory (used: 1069547799, max: 1073741824)
   	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:845)
   	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:774)
   	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:721)
   	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:696)
   	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:215)
   	at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:180)
   	at io.netty.buffer.PoolArena.allocate(PoolArena.java:137)
   	at io.netty.buffer.PoolArena.allocate(PoolArena.java:129)
   	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:181)
   	at io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:214)
   	at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
   	... 37 more
   ```
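   The pattern behind the leak and the fix can be sketched as follows. This is an illustrative model, not the actual Spark code: `FakeAllocator`, `BatchIterator`, and `createEmptyBatch` are hypothetical names standing in for Arrow's `BufferAllocator`, `ArrowBatchIterator`, and `createEmptyArrowBatch`.
   
   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   
   // Stands in for Arrow's BufferAllocator: records whether close() was called,
   // i.e. whether the off-heap allocation was released.
   final class FakeAllocator {
     val closed = new AtomicBoolean(false)
     def close(): Unit = closed.set(true)
   }
   
   // Analogue of ArrowBatchIterator: it allocates in its constructor. Before the
   // fix, the allocator was only freed by a TaskContext completion listener, so
   // on the driver (where TaskContext.get is None) nothing ever freed it.
   // Implementing AutoCloseable lets callers release it deterministically.
   final class BatchIterator extends Iterator[Array[Byte]] with AutoCloseable {
     val allocator = new FakeAllocator
     private var consumed = false
     override def hasNext: Boolean = !consumed
     override def next(): Array[Byte] = { consumed = true; Array.emptyByteArray }
     override def close(): Unit = allocator.close()
   }
   
   // Analogue of createEmptyArrowBatch after the fix: take the single batch,
   // then close() in a finally block so the allocation is released even when
   // there is no TaskContext to run a completion callback.
   def createEmptyBatch(): Array[Byte] = {
     val it = new BatchIterator
     try it.next()
     finally it.close()
   }
   ```
   
   The `try`/`finally` guarantees the allocator is released even if producing the batch throws, which is what prevents the direct-memory counter from growing on every call.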
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Manually tested
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak [spark]

Posted by "xieshuaihu (via GitHub)" <gi...@apache.org>.
xieshuaihu commented on PR #43728:
URL: https://github.com/apache/spark/pull/43728#issuecomment-1804973799

   @LuciferYang All UTs passed




Re: [PR] [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on PR #43728:
URL: https://github.com/apache/spark/pull/43728#issuecomment-1805072132

   Merged into branch-3.4. Thanks @xieshuaihu 




Re: [PR] [SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak [spark]

Posted by "xieshuaihu (via GitHub)" <gi...@apache.org>.
xieshuaihu commented on PR #43728:
URL: https://github.com/apache/spark/pull/43728#issuecomment-1803373987

   @LuciferYang 




Re: [PR] [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak [spark]

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang closed pull request #43728: [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
URL: https://github.com/apache/spark/pull/43728

