Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2019/08/23 14:53:00 UTC
[jira] [Assigned] (SPARK-27330) ForeachWriter is not being closed once a batch is aborted
[ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun reassigned SPARK-27330:
-------------------------------------
Assignee: Eyal Zituny
> ForeachWriter is not being closed once a batch is aborted
> ---------------------------------------------------------
>
> Key: SPARK-27330
> URL: https://issues.apache.org/jira/browse/SPARK-27330
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Eyal Zituny
> Assignee: Eyal Zituny
> Priority: Major
> Fix For: 2.4.4, 3.0.0
>
>
> When a micro-batch is killed (interrupted) outside of the actual processing done by {{ForeachDataWriter}} (i.e. not while it is iterating over the input iterator), {{DataWritingSparkTask}} handles the interruption and calls {{dataWriter.abort()}}.
> The problem is that {{ForeachDataWriter}} has an empty implementation of the abort method.
> Because of that, tasks that use the foreach writer and, following the [documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach], open connections in the "open" method and close them in the "close" method never release those connections, since "close" is never called.
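For reference, a minimal sketch of the documented open/process/close lifecycle. The {{ForeachWriter}} trait below is a simplified stand-in for Spark's {{org.apache.spark.sql.ForeachWriter}}, and the connection handling is hypothetical:

```scala
// Simplified stand-in for Spark's ForeachWriter[T]; the method signatures
// follow the Structured Streaming programming guide, but this is not the
// real Spark class.
abstract class ForeachWriter[T] extends Serializable {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// A writer that acquires a (fake) connection in open() and releases it in
// close(), as the guide recommends. If close() is never invoked, the
// connection leaks -- which is exactly the bug described above.
class ConnectionWriter extends ForeachWriter[String] {
  var connectionOpen = false

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connectionOpen = true // stands in for opening a real connection
    true
  }

  override def process(value: String): Unit = {
    require(connectionOpen, "process() called without an open connection")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connectionOpen = false // release the connection, even on failure
  }
}
```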
> This was not the behavior before Spark 2.4.
> My suggestion is to call {{ForeachWriter.abort()}} when {{DataWriter.abort()}} is called, in order to notify the foreach writer that the task has failed.
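One possible shape of that fix, as a hedged sketch: the class below mirrors the role of Spark's internal {{ForeachDataWriter}}, but it is a simplified stand-in, not the actual Spark source.

```scala
// Simplified stand-in for the user-facing writer interface.
abstract class ForeachWriter[T] extends Serializable {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Sketch of a ForeachDataWriter-like wrapper where abort() is no longer a
// no-op: it forwards the failure to the user's close() so that resources
// acquired in open() can be released when the task is killed.
class ForeachDataWriter[T](writer: ForeachWriter[T],
                           partitionId: Long,
                           epochId: Long) {
  private var opened = false

  private def openIfNeeded(): Unit = {
    if (!opened) {
      writer.open(partitionId, epochId)
      opened = true
    }
  }

  def write(record: T): Unit = {
    openIfNeeded()
    writer.process(record)
  }

  def commit(): Unit = if (opened) writer.close(null)

  // Previously an empty implementation; now it notifies the user's writer
  // that the task failed, so close() can clean up.
  def abort(): Unit =
    if (opened) writer.close(new RuntimeException("task aborted"))
}
```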
>
> stack trace from the exception I encountered:
> {code:java}
> org.apache.spark.TaskKilledException: null
> at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
> at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
> at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
> at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
> at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
> at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.2#803003)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org