Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/04/02 00:28:00 UTC

[jira] [Comment Edited] (SPARK-27330) ForeachWriter is not being closed once a batch is aborted

    [ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806654#comment-16806654 ] 

Hyukjin Kwon edited comment on SPARK-27330 at 4/2/19 12:27 AM:
---------------------------------------------------------------

I have updated the description. Eventually I end up with tasks that are killed without the "close" method ever being called, and because of that I have connections that are never closed, which leads to a leak in my connection pool.

From the docs:

{code}
streamingDatasetOfString.writeStream.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // Open connection
    true
  }

  def process(record: String): Unit = {
    // Write string to connection
  }

  def close(errorOrNull: Throwable): Unit = {
    // Close the connection
  }
}).start()
{code}
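
To make the leak concrete, here is a minimal sketch of the kind of writer I am describing (the ConnectionPool and Connection types below are placeholders, not my actual code): the connection is borrowed in open() and only returned in close(), so a task that is aborted without close() permanently removes one connection from the pool.

{code}
import org.apache.spark.sql.ForeachWriter

// Placeholder types standing in for a real connection pool.
trait Connection { def write(s: String): Unit }
trait ConnectionPool { def borrow(): Connection; def giveBack(c: Connection): Unit }

class PooledConnectionWriter(pool: ConnectionPool) extends ForeachWriter[String] {
  private var conn: Connection = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = pool.borrow()        // the connection leaves the pool here
    true
  }

  override def process(record: String): Unit = {
    conn.write(record)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (conn != null) pool.giveBack(conn)  // the only place the connection is returned
  }
}
{code}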

 



> ForeachWriter is not being closed once a batch is aborted
> ---------------------------------------------------------
>
>                 Key: SPARK-27330
>                 URL: https://issues.apache.org/jira/browse/SPARK-27330
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Eyal Zituny
>            Priority: Major
>
> In cases where a micro batch is killed (interrupted) outside the actual processing done by ForeachDataWriter (i.e. not while iterating the iterator), DataWritingSparkTask handles the interruption and calls dataWriter.abort().
> The problem is that ForeachDataWriter has an empty implementation of the abort method.
> Because of that, I have tasks which use the foreach writer and, following the [documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach], open connections in the "open" method and close them in the "close" method; but since "close" is never called, the connections are never closed.
> This wasn't the behavior before Spark 2.4.
> My suggestion is to call ForeachWriter.close() when DataWriter.abort() is called; an exception should also be passed so the foreach writer knows that the task has failed (a rough sketch follows the quoted description below).
>  
>  Stack trace from the exception I have encountered:
>  org.apache.spark.TaskKilledException: null
>  at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
>  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
>  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
>  
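
For reference, a rough sketch of the behavior I am suggesting (this is a simplification, not the actual Spark internals; only the commit/abort interaction with the user's ForeachWriter is shown):

{code}
import org.apache.spark.sql.ForeachWriter

// Simplified stand-in for the internal writer that wraps the user's ForeachWriter.
class ForeachDataWriterSketch[T](writer: ForeachWriter[T]) {

  def commit(): Unit = {
    writer.close(null)  // batch succeeded: close with no error, as today
  }

  def abort(): Unit = {
    // Today this is effectively a no-op, so open connections are never released.
    // Suggested change: notify the writer that the task failed so it can clean up.
    writer.close(new RuntimeException("task aborted"))
  }
}
{code}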


