Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/10/04 10:37:25 UTC

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/22627

    [SPARK-25639] [DOCS] Added docs for foreachBatch, python foreach and multiple watermarks

    ## What changes were proposed in this pull request?
    
    Added
    - Python foreach
    - Scala, Java and Python foreachBatch
    - Multiple watermark policy
    - The semantics of what changes are allowed to the streaming between restarts.
    
    ## How was this patch tested?
    No tests
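    For reference, the multiple watermark policy mentioned above can be exercised roughly as follows. This is a minimal PySpark sketch, not text from the PR: the conf name `spark.sql.streaming.multipleWatermarkPolicy` and the use of two rate sources are assumptions made purely for illustration.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("multi-watermark-sketch").getOrCreate()

    # Assumed Spark 2.4 option: take the max of the per-input watermarks
    # instead of the default min when a query has multiple watermarked inputs.
    spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")

    left = (spark.readStream.format("rate").load()
            .withWatermark("timestamp", "10 minutes")
            .withColumnRenamed("value", "left_value"))
    right = (spark.readStream.format("rate").load()
             .withWatermark("timestamp", "20 minutes")
             .withColumnRenamed("value", "right_value"))

    # Stream-stream inner join; with the "max" policy the global watermark
    # follows the faster of the two input watermarks.
    query = left.join(right, "timestamp").writeStream.format("console").start()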
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-25639

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22627.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22627
    
----
commit f61c13ef0d4711a04b2774934641f7a4ac690165
Author: Tathagata Das <ta...@...>
Date:   2018-10-04T10:33:47Z

    Added docs

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223481016
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express you custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in Python.
    +      
    +          def close(self, error):
    +              // Close the connection. This method in optional in Python.
    +      
    +      query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    +  {% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +
    +**Execution semantics**
    +When the streaming query is started, Spark calls the function or the object’s methods in the following way:
    +
    +- A single copy of this object is responsible for all the data generated by a single task in a query. 
    +  In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
    +
    +- This object must be serializable, because each task will get a fresh serialized-deserialized copy 
    +  of the provided object. Hence, it is strongly recommended that any initialization for writing data 
    +  (for example. opening a connection or starting a transaction) is done after the open() method has 
    +  been called, which signifies that the task is ready to generate data.
    +
    +- The lifecycle of the methods are as follows:
    +
    +  - For each partition with partition_id:
     
    -- The writer must be serializable, as it will be serialized and sent to the executors for execution.
    +    - For each batch/epoch of streaming data with epoch_id:
     
    -- All the three methods, `open`, `process` and `close` will be called on the executors.
    +      - Method open(partitionId, epochId) is called.
     
    -- The writer must do all the initialization (e.g. opening connections, starting a transaction, etc.) only when the `open` method is called. Be aware that, if there is any initialization in the class as soon as the object is created, then that initialization will happen in the driver (because that is where the instance is being created), which may not be what you intend.
    +      - If open(...) returns true, for each row in the partition and batch/epoch, method process(row) is called.
     
    -- `version` and `partition` are two parameters in `open` that uniquely represent a set of rows that needs to be pushed out. `version` is a monotonically increasing id that increases with every trigger. `partition` is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors.
    +      - Method close(error) is called with error (if any) seen while processing rows.
     
    -- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. 
    +- The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
     
    -- Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.
    +- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data 
    +  when failures cause reprocessing of some input data. This depends on the execution mode of the query. 
    +  If the streaming query is being executed in the micro-batch mode, then every partition represented 
    +  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. 
    +  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit 
    +  data and achieve exactly-once guarantees. However, if the streaming query is being executed 
    +  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
    --- End diff --
    
    Gotcha. Thanks for your explanation.
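    To make the micro-batch guarantee quoted above concrete, here is a rough Python sketch of a writer that deduplicates on (partition_id, epoch_id). The helpers `already_committed`, `write_rows` and `mark_committed` are hypothetical, standing in for whatever transactional store the sink uses; they are not Spark APIs, and the pattern only holds in micro-batch mode per the discussion.

    class IdempotentWriter:
        def open(self, partition_id, epoch_id):
            # Skip the whole partition if a previous (possibly failed) attempt
            # already committed this exact (partition_id, epoch_id) pair.
            self.partition_id = partition_id
            self.epoch_id = epoch_id
            self.rows = []
            return not already_committed(partition_id, epoch_id)

        def process(self, row):
            self.rows.append(row)

        def close(self, error):
            # close() runs even when open() returned False; commit only if
            # processing succeeded and rows were actually buffered.
            if error is None and self.rows:
                write_rows(self.rows)
                mark_committed(self.partition_id, self.epoch_id)

    query = streamingDF.writeStream.foreach(IdempotentWriter()).start()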


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815072
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express you custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    --- End diff --
    
    missing `public`


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222894110
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    --- End diff --
    
    `epochId` -> `epoch_id`


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223491087
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream().foreachBatch(
    +  new VoidFunction2<Dataset<String>, Long> {
    +    public void call(Dataset<String> dataset, Long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epoch_id):
    --- End diff --
    
    nit: `foreachBatchFunction` -> `foreach_batch_function`
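    Applying this nit together with the earlier `epochId` -> `epoch_id` rename, the Python tab would read roughly as below; the parquet sink and output path are only illustrative.

    def foreach_batch_function(df, epoch_id):
        # Transform and write the output of this micro-batch;
        # epoch_id is the unique id of the micro-batch.
        df.write.mode("append").format("parquet").save("/tmp/example-output")

    streamingDF.writeStream.foreachBatch(foreach_batch_function).start()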


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222817163
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express you custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in Python.
    +      
    +          def close(self, error):
    +              // Close the connection. This method in optional in Python.
    +      
    +      query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    +  {% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +
    +**Execution semantics**
    +When the streaming query is started, Spark calls the function or the object’s methods in the following way:
    +
    +- A single copy of this object is responsible for all the data generated by a single task in a query. 
    +  In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
    +
    +- This object must be serializable, because each task will get a fresh serialized-deserialized copy 
    +  of the provided object. Hence, it is strongly recommended that any initialization for writing data 
    +  (for example. opening a connection or starting a transaction) is done after the open() method has 
    +  been called, which signifies that the task is ready to generate data.
    +
    +- The lifecycle of the methods are as follows:
    +
    +  - For each partition with partition_id:
     
    -- The writer must be serializable, as it will be serialized and sent to the executors for execution.
    +    - For each batch/epoch of streaming data with epoch_id:
     
    -- All the three methods, `open`, `process` and `close` will be called on the executors.
    +      - Method open(partitionId, epochId) is called.
     
    -- The writer must do all the initialization (e.g. opening connections, starting a transaction, etc.) only when the `open` method is called. Be aware that, if there is any initialization in the class as soon as the object is created, then that initialization will happen in the driver (because that is where the instance is being created), which may not be what you intend.
    +      - If open(...) returns true, for each row in the partition and batch/epoch, method process(row) is called.
     
    -- `version` and `partition` are two parameters in `open` that uniquely represent a set of rows that needs to be pushed out. `version` is a monotonically increasing id that increases with every trigger. `partition` is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors.
    +      - Method close(error) is called with error (if any) seen while processing rows.
     
    -- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. 
    +- The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
     
    -- Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.
    +- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data 
    +  when failures cause reprocessing of some input data. This depends on the execution mode of the query. 
    +  If the streaming query is being executed in the micro-batch mode, then every partition represented 
    +  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. 
    +  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit 
    +  data and achieve exactly-once guarantees. However, if the streaming query is being executed 
    +  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
    --- End diff --
    
    I think continuous processing will always reprocess the whole epoch after recovery and the user should be able to use `(partition_id, epoch_id)` to deduplicate. Is it not true?


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222840402
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express you custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in Python.
    +      
    +          def close(self, error):
    +              // Close the connection. This method in optional in Python.
    +      
    +      query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    +  {% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +
    +**Execution semantics**
    +When the streaming query is started, Spark calls the function or the object’s methods in the following way:
    +
    +- A single copy of this object is responsible for all the data generated by a single task in a query. 
    +  In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
    +
    +- This object must be serializable, because each task will get a fresh serialized-deserialized copy 
    +  of the provided object. Hence, it is strongly recommended that any initialization for writing data 
    +  (for example. opening a connection or starting a transaction) is done after the open() method has 
    +  been called, which signifies that the task is ready to generate data.
    +
    +- The lifecycle of the methods are as follows:
    +
    +  - For each partition with partition_id:
     
    -- The writer must be serializable, as it will be serialized and sent to the executors for execution.
    +    - For each batch/epoch of streaming data with epoch_id:
     
    -- All the three methods, `open`, `process` and `close` will be called on the executors.
    +      - Method open(partitionId, epochId) is called.
     
    -- The writer must do all the initialization (e.g. opening connections, starting a transaction, etc.) only when the `open` method is called. Be aware that, if there is any initialization in the class as soon as the object is created, then that initialization will happen in the driver (because that is where the instance is being created), which may not be what you intend.
    +      - If open(...) returns true, for each row in the partition and batch/epoch, method process(row) is called.
     
    -- `version` and `partition` are two parameters in `open` that uniquely represent a set of rows that needs to be pushed out. `version` is a monotonically increasing id that increases with every trigger. `partition` is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors.
    +      - Method close(error) is called with error (if any) seen while processing rows.
     
    -- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. 
    +- The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
     
    -- Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.
    +- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data 
    +  when failures cause reprocessing of some input data. This depends on the execution mode of the query. 
    +  If the streaming query is being executed in the micro-batch mode, then every partition represented 
    +  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. 
    +  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit 
    +  data and achieve exactly-once guarantees. However, if the streaming query is being executed 
    +  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
    --- End diff --
    
    If my understanding is right, continuous processing doesn't guarantee that the same epoch id processes the same offset range of the source (since it processes as much as possible before it receives the epoch marker), so the epoch id can't be used to deduplicate.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222893841
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    --- End diff --
    
    Looks like `pass` is missing here as well.
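
    Something like this would presumably be the fixed snippet (just a sketch; `#` comments, `pass` placeholders, and an explicit `return True` from open so that process() still gets called):

        class ForeachWriter:
            def open(self, partition_id, epoch_id):
                # Open connection. This method is optional in Python.
                # Return True so that process() is called for this partition/epoch.
                return True

            def process(self, row):
                # Write row to connection. This method is NOT optional in Python.
                pass

            def close(self, error):
                # Close the connection. This method is optional in Python.
                pass

        query = streamingDF.writeStream.foreach(ForeachWriter()).start()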


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    @zsxwing 


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223479414
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in Python.
    +      
    +          def close(self, error):
    +              // Close the connection. This method is optional in Python.
    +      
    +      query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    +  {% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +
    +**Execution semantics**
    +When the streaming query is started, Spark calls the function or the object’s methods in the following way:
    +
    +- A single copy of this object is responsible for all the data generated by a single task in a query. 
    +  In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
    +
    +- This object must be serializable, because each task will get a fresh serialized-deserialized copy 
    +  of the provided object. Hence, it is strongly recommended that any initialization for writing data 
    +  (for example, opening a connection or starting a transaction) is done after the open() method has 
    +  been called, which signifies that the task is ready to generate data.
    +
    +- The lifecycle of the methods is as follows:
    +
    +  - For each partition with partition_id:
     
    -- The writer must be serializable, as it will be serialized and sent to the executors for execution.
    +    - For each batch/epoch of streaming data with epoch_id:
     
    -- All the three methods, `open`, `process` and `close` will be called on the executors.
    +      - Method open(partitionId, epochId) is called.
     
    -- The writer must do all the initialization (e.g. opening connections, starting a transaction, etc.) only when the `open` method is called. Be aware that, if there is any initialization in the class as soon as the object is created, then that initialization will happen in the driver (because that is where the instance is being created), which may not be what you intend.
    +      - If open(...) returns true, for each row in the partition and batch/epoch, method process(row) is called.
     
    -- `version` and `partition` are two parameters in `open` that uniquely represent a set of rows that needs to be pushed out. `version` is a monotonically increasing id that increases with every trigger. `partition` is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors.
    +      - Method close(error) is called with error (if any) seen while processing rows.
     
    -- `open` can use the `version` and `partition` to choose whether it needs to write the sequence of rows. Accordingly, it can return `true` (proceed with writing), or `false` (no need to write). If `false` is returned, then `process` will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again. 
    +- The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
     
    -- Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.
    +- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data 
    +  when failures cause reprocessing of some input data. This depends on the execution mode of the query. 
    +  If the streaming query is being executed in the micro-batch mode, then every partition represented 
    +  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data. 
    +  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit 
    +  data and achieve exactly-once guarantees. However, if the streaming query is being executed 
    +  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
    --- End diff --
    
    I agree with @HeartSaVioR 
    In continuous processing, when an epoch is reprocessed, the engine's offset tracking ensures that the starting offset of that epoch is the same as what was recorded for the previous epoch, but the ending offset is not guaranteed to be the same as what was processed before the failure. It may so happen that epoch E of partition P processed offsets X to Y (and the output of partition P was written), but the query failed before Y was recorded (as other partitions may not have completed epoch E). So after restarting, the re-executed epoch E may end up processing offsets X to Y + Z before the epoch is incremented.
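
    In micro-batch mode, the dedup pattern the doc describes would look roughly like this (sketch only; `already_committed`, `mark_committed` and `write_row` are hypothetical helpers backed by whatever external tracking store is used):

        class IdempotentWriter:
            def open(self, partition_id, epoch_id):
                self.key = (partition_id, epoch_id)
                # Skip partitions whose output for this epoch was already committed
                return not already_committed(self.key)

            def process(self, row):
                write_row(row)

            def close(self, error):
                if error is None:
                    mark_committed(self.key)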


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #97129 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97129/testReport)** for PR 22627 at commit [`222bfc6`](https://github.com/apache/spark/commit/222bfc60f360ab4c102278be50e0a2c309b84065).


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222812874
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    --- End diff --
    
    ditto
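
    For reference, a compilable version of the Java snippet would presumably look roughly like this (sketch; assumes `Long` in the generic parameters, a public `call` override, and the usual `Dataset`/`VoidFunction2` imports):

        streamingDatasetOfString.writeStream().foreachBatch(
          new VoidFunction2<Dataset<String>, Long>() {
            @Override
            public void call(Dataset<String> dataset, Long batchId) {
              // Transform and write the micro-batch output here
            }
          }
        ).start();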


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815770
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    --- End diff --
    
    `//` -> `#`
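
    i.e. something like (sketch):

        def processRow(row):
            # Write row to storage
            pass

        query = streamingDF.writeStream.foreach(processRow).start()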


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #97129 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97129/testReport)** for PR 22627 at commit [`222bfc6`](https://github.com/apache/spark/commit/222bfc60f360ab4c102278be50e0a2c309b84065).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    @holdenk yeah, I intend to backport this to 2.4


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815845
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in Python.
    +      
    +          def close(self, error):
    +              // Close the connection. This method is optional in Python.
    --- End diff --
    
    ditto


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815089
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    --- End diff --
    
    ditto
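
    For reference, the Java version would presumably need Java syntax throughout, roughly like this (sketch; assumes the input is the `Dataset<String>` from the foreachBatch example above, plus the usual imports):

        streamingDatasetOfString.writeStream().foreach(
          new ForeachWriter<String>() {

            @Override
            public boolean open(long partitionId, long version) {
              // Open connection; return true to process this partition for this epoch
              return true;
            }

            @Override
            public void process(String record) {
              // Write string to connection
            }

            @Override
            public void close(Throwable errorOrNull) {
              // Close the connection
            }
          }
        ).start();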


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222818441
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
     </div>
     </div>
     
    +
    +## Recovery Semantics after Changes in a Streaming Query
    +There are limitations on what changes in a streaming query are allowed between restarts from the 
    +same checkpoint location. Here are a few kinds of changes that are either not allowed, or 
    +the effect of the change is not well-defined. For all of them:
    +
    +- The term *allowed* means you can do the specified change but whether the semantics of its effect 
    +  is well-defined depends on the query and the change.
    +
    +- The term *not allowed* means you should not do the specified change as the restarted query is likely 
    +  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset 
    +  generated with sparkSession.readStream.
    +  
    +**Types of changes**
    +
    +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed.
    +
    +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics 
    +  of the change are well-defined depends on the source and the query. Here are a few examples.
    +
    +  - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
    +
    +  - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
    +
    +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks 
    +  are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
    +
    +  - File sink to Kafka sink is allowed. Kafka will see only the new data.
    +
    +  - Kafka sink to file sink is not allowed.
    +
    +  - Kafka sink changed to foreach, or vice versa is allowed.
    +
    +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of 
    +  the change are well-defined depends on the sink and the query. Here are a few examples.
    +
    +  - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
    +
    +  - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
    +
    +  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) is allowed, but the semantics of the change depends on the code.
    +
    +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
    +
    +  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.
    +
    +  - Changes in projections with same output schema is allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
    --- End diff --
    
    this example changes the schema. Right? From `string` to `struct`?


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #96934 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96934/testReport)** for PR 22627 at commit [`f61c13e`](https://github.com/apache/spark/commit/f61c13ef0d4711a04b2774934641f7a4ac690165).


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #96936 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96936/testReport)** for PR 22627 at commit [`d16cfeb`](https://github.com/apache/spark/commit/d16cfebc774181246c9c29fc9ab439f8f659b8e1).


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815830
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in Python.
    --- End diff --
    
    ditto


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222872973
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    --- End diff --
    
    Not a big deal but methods like `foreachBatch` are sometimes rendered that way, and sometimes without code font like foreachBatch(). It's nice to back-tick-quote class and method names if you are doing another pass.
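
    On the "apply additional DataFrame operations" bullet in the quoted text, a small sketch may help.
    It assumes `streamingDF` is a streaming DataFrame that already has a `count` column, and the
    `/tmp/top10` path is just a placeholder; `sort` and `limit` are chosen because the guide lists
    them among the operations not supported directly on streaming Datasets, yet they are fine on the
    plain batch DataFrame handed to `foreachBatch`.

        import org.apache.spark.sql.DataFrame
        import org.apache.spark.sql.functions.desc

        streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          // Inside foreachBatch this is an ordinary DataFrame, so non-incremental operations
          // such as sort() and limit() can be applied per micro-batch.
          batchDF.sort(desc("count"))
            .limit(10)
            .write
            .mode("append")
            .json("/tmp/top10")   // placeholder location
        }.start()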


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3676/
    Test PASSed.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #96934 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96934/testReport)** for PR 22627 at commit [`f61c13e`](https://github.com/apache/spark/commit/f61c13ef0d4711a04b2774934641f7a4ac690165).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).`
      * `In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).`


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22627


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223456079
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    --- End diff --
    
    Yes. I missed a few, and I want to fix them all. 
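
    Filling in the placeholders of the multi-location outline quoted above, one possible concrete
    version looks like this. The Parquet and JSON paths are invented, and `persist`/`unpersist` are
    used since those are the methods the Dataset API actually exposes.

        import org.apache.spark.sql.DataFrame

        streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          batchDF.persist()                                          // avoid recomputing the micro-batch
          batchDF.write.mode("append").parquet("/tmp/out/parquet")   // location 1 (illustrative)
          batchDF.write.mode("append").json("/tmp/out/json")         // location 2 (illustrative)
          batchDF.unpersist()                                        // release the cached data
        }.start()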


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96934/
    Test PASSed.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/96936/
    Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223491101
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream().foreachBatch(
    +  new VoidFunction2<Dataset<String>, Long> {
    +    public void call(Dataset<String> dataset, Long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epoch_id):
    +    # Transform and write batchDF
    +    pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    --- End diff --
    
    nit: `foreachBatchFunction` -> `foreach_batch_function`


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    LGTM


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3804/
    Test PASSed.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/3805/
    Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223490811
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream().foreachBatch(
    +  new VoidFunction2<Dataset<String>, Long> {
    +    public void call(Dataset<String> dataset, Long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epoch_id):
    +    # Transform and write batchDF
    +    pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With `foreachBatch`, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using `foreachBatch`, you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.persist()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using `foreachBatch`, you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +you want to use the continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String): Unit = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream().foreach(
    --- End diff --
    
    ditto
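
    To make the "reuse existing batch data sources" bullet in the quoted text concrete: there is no
    streaming JDBC sink, but the batch JDBC writer can be applied to every micro-batch. A sketch,
    where the JDBC URL, table name and credentials are all placeholders rather than anything from
    this patch, and `streamingDF` is the streaming DataFrame from the examples above:

        import java.util.Properties
        import org.apache.spark.sql.DataFrame

        val connectionProperties = new Properties()
        connectionProperties.setProperty("user", "spark")       // placeholder credentials
        connectionProperties.setProperty("password", "secret")

        streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          // Hand the micro-batch to the ordinary batch JDBC writer.
          batchDF.write
            .mode("append")
            .jdbc("jdbc:postgresql://dbhost:5432/analytics", "events", connectionProperties)
        }.start()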


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222894028
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    --- End diff --
    
    4 space indentation


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97131/
    Test PASSed.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222893790
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +you want to use the continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    --- End diff --
    
    `processRow` -> `process_row`
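
    The deduplication that the quoted text alludes to hinges on the `open(partitionId, epochId)` call:
    a writer can check whether that (partition, epoch) pair was already committed and return `false`
    to skip it. A Scala sketch of the idea follows, with an in-memory set standing in for what would
    really be a durable, transactional ledger, so it is illustrative only and would not survive
    across executor JVMs; `streamingDF` is assumed to be the streaming DataFrame from the examples.

        import org.apache.spark.sql.{ForeachWriter, Row}
        import scala.collection.mutable

        // Illustrative ledger; a real writer would consult durable, transactional storage.
        object CommittedEpochs {
          private val done = mutable.Set[(Long, Long)]()
          def contains(p: Long, e: Long): Boolean = synchronized(done.contains((p, e)))
          def add(p: Long, e: Long): Unit = synchronized { done += ((p, e)) }
        }

        streamingDF.writeStream.foreach(
          new ForeachWriter[Row] {
            private var partition: Long = _
            private var epoch: Long = _

            def open(partitionId: Long, epochId: Long): Boolean = {
              partition = partitionId
              epoch = epochId
              // Returning false tells Spark to skip this partition of this epoch entirely.
              !CommittedEpochs.contains(partitionId, epochId)
            }

            def process(record: Row): Unit = {
              // Write the row to the external system here.
            }

            def close(errorOrNull: Throwable): Unit = {
              if (errorOrNull == null) CommittedEpochs.add(partition, epoch)
            }
          }
        ).start()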


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    I think we should consider this for backport to 2.4 given that it documents new behaviour in 2.4 unless folks object.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222812757
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    --- End diff --
    
    long -> Long. I noticed the current Java API actually is wrong. Submitted https://github.com/apache/spark/pull/22633 to fix it.


---



[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97129/
    Test PASSed.


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222812936
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    --- End diff --
    
    nit: `writeStream()`


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222893720
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +you want to use the continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    --- End diff --
    
    and `@Override`
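
    Since the quoted note steers continuous-mode queries toward `foreach`, here is a minimal sketch of
    combining the two. The one-second checkpoint interval is arbitrary, `println` stands in for a real
    external write, and it assumes `streamingDF` comes from a source that supports continuous
    processing (for example the rate or Kafka source).

        import org.apache.spark.sql.{ForeachWriter, Row}
        import org.apache.spark.sql.streaming.Trigger

        streamingDF.writeStream
          .foreach(new ForeachWriter[Row] {
            def open(partitionId: Long, epochId: Long): Boolean = true  // no connection to set up here
            def process(record: Row): Unit = println(record)            // stand-in for a real write
            def close(errorOrNull: Throwable): Unit = ()                // nothing to release
          })
          .trigger(Trigger.Continuous("1 second"))   // continuous processing instead of micro-batches
          .start()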


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222818615
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
     </div>
     </div>
     
    +
    +## Recovery Semantics after Changes in a Streaming Query
    +There are limitations on what changes in a streaming query are allowed between restarts from the 
    +same checkpoint location. Here are a few kinds of changes that are either not allowed, or 
    +the effect of the change is not well-defined. For all of them:
    +
    +- The term *allowed* means you can do the specified change but whether the semantics of its effect 
    +  are well-defined depends on the query and the change.
    +
    +- The term *not allowed* means you should not do the specified change as the restarted query is likely 
    +  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset 
    +  generated with sparkSession.readStream.
    +  
    +**Types of changes**
    +
    +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed.
    +
    +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics 
    +  of the change are well-defined depends on the source and the query. Here are a few examples.
    +
    +  - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
    +
    +  - Changes to subscribed topics/files are generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
    +
    +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks 
    +  are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
    +
    +  - File sink to Kafka sink is allowed. Kafka will see only the new data.
    +
    +  - Kafka sink to file sink is not allowed.
    +
    +  - Kafka sink changed to foreach, or vice versa, is allowed.
    +
    +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of 
    +  the change are well-defined depends on the sink and the query. Here are a few examples.
    +
    +  - Changes to the output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
    +
    +  - Changes to the output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
    +
    +  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depend on the code.
    +
    +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
    +
    +  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.
    +
    +  - Changes in projections with the same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
    +
    +  - Changes in projections with different output schema are conditionally allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
    +
    +- *Changes in stateful operations*: Some operations in streaming queries need to maintain
    +  state data in order to continuously update the result. Structured Streaming automatically checkpoints
    +  the state data to fault-tolerant storage (for example, DBFS, AWS S3, Azure Blob storage) and restores it after restart.
    --- End diff --
    
    remove `DBFS`?
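
    To ground the "allowed change" wording in the quoted section, the restart pattern looks roughly
    like this: same `checkpointLocation`, slightly different query. The paths are illustrative, `sdf`
    is a streaming DataFrame as defined in the quoted text, and the added filter is the kind of change
    the guide classifies as allowed, with the end-to-end effect still up to the query author.

        // First run: write column "a" out to files, checkpointing to a fixed location.
        val query1 = sdf.selectExpr("a")
          .writeStream
          .format("parquet")
          .option("path", "/tmp/out")                      // illustrative output path
          .option("checkpointLocation", "/tmp/checkpoint") // must stay the same across restarts
          .start()
        query1.stop()

        // Restart from the SAME checkpoint with an added filter -- an allowed change.
        val query2 = sdf.where("a IS NOT NULL").selectExpr("a")
          .writeStream
          .format("parquet")
          .option("path", "/tmp/out")
          .option("checkpointLocation", "/tmp/checkpoint")
          .start()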


---



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222814573
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    --- End diff --
    
    `streamingDF.writeStream` -> `streamingDatasetOfString.writeStream()`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222813906
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    --- End diff --
    
    `uncache()` -> `unpersist()`
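
A sketch of the quoted outline with this suggestion applied might look like the following (the `format`/`save` arguments and paths are placeholders, and `streamingDF` is the streaming DataFrame from the quoted doc, so this is illustrative rather than a complete pipeline):

    import org.apache.spark.sql.DataFrame

    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF.persist()                                  // cache the micro-batch output once
      batchDF.write.format("parquet").save("/path/one")  // location 1 (placeholder path)
      batchDF.write.format("parquet").save("/path/two")  // location 2 (placeholder path)
      batchDF.unpersist()                                // release the cached data
    }.start()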


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222894056
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    --- End diff --
    
    `foreachBatchFunction` -> `foreach_batch_function`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815808
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an object. 
    +The function offers a simple way to express your processing logic but does not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #96936 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/96936/testReport)** for PR 22627 at commit [`d16cfeb`](https://github.com/apache/spark/commit/d16cfebc774181246c9c29fc9ab439f8f659b8e1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222815111
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    --- End diff --
    
    ditto


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223456294
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
     </div>
     </div>
     
    +
    +## Recovery Semantics after Changes in a Streaming Query
    +There are limitations on what changes in a streaming query are allowed between restarts from the 
    +same checkpoint location. Here are a few kinds of changes that are either not allowed, or 
    +the effect of the change is not well-defined. For all of them:
    +
    +- The term *allowed* means you can do the specified change but whether the semantics of its effect 
    +  is well-defined depends on the query and the change.
    +
    +- The term *not allowed* means you should not do the specified change as the restarted query is likely 
    +  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset 
    +  generated with sparkSession.readStream.
    +  
    +**Types of changes**
    +
    +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed.
    +
    +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics 
    +  of the change are well-defined depends on the source and the query. Here are a few examples.
    +
    +  - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
    +
    +  - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
    +
    +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks 
    +  are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
    +
    +  - File sink to Kafka sink is allowed. Kafka will see only the new data.
    +
    +  - Kafka sink to file sink is not allowed.
    +
    +  - Kafka sink changed to foreach, or vice versa is allowed.
    +
    +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of 
    +  the change are well-defined depends on the sink and the query. Here are a few examples.
    +
    +  - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
    +
    +  - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")`
    +
    +  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) is allowed, but the semantics of the change depends on the code.
    +
    +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
    +
    +  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.
    +
    +  - Changes in projections with same output schema is allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
    --- End diff --
    
    Right. 
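
To make the schema-compatibility point concrete, here is a small hedged sketch (the column names `stringColumn`, `a`, `b` are placeholders; `sdf` is the streaming DataFrame from the quoted text): both projections below emit a single string column named `json`, so swapping one for the other across restarts leaves the sink schema unchanged.

    import org.apache.spark.sql.functions.{col, struct, to_json}

    // Before the restart: the column is already a JSON string.
    val before = sdf.selectExpr("stringColumn AS json")

    // After the restart: build the JSON string from struct columns instead.
    // The output schema is still a single string column named "json".
    val after = sdf.select(to_json(struct(col("a"), col("b"))).as("json"))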


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223490770
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream().foreachBatch(
    +  new VoidFunction2<Dataset<String>, Long> {
    +    public void call(Dataset<String> dataset, Long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epoch_id):
    +    # Transform and write batchDF
    +    pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With `foreachBatch`, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using `foreachBatch`, you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.persist()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.unpersist()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using `foreachBatch`, you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    --- End diff --
    
    nit: `streamingDF` -> `streamingDatasetOfString`.
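
With this rename applied (and the explicit return types suggested elsewhere in the review), a corrected sketch of the Scala example could read as follows; the connection handling is left as comments, as in the quoted doc, and `streamingDatasetOfString` is assumed to be a `Dataset[String]`:

    import org.apache.spark.sql.ForeachWriter

    streamingDatasetOfString.writeStream.foreach(
      new ForeachWriter[String] {

        def open(partitionId: Long, version: Long): Boolean = {
          // Open connection; return true to process the rows of this partition
          true
        }

        def process(record: String): Unit = {
          // Write string to connection
        }

        def close(errorOrNull: Throwable): Unit = {
          // Close the connection
        }
      }
    ).start()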


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222893572
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
    -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
    -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing 
    +logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
    +and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
    +  in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
    +  batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
    +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or 
    +continuous processing mode), then you can express your custom writer logic using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    --- End diff --
    
    nit: return type `Unit`
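
For clarity, the signature with the explicit return type (per this nit) would read, inside the `ForeachWriter[String]` shown in the quoted diff:

    def process(record: String): Unit = {
      // Write string to connection
    }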


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #97131 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97131/testReport)** for PR 22627 at commit [`9d60534`](https://github.com/apache/spark/commit/9d60534ebff24d326b8e54fd036016bd56d9a0be).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r222818119
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
     </div>
     </div>
     
    +
    +## Recovery Semantics after Changes in a Streaming Query
    +There are limitations on what changes in a streaming query are allowed between restarts from the 
    +same checkpoint location. Here are a few kinds of changes that are either not allowed, or 
    +the effect of the change is not well-defined. For all of them:
    +
    +- The term *allowed* means you can do the specified change but whether the semantics of its effect 
    +  is well-defined depends on the query and the change.
    +
    +- The term *not allowed* means you should not do the specified change as the restarted query is likely 
    +  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset 
    +  generated with sparkSession.readStream.
    +  
    +**Types of changes**
    +
    +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed.
    +
    +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics 
    +  of the change are well-defined depends on the source and the query. Here are a few examples.
    +
    +  - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
    +
    +  - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
    +
    +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks 
    +  are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
    +
    +  - File sink to Kafka sink is allowed. Kafka will see only the new data.
    +
    +  - Kafka sink to file sink is not allowed.
    +
    +  - Kafka sink changed to foreach, or vice versa is allowed.
    +
    +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of 
    +  the change are well-defined depends on the sink and the query. Here are a few examples.
    +
    +  - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
    +
    +  - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")`
    --- End diff --
    
    nit: `path` -> `topic`
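
As a hedged illustration of the corrected text (the bootstrap servers, topic name and checkpoint path are placeholders, and `sdf` is assumed to expose a string `value` column as the Kafka sink requires): the sink parameter that changes is `topic`, while the checkpoint location stays the same so the restarted query can resume.

    val query = sdf.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")              // placeholder broker list
      .option("topic", "anotherTopic")                              // changed from "someTopic"
      .option("checkpointLocation", "/checkpoints/same-as-before")  // unchanged checkpoint
      .start()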


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22627
  
    **[Test build #97131 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97131/testReport)** for PR 22627 at commit [`9d60534`](https://github.com/apache/spark/commit/9d60534ebff24d326b8e54fd036016bd56d9a0be).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223479786
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
     </div>
     </div>
     
    +
    +## Recovery Semantics after Changes in a Streaming Query
    +There are limitations on what changes in a streaming query are allowed between restarts from the 
    +same checkpoint location. Here are a few kinds of changes that are either not allowed, or 
    +the effect of the change is not well-defined. For all of them:
    +
    +- The term *allowed* means you can do the specified change but whether the semantics of its effect 
    +  is well-defined depends on the query and the change.
    +
    +- The term *not allowed* means you should not do the specified change as the restarted query is likely 
    +  to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset 
    +  generated with sparkSession.readStream.
    +  
    +**Types of changes**
    +
    +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed.
    +
    +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics 
    +  of the change are well-defined depends on the source and the query. Here are a few examples.
    +
    +  - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
    +
    +  - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
    +
    +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks 
    +  are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
    +
    +  - File sink to Kafka sink is allowed. Kafka will see only the new data.
    +
    +  - Kafka sink to file sink is not allowed.
    +
    +  - Kafka sink changed to foreach, or vice versa is allowed.
    +
    +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of 
    +  the change are well-defined depends on the sink and the query. Here are a few examples.
    +
    +  - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
    +
    +  - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")`
    +
    +  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) is allowed, but the semantics of the change depends on the code.
    +
    +- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
    +
    +  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.
    +
    +  - Changes in projections with same output schema is allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`.
    +
    +  - Changes in projections with different output schema are conditionally allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
    +
    +- *Changes in stateful operations*: Some operations in streaming queries need to maintain
    +  state data in order to continuously update the result. Structured Streaming automatically checkpoints
    +  the state data to fault-tolerant storage (for example, DBFS, AWS S3, Azure Blob storage) and restores it after restart.
    --- End diff --
    
    replaced with HDFS
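
For illustration only (the query shape, sink and paths are placeholders mirroring the guide's earlier aggregation example): the state used by stateful operations is checkpointed under the configured checkpoint location on fault-tolerant storage, for example an HDFS path.

    val query = aggDF.writeStream
      .outputMode("complete")
      .option("checkpointLocation", "hdfs:///checkpoints/agg-query")  // state and offsets go here
      .format("memory")
      .queryName("aggregates")
      .start()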


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org