Posted to user@spark.apache.org by M Singh <ma...@yahoo.com.INVALID> on 2018/01/03 17:53:57 UTC

Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't run into an OOM exception on the executor while calling collect?
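
To make question 2 concrete, the pattern being asked about is roughly the following hypothetical sink, where the whole batch is collected into one JVM before being written (the class name and saveRows are placeholders, and registration via a StreamSinkProvider is omitted):

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.streaming.Sink

  // Hypothetical sketch only: the whole batch is collected before being written.
  // saveRows is a placeholder, not a real API.
  class CollectingSink extends Sink {
    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      val rows = data.collect()   // the entire trigger's data lands in a single JVM here
      // saveRows(batchId, rows)  // placeholder write to an external store
    }
  }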
Thanks

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

Posted by M Singh <ma...@yahoo.com.INVALID>.
Hi Jacek:

The javadoc mentions that we can only consume data from the DataFrame in the addBatch method.  So, if I would like to save the data to a new sink, then I believe that I will need to collect the data and then save it.  This is the reason I am asking about how to control the size of the data in each invocation of the addBatch method.  Let me know if I am interpreting the javadoc incorrectly.  Here it is:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit


Thanks
Mans  

    On Thursday, January 4, 2018 2:19 PM, Jacek Laskowski <ja...@japila.pl> wrote:
 

Hi,
> If the data is very large then a collect may result in OOM.
That's a general case even in any part of Spark, incl. Spark Structured Streaming. Why would you collect in addBatch? It's on the driver side and, like anything on the driver, it's a single JVM (and usually not fault tolerant).
> Do you have any other suggestion/recommendation ?
What's wrong with the current solution? I don't think you should change how you do things currently. You should just avoid collect on large datasets (which you have to do anywhere in Spark).
Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <ma...@yahoo.com.invalid> wrote:

Thanks Tathagata for your answer.
The reason I was asking about controlling the data size is that the javadoc indicates you can use foreach or collect on the dataframe.  If the data is very large then a collect may result in OOM.
From your answer it appears that the only way to control the size (in 2.2) would be to control the trigger interval. However, in my case, I have to dedup the elements in a one-minute interval, for which I am using a one-minute trigger interval that I cannot reduce.  Do you have any other suggestion/recommendation?
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3 ?
Thanks again. 

    On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <ta...@gmail.com> wrote:
 

 1. It is all the result data in that trigger. Note that it takes a DataFrame which is a purely logical representation of data and has no association with partitions, etc. which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then you should either control the trigger interval or use the rate limit options on sources that support it (e.g. for kafka, you can use the option "maxOffsetsPerTrigger", see the guide).
Related note, these APIs are subject to change. In fact in the upcoming release 2.3, we are adding a DataSource V2 API for batch/microbatch-streaming/continuous-streaming sources and sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh <ma...@yahoo.com.invalid> wrote:

Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't run into an OOM exception on the executor while calling collect?
Thanks

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

> If the data is very large then a collect may result in OOM.

That's a general case even in any part of Spark, incl. Spark Structured
Streaming. Why would you collect in addBatch? It's on the driver side and,
like anything on the driver, it's a single JVM (and usually not fault
tolerant).

> Do you have any other suggestion/recommendation ?

What's wrong with the current solution? I don't think you should change how
you do things currently. You should just avoid collect on large datasets
(which you have to do anywhere in Spark).
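
For illustration, a rough sketch of addBatch consuming the batch per partition on the executors instead of collecting it to the driver (the class name and writeChunk are placeholders; StreamSinkProvider registration is omitted):

  import org.apache.spark.sql.{DataFrame, Row}
  import org.apache.spark.sql.execution.streaming.Sink

  // Hypothetical sketch: rows are consumed on the executors, partition by partition,
  // so nothing is collect()-ed to the driver. writeChunk is a placeholder.
  class PartitionWritingSink extends Sink {
    @volatile private var lastCommittedBatchId: Long = -1L

    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      // The same batchId can be replayed after a failure; per the scaladoc, add it only once.
      if (batchId <= lastCommittedBatchId) return

      val writePartition: Iterator[Row] => Unit = rows =>
        rows.grouped(1000).foreach { chunk =>
          // writeChunk(chunk)   // placeholder for the real write call
        }
      data.foreachPartition(writePartition)

      lastCommittedBatchId = batchId
    }
  }

For many external stores the built-in foreach sink (writeStream.foreach(new ForeachWriter[Row] { ... })) gives the same per-partition writes without a custom Sink at all.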

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <ma...@yahoo.com.invalid>
wrote:

> Thanks Tathagata for your answer.
>
> The reason I was asking about controlling the data size is that the javadoc
> indicates you can use foreach or collect on the dataframe.  If the data is
> very large then a collect may result in OOM.
>
> From your answer it appears that the only way to control the size (in 2.2)
> would be to control the trigger interval. However, in my case, I have to dedup
> the elements in a one-minute interval, for which I am using a one-minute trigger
> interval that I cannot reduce.  Do you have any other suggestion/recommendation?
>
> Also, do you have any timeline for the availability of DataSourceV2/Spark
> 2.3 ?
>
> Thanks again.
>
>
> On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>
> 1. It is all the result data in that trigger. Note that it takes a
> DataFrame which is a purely logical representation of data and has no
> association with partitions, etc. which are physical representations.
>
> 2. If you want to limit the amount of data that is processed in a trigger,
> then you should either control the trigger interval or use the rate limit
> options on sources that support it (e.g. for kafka, you can use the option
> "maxOffsetsPerTrigger", see the guide
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
> ).
>
> Related note, these APIs are subject to change. In fact in the upcoming
> release 2.3, we are adding a DataSource V2 API for
> batch/microbatch-streaming/continuous-streaming sources and sinks.
>
> On Wed, Jan 3, 2018 at 11:23 PM, M Singh <ma...@yahoo.com.invalid>
> wrote:
>
> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>    * Adds a batch of data to this sink. The data for a given `batchId` is
> deterministic and if
>    * this method is called more than once with the same batchId (which
> will happen in the case of
>    * failures), then `data` should only be added once.
>    *
>    * Note 1: You cannot apply any operators on `data` except consuming it
> (e.g., `collect/foreach`).
>    * Otherwise, you may get a wrong result.
>    *
>    * Note 2: The method is supposed to be executed synchronously, i.e.
> the method should only return
>    * after data is consumed by sink successfully.
>    */
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>
>
>
>
>

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

Posted by M Singh <ma...@yahoo.com.INVALID>.
Thanks Tathagata for your answer.
The reason I was asking about controlling the data size is that the javadoc indicates you can use foreach or collect on the dataframe.  If the data is very large then a collect may result in OOM.
From your answer it appears that the only way to control the size (in 2.2) would be to control the trigger interval. However, in my case, I have to dedup the elements in a one-minute interval, for which I am using a one-minute trigger interval that I cannot reduce.  Do you have any other suggestion/recommendation?
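
For what it's worth, if the dedup can be expressed as dropDuplicates over a key within a one-minute watermark, the dedup window and the trigger interval can be decoupled. A rough sketch (the built-in rate source stands in for the real input; column names and the checkpoint path are placeholders):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder().appName("streaming-dedup-sketch").getOrCreate()

  // Placeholder input: the built-in "rate" source, with its columns renamed to
  // event-style names so the dedup below reads naturally.
  val events = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "10")
    .load()
    .withColumnRenamed("timestamp", "eventTime")
    .withColumnRenamed("value", "eventId")

  // Deduplicate within a 1-minute watermark; the trigger interval no longer has to
  // match the dedup window, so it can be shorter than one minute.
  val deduped = events
    .withWatermark("eventTime", "1 minute")
    .dropDuplicates("eventId", "eventTime")

  val query = deduped.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/dedup-chk")   // placeholder path
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()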
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3 ?
Thanks again. 

    On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <ta...@gmail.com> wrote:
 

 1. It is all the result data in that trigger. Note that it takes a DataFrame which is a purely logical representation of data and has no association with partitions, etc. which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then you should either control the trigger interval or use the rate limit options on sources that support it (e.g. for kafka, you can use the option "maxOffsetsPerTrigger", see the guide).
Related note, these APIs are subject to change. In fact in the upcoming release 2.3, we are adding a DataSource V2 API for batch/microbatch-streaming/continuous-streaming sources and sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh <ma...@yahoo.com.invalid> wrote:

Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to addBatch:
1. Is it all the data in a partition for each trigger, or is it all the data in that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure that we don't run into an OOM exception on the executor while calling collect?
Thanks

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

Posted by Tathagata Das <ta...@gmail.com>.
1. It is all the result data in that trigger. Note that it takes a
DataFrame which is a purely logical representation of data and has no
association with partitions, etc. which are physical representations.

2. If you want to limit the amount of data that is processed in a trigger,
then you should either control the trigger interval or use the rate limit
options on sources that support it (e.g. for kafka, you can use the option
"maxOffsetsPerTrigger", see the guide
<https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
).

Related note, these APIs are subject to change. In fact in the upcoming
release 2.3, we are adding a DataSource V2 API for
batch/microbatch-streaming/continuous-streaming sources and sinks.
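
For reference, a sketch of the rate-limit option mentioned above, combined with a trigger interval (assumes the spark-sql-kafka-0-10 package is on the classpath; the broker address, topic, and paths are placeholders):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder().appName("rate-limited-kafka-stream").getOrCreate()

  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
    .option("subscribe", "events")                       // placeholder topic
    // Caps how many Kafka records are consumed per trigger, which bounds the size
    // of the DataFrame handed to the sink's addBatch for each micro-batch.
    .option("maxOffsetsPerTrigger", "100000")
    .load()

  val query = kafkaDf
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("parquet")
    .option("path", "/tmp/out")                          // placeholder output path
    .option("checkpointLocation", "/tmp/chk")            // placeholder checkpoint path
    .trigger(Trigger.ProcessingTime("1 minute"))         // the trigger interval also paces batches
    .start()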

On Wed, Jan 3, 2018 at 11:23 PM, M Singh <ma...@yahoo.com.invalid>
wrote:

> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>    * Adds a batch of data to this sink. The data for a given `batchId` is
> deterministic and if
>    * this method is called more than once with the same batchId (which
> will happen in the case of
>    * failures), then `data` should only be added once.
>    *
>    * Note 1: You cannot apply any operators on `data` except consuming it
> (e.g., `collect/foreach`).
>    * Otherwise, you may get a wrong result.
>    *
>    * Note 2: The method is supposed to be executed synchronously, i.e.
> the method should only return
>    * after data is consumed by sink successfully.
>    */
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>