Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:02:34 UTC

[jira] [Updated] (SPARK-19738) Consider adding error handler to DataStreamWriter

     [ https://issues.apache.org/jira/browse/SPARK-19738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19738:
---------------------------------
    Labels: bulk-closed  (was: )

> Consider adding error handler to DataStreamWriter
> -------------------------------------------------
>
>                 Key: SPARK-19738
>                 URL: https://issues.apache.org/jira/browse/SPARK-19738
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Jayesh lalwani
>            Priority: Major
>              Labels: bulk-closed
>
> For Structured Streaming implementations, it is important that applications stay always on. However, right now, errors stop the driver. In some cases, this is not desirable behavior. For example, I have the following application:
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").start()
> {code}
> I send the following input to it:
> {quote}
> 1,Iron man
> 2,Superman
> {quote}
> Obviously, the data is bad: the values do not match the declared schema, so the age column cannot be parsed as an integer. This causes the executor to throw an exception that propagates to the driver, which promptly shuts down. The driver is running in supervised mode, and it gets restarted. The application reads the same bad input and shuts down again. This goes on ad infinitum. This behavior is desirable in cases where the error is recoverable. For example, if the executor cannot talk to the database, we want the application to keep trying the same input again and again until the database recovers. However, in some cases this behavior is undesirable. We do not want this to happen when the input is bad. We want to put the bad record in some sort of dead letter queue. Or maybe we want to kill the driver only when the number of errors has crossed a certain threshold. Or maybe we want to email someone.
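> For context, the failure above can currently only be observed after the query has already died, e.g. with a StreamingQueryListener; there is no hook that lets the application intercept the bad record itself. A rough sketch, for illustration only:
> {code}
> import org.apache.spark.sql.streaming.StreamingQueryListener
> import org.apache.spark.sql.streaming.StreamingQueryListener._
>
> // Observes the query lifecycle; fires only after the query has terminated,
> // so the bad record has already killed the stream by the time we see it.
> spark.streams.addListener(new StreamingQueryListener {
>   override def onQueryStarted(event: QueryStartedEvent): Unit = {}
>   override def onQueryProgress(event: QueryProgressEvent): Unit = {}
>   override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
>     // exception is Some(message) when the query died with an error
>     event.exception.foreach(msg => println(s"Query failed: $msg"))
>   }
> })
> {code}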
> Proposal:
> Add an error handler to the data stream. When the executor fails, it should call the error handler and pass it the exception. The error handler could swallow the exception, transform it, update counts in an accumulator, etc.
> {code}
> import org.apache.spark.sql.types._
> val userSchema = new StructType().add("name", "string").add("age", "integer")
> val csvDF = spark.readStream.schema(userSchema).csv("s3://bucket/jayesh/streamingerror/")
> csvDF.writeStream.format("console").errorhandler("com.jayesh.ErrorHandler").start()
> {code}
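> To make the shape of the proposal concrete, here is a rough sketch of what such a handler contract and a dead-letter-queue implementation could look like. All names below (StreamErrorHandler, DeadLetterHandler, the handle signature) are hypothetical; none of this is an existing Spark API.
> {code}
> // Hypothetical contract for the proposed hook; nothing here exists in Spark today.
> trait StreamErrorHandler extends Serializable {
>   // Return true to swallow the error and keep the query running,
>   // false to let it propagate and fail the query as it does today.
>   def handle(record: String, error: Throwable): Boolean
> }
>
> // Example: route bad records to a dead letter queue and only fail the
> // query once the number of errors has crossed a threshold.
> class DeadLetterHandler(maxErrors: Int) extends StreamErrorHandler {
>   private val errors = new java.util.concurrent.atomic.AtomicLong(0)
>
>   override def handle(record: String, error: Throwable): Boolean = {
>     sendToDeadLetterQueue(record, error)
>     errors.incrementAndGet() <= maxErrors
>   }
>
>   private def sendToDeadLetterQueue(record: String, error: Throwable): Unit = {
>     // placeholder: write to Kafka, S3, a database, etc.
>     println(s"DLQ: $record (${error.getMessage})")
>   }
> }
> {code}
> Returning a boolean lets the handler decide per error whether the query should keep running, which covers the dead letter queue, threshold, and notification cases above.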



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org