Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/06/14 23:06:05 UTC

[GitHub] spark pull request #21571: [WIP][SPARK-24565][SS] Add API for in Structured ...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/21571

    [WIP][SPARK-24565][SS] Add API for in Structured Streaming for exposing output rows of each microbatch as a DataFrame

    ## What changes were proposed in this pull request?
    
    Currently, the micro-batches in MicroBatchExecution are not exposed to the user through any public API. We did not want to expose the micro-batches, so that every API we expose could eventually be supported in the Continuous engine. But now that we have a better sense of building a ContinuousExecution, I am considering adding APIs that will run only in MicroBatchExecution. I have quite a few use cases where exposing the micro-batch output as a DataFrame is useful.
    - Pass the output rows of each batch to a library that is designed only for batch jobs (for example, many ML libraries need to collect() while learning).
    - Reuse batch data sources for output whose streaming version does not exist (e.g. redshift data source).
    - Write the output rows to multiple places by writing twice for each batch. This is not the most elegant thing to do for multiple-output streaming queries but is likely to be better than running two streaming queries processing the same data twice.
    
    The proposal is to add a method `foreachBatch(f: Dataset[T] => Unit)` to Scala/Java/Python `DataStreamWriter`.
    
    ## How was this patch tested?
    New unit tests.
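
A minimal sketch of how the proposed method might be used from Python, assuming the two-argument callback form described in the docstring quoted later in this thread (a DataFrame plus a batch identifier). The `rate` source, the `collected` dictionary, and the names `process_batch` and `run_example` are illustrative choices for this sketch, not part of the pull request.

```python
# Illustrative only: `collected`, `process_batch`, and `run_example` are
# hypothetical names chosen for this sketch.
collected = {}


def process_batch(batch_df, batch_id):
    """Handle one micro-batch: here, just pull its rows to the driver."""
    collected[batch_id] = batch_df.collect()


def run_example(timeout_seconds=10):
    """Wire process_batch into a streaming query (needs PySpark 2.4 or later)."""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("foreachBatchSketch").getOrCreate()
    # The built-in "rate" source generates rows continuously, which is handy for demos.
    stream_df = spark.readStream.format("rate").load()
    query = stream_df.writeStream.foreachBatch(process_batch).start()
    try:
        # Let the query run for a bounded time, then shut it down.
        query.awaitTermination(timeout_seconds)
    finally:
        query.stop()
    return collected
```

Calling `run_example()` on a machine with Spark installed should leave one entry in `collected` per micro-batch that ran. Because `process_batch` receives an ordinary DataFrame, any batch-only library or batch data source could be used inside it in place of `collect()`.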


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark foreachBatch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21571
    
----
commit 21acc73810a5385f04ce540d0f8b1a575aea5da6
Author: Tathagata Das <ta...@...>
Date:   2018-06-08T04:13:19Z

    First cut of Scala foreachBatch

commit 4ac056e87fee94ed0b2742cc1e8644efc998a3ea
Author: Tathagata Das <ta...@...>
Date:   2018-06-08T09:27:36Z

    Added python support

commit 3b7b20d5c1b662abd935c0a812d9e54f8ab01b24
Author: Tathagata Das <ta...@...>
Date:   2018-06-08T09:36:49Z

    Merge remote-tracking branch 'apache-github/master' into foreachBatch

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91994 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91994/testReport)** for PR 21571 at commit [`6f9fdf4`](https://github.com/apache/spark/commit/6f9fdf444aa704ea07e3c6ffc89474e1aeb59b6f).


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91938 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91938/testReport)** for PR 21571 at commit [`e8073ea`](https://github.com/apache/spark/commit/e8073eae4f9c8c833bbc3c3a21bb5f175615b027).


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #92018 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92018/testReport)** for PR 21571 at commit [`5b4252a`](https://github.com/apache/spark/commit/5b4252a8a7c40886bb73201327fff22dc921c3dc).


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92018/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4138/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/160/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91875/
    Test FAILed.


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r196627302
  
    --- Diff: python/pyspark/streaming/context.py ---
    @@ -79,22 +79,8 @@ def _ensure_initialized(cls):
             java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
             java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
     
    -        # start callback server
    -        # getattr will fallback to JVM, so we cannot test by hasattr()
    -        if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
    -            gw.callback_server_parameters.eager_load = True
    -            gw.callback_server_parameters.daemonize = True
    -            gw.callback_server_parameters.daemonize_connections = True
    -            gw.callback_server_parameters.port = 0
    -            gw.start_callback_server(gw.callback_server_parameters)
    -            cbport = gw._callback_server.server_socket.getsockname()[1]
    -            gw._callback_server.port = cbport
    -            # gateway with real port
    -            gw._python_proxy_port = gw._callback_server.port
    -            # get the GatewayServer object in JVM by ID
    -            jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
    --- End diff --
    
    Nit: we could remove this import in this file though.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #92005 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92005/testReport)** for PR 21571 at commit [`9062fb9`](https://github.com/apache/spark/commit/9062fb9053b67d59a1f2357adc28a705bf9ba713).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195917011
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -1016,6 +1018,35 @@ def func_with_open_process_close(partition_id, iterator):
             self._jwrite.foreach(jForeachWriter)
             return self
     
    +    @since(2.4)
    +    def foreachBatch(self, func):
    +        """
    +        Sets the output of the streaming query to be processed using the provided
    +        function. This is supported only in the micro-batch execution modes (that is, when the
    +        trigger is not continuous). In every micro-batch, the provided function will be called
    +        with (i) the output rows as a DataFrame and (ii) the batch identifier.
    +        The batchId can be used to deduplicate and transactionally write the output
    +        (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
    +        to be exactly the same for the same batchId (assuming all operations are deterministic
    +        in the query).
    +
    +        .. note:: Evolving.
    +
    +        >>> def func(batch_df, batch_id):
    +        ...     batch_df.collect()
    +        ...
    +        >>> writer = sdf.writeStream.foreachBatch(func)
    +        """
    +
    +        from pyspark.java_gateway import ensure_callback_server_started
    +        gw = self._spark._sc._gateway
    +        java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
    +
    +        wrapped_func = ForeachBatchFunction(self._spark, func)
    +        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
    +        ensure_callback_server_started(gw)
    --- End diff --
    
    This is not possible because the callback from the JVM ForeachBatch sink to Python is made ONLY after the query is started, and the query cannot be started until this foreachBatch() method finishes and start() is called.
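
The docstring in the diff above says the batchId can be used to deduplicate writes to external systems. A minimal sketch of that idea in plain Python, assuming a hypothetical in-memory sink (`IdempotentSink` is an illustrative name, not a Spark class). A real sink would need to record the batch id atomically with the data, which this toy version does not attempt.

```python
class IdempotentSink:
    """Toy in-memory sink that remembers which batch ids it has committed."""

    def __init__(self):
        self.rows = []
        self.committed_batch_ids = set()

    def write(self, batch_rows, batch_id):
        """Append batch_rows once per batch_id; return True if rows were written."""
        # A batch id seen before means a replay (for example, after a restart),
        # so the rows are skipped instead of being written a second time.
        if batch_id in self.committed_batch_ids:
            return False
        self.rows.extend(batch_rows)
        self.committed_batch_ids.add(batch_id)
        return True
```

Skipping a replayed batch is only safe under the condition the docstring states: the output for a given batchId is the same on every attempt, which assumes all operations in the query are deterministic.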


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/270/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    LGTM. Merging to master.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91873 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91873/testReport)** for PR 21571 at commit [`3b7b20d`](https://github.com/apache/spark/commit/3b7b20d5c1b662abd935c0a812d9e54f8ab01b24).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class GBTClassificationModel(TreeEnsembleModel, JavaClassificationModel, JavaMLWritable,`
      * `class PrefixSpan(JavaParams):`
      * `public class MaskExpressionsUtils `
      * `case class ArrayRemove(left: Expression, right: Expression)`
      * `trait MaskLike `
      * `trait MaskLikeWithN extends MaskLike `
      * `case class Mask(child: Expression, upper: String, lower: String, digit: String)`
      * `case class MaskFirstN(`
      * `case class MaskLastN(`
      * `case class MaskShowFirstN(`
      * `case class MaskShowLastN(`
      * `case class MaskHash(child: Expression)`
      * `abstract class FileFormatDataWriter(`
      * `class EmptyDirectoryDataWriter(`
      * `class SingleDirectoryDataWriter(`
      * `class DynamicPartitionDataWriter(`
      * `class WriteJobDescription(`
      * `case class WriteTaskResult(commitMsg: TaskCommitMessage, summary: ExecutedWriteSummary)`
      * `case class ExecutedWriteSummary(`


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92005/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91938/
    Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/211/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4165/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/243/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91897/
    Test FAILed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r196625816
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -1016,6 +1018,35 @@ def func_with_open_process_close(partition_id, iterator):
             self._jwrite.foreach(jForeachWriter)
             return self
     
    +    @since(2.4)
    +    def foreachBatch(self, func):
    +        """
    +        Sets the output of the streaming query to be processed using the provided
    +        function. This is supported only in the micro-batch execution modes (that is, when the
    +        trigger is not continuous). In every micro-batch, the provided function will be called
    +        with (i) the output rows as a DataFrame and (ii) the batch identifier.
    +        The batchId can be used to deduplicate and transactionally write the output
    +        (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
    +        to be exactly the same for the same batchId (assuming all operations are deterministic
    +        in the query).
    +
    +        .. note:: Evolving.
    +
    +        >>> def func(batch_df, batch_id):
    +        ...     batch_df.collect()
    +        ...
    +        >>> writer = sdf.writeStream.foreachBatch(func)
    +        """
    +
    +        from pyspark.java_gateway import ensure_callback_server_started
    +        gw = self._spark._sc._gateway
    +        java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
    +
    +        wrapped_func = ForeachBatchFunction(self._spark, func)
    +        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
    +        ensure_callback_server_started(gw)
    --- End diff --
    
    I am sorry if I'm mistaken but can't we still put this above? Looks weird we ensure the callback server at the end.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Seems fine to me. Left one question but not a big deal.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4073/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91873/
    Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/260/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91897 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91897/testReport)** for PR 21571 at commit [`687402c`](https://github.com/apache/spark/commit/687402c1d0c84207ff06cf56b3a726ea8104e0fc).


---



[GitHub] spark pull request #21571: [WIP][SPARK-24565][SS] Add API for in Structured ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195597031
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -854,6 +856,20 @@ def trigger(self, processingTime=None, once=None, continuous=None):
             self._jwrite = self._jwrite.trigger(jTrigger)
             return self
     
    +    def foreachBatch(self, func):
    --- End diff --
    
    add docs


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91940/
    Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    jenkins retest this please


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Jenkins retest this please


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    @zsxwing @HyukjinKwon @HeartSaVioR @JoshRosen 


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91994/
    Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4155/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4102/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #92005 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92005/testReport)** for PR 21571 at commit [`9062fb9`](https://github.com/apache/spark/commit/9062fb9053b67d59a1f2357adc28a705bf9ba713).


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r196626934
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -2126,6 +2126,42 @@ class WriterWithNonCallableClose(WithProcess):
             tester.assert_invalid_writer(WriterWithNonCallableClose(),
                                          "'close' in provided object is not callable")
     
    +    def test_streaming_foreachBatch(self):
    +        q = None
    +        collected = dict()
    +
    +        def collectBatch(batch_df, batch_id):
    --- End diff --
    
    collectBatch -> collect_batch per PEP 8.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/159/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91897 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91897/testReport)** for PR 21571 at commit [`687402c`](https://github.com/apache/spark/commit/687402c1d0c84207ff06cf56b3a726ea8104e0fc).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195882373
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
    @@ -322,6 +338,45 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         this
       }
     
    +  /**
    +   * :: Experimental ::
    +   *
    +   * (Scala-specific) Sets the output of the streaming query to be processed using the provided
    +   * function. This is supported only the in the micro-batch execution modes (that is, when the
    +   * trigger is not continuous). In every micro-batch, the provided function will be called in
    +   * every micro-batch with (i) the output rows as a Dataset and (ii) the batch identifier.
    +   * The batchId can be used deduplicate and transactionally write the output
    +   * (that is, the provided Dataset) to external systems. The output Dataset is guaranteed
    +   * to exactly same for the same batchId (assuming all operations are deterministic in the query).
    +   *
    +   * @since 2.4.0
    +   */
    +  @InterfaceStability.Evolving
    +  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
    --- End diff --
    
    Good point.
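
The doc comment quoted in this diff says the batchId can be used to deduplicate and transactionally write the output. As a rough illustration of that idea only, here is a minimal sketch in plain Python with no Spark involved. `IdempotentSink` and `write_batch` are hypothetical names invented for this example, and the "transaction" is simulated with in-memory updates.

```python
class IdempotentSink:
    """Hypothetical in-memory sink that remembers which batch ids it committed."""

    def __init__(self):
        self.rows = []
        self.committed_batch_ids = set()

    def write_batch(self, batch_rows, batch_id):
        # Skip a batch that was already committed, for example when the same
        # micro-batch is delivered again after a failure and recovery.
        if batch_id in self.committed_batch_ids:
            return False
        # A real sink would commit the rows and the batch id together in one
        # transaction; here both updates are simply made in memory.
        self.rows.extend(batch_rows)
        self.committed_batch_ids.add(batch_id)
        return True


sink = IdempotentSink()
first = sink.write_batch(["a", "b"], 0)
replayed = sink.write_batch(["a", "b"], 0)  # same batch delivered again
second = sink.write_batch(["c"], 1)
```

The second call with batch id 0 is ignored, so a replayed batch does not produce duplicate rows. This relies on the guarantee stated in the doc comment that the output is the same for the same batchId when the query is deterministic.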


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91873 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91873/testReport)** for PR 21571 at commit [`3b7b20d`](https://github.com/apache/spark/commit/3b7b20d5c1b662abd935c0a812d9e54f8ab01b24).


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195861903
  
    --- Diff: python/pyspark/java_gateway.py ---
    @@ -145,3 +145,26 @@ def do_server_auth(conn, auth_secret):
         if reply != "ok":
             conn.close()
             raise Exception("Unexpected reply from iterator server.")
    +
    +
    +def ensure_callback_server_started(gw):
    --- End diff --
    
    This was copied verbatim from python streaming/context.py


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195862257
  
    --- Diff: dev/sparktestsupport/modules.py ---
    @@ -389,19 +389,19 @@ def __hash__(self):
             "python/pyspark/sql"
         ],
         python_test_goals=[
    -        "pyspark.sql.types",
    -        "pyspark.sql.context",
    -        "pyspark.sql.session",
    -        "pyspark.sql.conf",
    -        "pyspark.sql.catalog",
    -        "pyspark.sql.column",
    -        "pyspark.sql.dataframe",
    -        "pyspark.sql.group",
    -        "pyspark.sql.functions",
    -        "pyspark.sql.readwriter",
    -        "pyspark.sql.streaming",
    -        "pyspark.sql.udf",
    -        "pyspark.sql.window",
    +        #        "pyspark.sql.types",
    +        #        "pyspark.sql.context",
    +        #        "pyspark.sql.session",
    +        #        "pyspark.sql.conf",
    +        #        "pyspark.sql.catalog",
    +        #        "pyspark.sql.column",
    +        #        "pyspark.sql.dataframe",
    +        #        "pyspark.sql.group",
    +        #        "pyspark.sql.functions",
    +        #        "pyspark.sql.readwriter",
    +        #        "pyspark.sql.streaming",
    +        #        "pyspark.sql.udf",
    +        #        "pyspark.sql.window",
    --- End diff --
    
    Temporary changes only to speed up local testing. Will revert after the first round of review.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91938 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91938/testReport)** for PR 21571 at commit [`e8073ea`](https://github.com/apache/spark/commit/e8073eae4f9c8c833bbc3c3a21bb5f175615b027).
     * This patch **fails PySpark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4051/
    Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4050/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91982 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91982/testReport)** for PR 21571 at commit [`6f9fdf4`](https://github.com/apache/spark/commit/6f9fdf444aa704ea07e3c6ffc89474e1aeb59b6f).


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91994 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91994/testReport)** for PR 21571 at commit [`6f9fdf4`](https://github.com/apache/spark/commit/6f9fdf444aa704ea07e3c6ffc89474e1aeb59b6f).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Build finished. Test FAILed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21571: [WIP][SPARK-24565][SS] Add API for in Structured ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195596984
  
    --- Diff: dev/sparktestsupport/modules.py ---
    @@ -389,19 +389,19 @@ def __hash__(self):
             "python/pyspark/sql"
         ],
         python_test_goals=[
    -        "pyspark.sql.types",
    -        "pyspark.sql.context",
    -        "pyspark.sql.session",
    -        "pyspark.sql.conf",
    -        "pyspark.sql.catalog",
    -        "pyspark.sql.column",
    -        "pyspark.sql.dataframe",
    -        "pyspark.sql.group",
    -        "pyspark.sql.functions",
    -        "pyspark.sql.readwriter",
    -        "pyspark.sql.streaming",
    -        "pyspark.sql.udf",
    -        "pyspark.sql.window",
    +#        "pyspark.sql.types",
    +#        "pyspark.sql.context",
    +#        "pyspark.sql.session",
    +#        "pyspark.sql.conf",
    +#        "pyspark.sql.catalog",
    +#        "pyspark.sql.column",
    +#        "pyspark.sql.dataframe",
    +#        "pyspark.sql.group",
    +#        "pyspark.sql.functions",
    +#        "pyspark.sql.readwriter",
    +#        "pyspark.sql.streaming",
    +#        "pyspark.sql.udf",
    +#        "pyspark.sql.window",
    --- End diff --
    
    Temporary change for faster local testing. Will remove before finalizing.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/181/
    Test PASSed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #92018 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92018/testReport)** for PR 21571 at commit [`5b4252a`](https://github.com/apache/spark/commit/5b4252a8a7c40886bb73201327fff22dc921c3dc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195880511
  
    --- Diff: python/pyspark/sql/utils.py ---
    @@ -62,6 +62,7 @@ def deco(*a, **kw):
             try:
                 return f(*a, **kw)
             except py4j.protocol.Py4JJavaError as e:
    +            print(str(e))
    --- End diff --
    
    nit: remove this


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91940 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91940/testReport)** for PR 21571 at commit [`0763a44`](https://github.com/apache/spark/commit/0763a44ff24de9ad2285c82954e6f8670f9cbc9b).


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91875 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91875/testReport)** for PR 21571 at commit [`985a4fe`](https://github.com/apache/spark/commit/985a4fe017623137b09dee8b84e568514e07e70d).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21571: [WIP][SPARK-24565][SS] Add API for in Structured Streami...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    **[Test build #91875 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91875/testReport)** for PR 21571 at commit [`985a4fe`](https://github.com/apache/spark/commit/985a4fe017623137b09dee8b84e568514e07e70d).


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195875712
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
    @@ -322,6 +338,45 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         this
       }
     
    +  /**
    +   * :: Experimental ::
    +   *
    +   * (Scala-specific) Sets the output of the streaming query to be processed using the provided
    +   * function. This is supported only the in the micro-batch execution modes (that is, when the
    +   * trigger is not continuous). In every micro-batch, the provided function will be called in
    +   * every micro-batch with (i) the output rows as a Dataset and (ii) the batch identifier.
    +   * The batchId can be used deduplicate and transactionally write the output
    +   * (that is, the provided Dataset) to external systems. The output Dataset is guaranteed
    +   * to exactly same for the same batchId (assuming all operations are deterministic in the query).
    +   *
    +   * @since 2.4.0
    +   */
    +  @InterfaceStability.Evolving
    +  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
    --- End diff --
    
    It's unclear that only one of `foreachBatch` and `foreach` can be set. Reading the doc, a user may think both can be set. Maybe we should disallow this case?
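
One way to disallow setting both could look like the sketch below. This is plain Python and not the Spark implementation: `SketchStreamWriter`, `_set_sink`, and `foreach_batch` are hypothetical names used only to illustrate rejecting a second, conflicting sink setter.

```python
class SketchStreamWriter:
    """Hypothetical stand-in for a stream writer; not the Spark class."""

    def __init__(self):
        self._sink_kind = None
        self._func = None

    def _set_sink(self, kind, func):
        # Reject the call if a different kind of sink was already configured.
        if self._sink_kind is not None and self._sink_kind != kind:
            raise ValueError(
                "Cannot set both '%s' and '%s' on the same writer"
                % (self._sink_kind, kind))
        self._sink_kind = kind
        self._func = func
        return self

    def foreach(self, func):
        return self._set_sink("foreach", func)

    def foreach_batch(self, func):
        return self._set_sink("foreachBatch", func)


writer = SketchStreamWriter().foreach_batch(lambda rows, batch_id: None)
try:
    writer.foreach(lambda row: None)
    conflict_message = None
except ValueError as e:
    conflict_message = str(e)
```

Failing at configuration time, rather than silently letting the last setter win, makes the conflict visible before the query starts.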


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195917161
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
    @@ -322,6 +338,45 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         this
       }
     
    +  /**
    +   * :: Experimental ::
    +   *
    +   * (Scala-specific) Sets the output of the streaming query to be processed using the provided
    +   * function. This is supported only the in the micro-batch execution modes (that is, when the
    +   * trigger is not continuous). In every micro-batch, the provided function will be called in
    +   * every micro-batch with (i) the output rows as a Dataset and (ii) the batch identifier.
    +   * The batchId can be used deduplicate and transactionally write the output
    +   * (that is, the provided Dataset) to external systems. The output Dataset is guaranteed
    +   * to exactly same for the same batchId (assuming all operations are deterministic in the query).
    +   *
    +   * @since 2.4.0
    +   */
    +  @InterfaceStability.Evolving
    +  def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
    --- End diff --
    
    Well... that is an existing problem, because one can write the following confusing code:
    ```
    df.writeStream.format("kafka").foreach(...).start()
    ```
    This will execute the foreach but it looks confusing nonetheless.


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21571


---



[GitHub] spark pull request #21571: [WIP][SPARK-24565][SS] Add API for in Structured ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195597051
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -269,6 +269,7 @@ def test_struct_field_type_name(self):
             struct_field = StructField("a", IntegerType())
             self.assertRaises(TypeError, struct_field.typeName)
     
    +'''
    --- End diff --
    
    Temporary change for faster local testing. Will remove before finalizing.
    



---



[GitHub] spark pull request #21571: [SPARK-24565][SS] Add API for in Structured Strea...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195878970
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -1016,6 +1018,35 @@ def func_with_open_process_close(partition_id, iterator):
             self._jwrite.foreach(jForeachWriter)
             return self
     
    +    @since(2.4)
    +    def foreachBatch(self, func):
    +        """
    +        Sets the output of the streaming query to be processed using the provided
    +        function. This is supported only in the micro-batch execution modes (that is, when the
    +        trigger is not continuous). In every micro-batch, the provided function will be called
    +        with (i) the output rows as a DataFrame and (ii) the batch identifier.
    +        The batchId can be used to deduplicate and transactionally write the output
    +        (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
    +        to be exactly the same for the same batchId (assuming all operations are deterministic
    +        in the query).
    +
    +        .. note:: Evolving.
    +
    +        >>> def func(batch_df, batch_id):
    +        ...     batch_df.collect()
    +        ...
    +        >>> writer = sdf.writeStream.foreachBatch(func)
    +        """
    +
    +        from pyspark.java_gateway import ensure_callback_server_started
    +        gw = self._spark._sc._gateway
    +        java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
    +
    +        wrapped_func = ForeachBatchFunction(self._spark, func)
    +        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
    +        ensure_callback_server_started(gw)
    --- End diff --
    
    This call should be moved up; otherwise there is a race where the streaming query could call this Python function before the callback server is started.
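
Separately, on the docstring in the diff above: it says the batchId can be used to deduplicate and transactionally write the output. A minimal sketch of that pattern follows, in plain Python so it runs without Spark. `FakeBatch` and `IdempotentSink` are hypothetical stand-ins invented for illustration, not PySpark classes; only the callback signature `func(batch_df, batch_id)` is taken from the diff.

```python
class FakeBatch:
    """Hypothetical stand-in for the micro-batch DataFrame; only collect() is mimicked."""

    def __init__(self, rows):
        self._rows = list(rows)

    def collect(self):
        return list(self._rows)


class IdempotentSink:
    """Hypothetical external system that remembers which batch ids it has committed."""

    def __init__(self):
        self.rows = []
        self.committed = set()

    def write(self, batch_id, rows):
        # Skip a batch that was already committed, e.g. one replayed after a restart.
        if batch_id in self.committed:
            return False
        # Record rows and batch id together; a real sink would do this in one transaction.
        self.rows.extend(rows)
        self.committed.add(batch_id)
        return True


sink = IdempotentSink()


def func(batch_df, batch_id):
    # Same signature as the function passed to foreachBatch in the diff.
    sink.write(batch_id, batch_df.collect())


func(FakeBatch([1, 2]), 0)
func(FakeBatch([3]), 1)
func(FakeBatch([3]), 1)  # replay of batch 1 is ignored by the sink
```

Assuming the API lands as proposed, a function like `func` would be passed to `foreachBatch` on the stream writer. The deduplication only holds if the sink records the batch id atomically with the rows, which this in-memory sketch does not attempt to model.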


---



[GitHub] spark issue #21571: [SPARK-24565][SS] Add API for in Structured Streaming fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21571
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/252/


---
