You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by zsxwing <gi...@git.apache.org> on 2016/05/27 00:23:32 UTC

[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

GitHub user zsxwing opened a pull request:

    https://github.com/apache/spark/pull/13342

    [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery

    ## What changes were proposed in this pull request?
    
    * Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery
      * ForeachWriter is the interface for the user to consume partitions of data
    * Add a type parameter T to DataFrameWriter
    
    Usage
    ```Scala
    val ds = spark.read....stream().as[String]
    ds.....write
             .queryName(...)
            .option("checkpointLocation", ...)
            .foreach(new ForeachWriter[Int] {
              override def open(version: Long): Unit = {
                 // prepare some resources for a partition
              }
    
              override def process(value: Int): Unit = {
                  // process data
              }
    
              override def close(): Unit = {
                 // release resources for a partition
              }
            })
    ```
    
    ## How was this patch tested?
    
    New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark foreach

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13342
    
----
commit cda6ca329ec334083f5f2d0ac9b4e6fe76f0587d
Author: Shixiong Zhu <sh...@databricks.com>
Date:   2016-05-27T00:16:31Z

    Add DataFrameWriter.foreach to allow the user consuming data in ContinuousQuery

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59713 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59713/consoleFull)** for PR 13342 at commit [`80c06bd`](https://github.com/apache/spark/commit/80c06bd8b84070aaa26dba41ff1cd3fb42f5986b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64955801
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side.
    +   *
    +   * @param isFailed Whether any error is thrown during processing data.
    +   * @param error The error thrown during processing data. if `isFailed` is `true`. Otherwise, it's
    +   *              undefined.
    +   */
    +  def close(isFailed: Boolean, error: Throwable): Unit
    --- End diff --
    
    Why not use one parameter and just pass null though.  Basically, there is redundant information in this API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/60078/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222230425
  
    **[Test build #59511 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59511/consoleFull)** for PR 13342 at commit [`427f900`](https://github.com/apache/spark/commit/427f9003c218d7ee6baad6abadf56ba827432969).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222035135
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59438/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66532767
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    --- End diff --
    
    usually should --> should do all the initialization (e.g. opening a connection or initiating a transaction) in the `open` method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60260 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60260/consoleFull)** for PR 13342 at commit [`2f2e9b3`](https://github.com/apache/spark/commit/2f2e9b3f322a7b49de81b9ad15e98f50b6ff58b4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64844881
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -358,6 +360,35 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val checkpointLocation = extraOptions.get("checkpointLocation")
    +      .orElse {
    +        df.sparkSession.sessionState.conf.checkpointLocation.map { l =>
    +          new Path(l, queryName).toUri.toString
    +        }
    +      }.getOrElse {
    --- End diff --
    
    I think similar to memorysink, the user should not have to specify checkpoint location. 
    
    Also could you consolidate the code for finding checkpoint location across memory sink, file sink and foreach sink. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64843903
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * @param version a unique id for data deduplication.
    +   */
    +  def open(version: Long): Unit
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side.
    +   */
    +  def close(): Unit
    --- End diff --
    
    How do I implement clean up logic if there is any error. I think the close() method needs to take parameter that signifies, whether closing with error or not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66541375
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +383,91 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can be used to send the data
    +   * generated by the [[DataFrame]]/[[Dataset]] to an external system. The returned The returned
    +   * [[ContinuousQuery]] object can be used to interact with the stream.
    +   *
    +   * Scala example:
    +   * {{{
    +   *   datasetOfString.write.foreach(new ForeachWriter[String] {
    +   *     def open(partitionId: Long, version: Long): Boolean = {
    +  *        // open connection
    +   *     }
    +   *     def process(record: String) = {
    --- End diff --
    
    add blank line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66558301
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -232,7 +234,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
        * @since 1.4.0
        */
       @scala.annotation.varargs
    -  def partitionBy(colNames: String*): DataFrameWriter = {
    +  def partitionBy(colNames: String*): this.type = {
    --- End diff --
    
    sorry i didnt notice this earlier but why is this suddenly `this.type` and not `DataFrameWriter[T]` like other methods. Alternatively, why are we using `this.type` everywhere?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60254 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60254/consoleFull)** for PR 13342 at commit [`42286b0`](https://github.com/apache/spark/commit/42286b0a381cce2461d966d2dbad38b376a73b42).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59713 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59713/consoleFull)** for PR 13342 at commit [`80c06bd`](https://github.com/apache/spark/commit/80c06bd8b84070aaa26dba41ff1cd3fb42f5986b).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66545305
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala ---
    @@ -572,4 +572,25 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     
         cq.awaitTermination(2000L)
       }
    +
    +  test("foreach") {
    +    import testImplicits._
    +
    +    val ds = spark.read
    +      .format("org.apache.spark.sql.streaming.test")
    +      .stream()
    +      .as[Int]
    +
    +    val cq = ds.write
    +      .format("console")
    +      .option("checkpointLocation", newMetadataDir)
    +      .trigger(ProcessingTime(2.seconds))
    +      .foreach(new ForeachWriter[Int] {
    +        override def open(partitionId: Long, version: Long): Boolean = true
    +        override def process(value: Int): Unit = {}
    +        override def close(errorOrNull: Throwable): Unit = {}
    +      })
    +
    +    cq.awaitTermination(2000L)
    --- End diff --
    
    I'm going to remove this one as it should be covered in ForeachSinkSuite


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64846219
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ForeachSinkSuite.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.TaskContext
    +import org.apache.spark.sql.execution.streaming.MemoryStream
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ForeachSinkSuite extends StreamTest with SharedSQLContext {
    --- End diff --
    
    move into some streaming pakcage? If there isn't a streaming package, name it StreamingForeachWriter ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66533244
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    assertNotBucketed("foreach")
    +    assertStreaming(
    +      "foreach() on streaming Datasets and DataFrames can only be called on continuous queries")
    --- End diff --
    
    foreach() can only be called on streaming Datasets/DataFrames.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66543397
  
    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
    @@ -238,7 +238,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
           shuffleLeft: Boolean,
           shuffleRight: Boolean): Unit = {
         withTable("bucketed_table1", "bucketed_table2") {
    -      def withBucket(writer: DataFrameWriter, bucketSpec: Option[BucketSpec]): DataFrameWriter = {
    +      def withBucket(writer: DataFrameWriter[Row], bucketSpec: Option[BucketSpec]):
    +        DataFrameWriter[Row] = {
    --- End diff --
    
    isnt the better syntax 
    ```
    def withBucket(
       writer
       bucketSpec): DataFrameWriter[Row] = {
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64955520
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side.
    +   *
    +   * @param isFailed Whether any error is thrown during processing data.
    +   * @param error The error thrown during processing data. if `isFailed` is `true`. Otherwise, it's
    +   *              undefined.
    +   */
    +  def close(isFailed: Boolean, error: Throwable): Unit
    --- End diff --
    
    > Its kind of weird that we use Option in the listener but not here? We should decide if thats okay for public APIs or not and be consistent.
    
    It's not `Option` because we don't use `Option` in a Java API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66541245
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.ForeachWriter
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ForeachSinkSuite extends StreamTest with SharedSQLContext {
    +
    +  import testImplicits._
    +
    +  test("foreach") {
    +    ForeachWriterEvent.clear()
    +    withTempDir { checkpointDir =>
    +      val input = MemoryStream[Int]
    +      val query = input.toDS().repartition(2).write
    +        .option("checkpointLocation", checkpointDir.getCanonicalPath)
    +        .foreach(new ForeachWriter[Int] {
    +
    +          private val events = mutable.ArrayBuffer[ForeachWriterEvent.Event]()
    +
    +          override def open(partitionId: Long, version: Long): Boolean = {
    +            events += ForeachWriterEvent.Open(partition = partitionId, version = version)
    +            true
    +          }
    +
    +          override def process(value: Int): Unit = {
    +            events += ForeachWriterEvent.Process(value)
    +          }
    +
    +          override def close(errorOrNull: Throwable): Unit = {
    +            events += ForeachWriterEvent.Close(error = Option(errorOrNull))
    +            ForeachWriterEvent.addEvents(events)
    +          }
    +        })
    +      input.addData(1, 2, 3, 4)
    +      query.processAllAvailable()
    +
    +      val expectedEventsForPartition0 = Seq(
    +        ForeachWriterEvent.Open(partition = 0, version = 0),
    +        ForeachWriterEvent.Process(value = 1),
    +        ForeachWriterEvent.Process(value = 3),
    +        ForeachWriterEvent.Close(None)
    +      )
    +      val expectedEventsForPartition1 = Seq(
    +        ForeachWriterEvent.Open(partition = 1, version = 0),
    +        ForeachWriterEvent.Process(value = 2),
    +        ForeachWriterEvent.Process(value = 4),
    +        ForeachWriterEvent.Close(None)
    +      )
    +
    +      val allEvents = ForeachWriterEvent.allEvents()
    +      assert(allEvents.size === 2)
    +      assert {
    +        allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) ||
    +          allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0)
    +      }
    +      query.stop()
    +    }
    +  }
    +
    +  test("foreach error") {
    +    ForeachWriterEvent.clear()
    +    withTempDir { checkpointDir =>
    +      val input = MemoryStream[Int]
    +      val query = input.toDS().repartition(1).write
    +        .option("checkpointLocation", checkpointDir.getCanonicalPath)
    +        .foreach(new ForeachWriter[Int] {
    +
    +          private val events = mutable.ArrayBuffer[ForeachWriterEvent.Event]()
    +
    +          private var currentPartitionId = -1L
    +
    +          override def open(partitionId: Long, version: Long): Boolean = {
    +            currentPartitionId = partitionId
    +            events += ForeachWriterEvent.Open(partition = partitionId, version = version)
    +            true
    +          }
    +
    +          override def process(value: Int): Unit = {
    +            events += ForeachWriterEvent.Process(value)
    +            throw new RuntimeException("error")
    +          }
    +
    +          override def close(errorOrNull: Throwable): Unit = {
    +            events += ForeachWriterEvent.Close(error = Option(errorOrNull))
    +            ForeachWriterEvent.addEvents(events)
    +          }
    +        })
    +      input.addData(1, 2, 3, 4)
    +      query.processAllAvailable()
    +
    +      val allEvents = ForeachWriterEvent.allEvents()
    +      assert(allEvents.size === 1)
    +      assert(allEvents(0)(0) === ForeachWriterEvent.Open(partition = 0, version = 0))
    +      assert(allEvents(0)(1) ===  ForeachWriterEvent.Process(value = 1))
    +      val errorEvent = allEvents(0)(2).asInstanceOf[ForeachWriterEvent.Close]
    +      assert(errorEvent.error.get.isInstanceOf[RuntimeException])
    +      assert(errorEvent.error.get.getMessage === "error")
    +      query.stop()
    +    }
    +  }
    +}
    +
    +/** A global object to collect events in the executor side */
    +object ForeachWriterEvent {
    --- End diff --
    
    private 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59757 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59757/consoleFull)** for PR 13342 at commit [`d2b19b7`](https://github.com/apache/spark/commit/d2b19b72f5a5882641d80a6b8ffcf4fdf9ab974c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends StreamSourceProvider with StreamSinkProvider `
      * `class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter `
      * `class ForeachSinkSuite extends StreamTest with SharedSQLContext `
      * `  case class Open(partition: Long, version: Long) extends Event`
      * `  case class Process[T](value: T) extends Event`
      * `  case class Close(error: Option[Throwable]) extends Event`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66543331
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala ---
    @@ -572,4 +572,25 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
     
         cq.awaitTermination(2000L)
       }
    +
    +  test("foreach") {
    +    import testImplicits._
    +
    +    val ds = spark.read
    +      .format("org.apache.spark.sql.streaming.test")
    +      .stream()
    +      .as[Int]
    +
    +    val cq = ds.write
    +      .format("console")
    +      .option("checkpointLocation", newMetadataDir)
    +      .trigger(ProcessingTime(2.seconds))
    +      .foreach(new ForeachWriter[Int] {
    +        override def open(partitionId: Long, version: Long): Boolean = true
    +        override def process(value: Int): Unit = {}
    +        override def close(errorOrNull: Throwable): Unit = {}
    +      })
    +
    +    cq.awaitTermination(2000L)
    --- End diff --
    
    what does this test? whether it started correctly? Should it not set a global variable to make sure that this actually works rather than implicitly depending on behavior of awaitTermination?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222037191
  
    Its fine that we dont implement it in 2.0, it would be nice to have a abstract idea how it can be done. Ideally, the python way and the scala/java way should look similar. 
    
    Possible solution 1: Python will also have a class ForeachWriter, and a Java wrapper ForeachWriter will call the Python ForeachWriter.
    Possible solution 2: Similar to udfs, some of the registration by name, where Java writer can be registered and referred to from Python. 
    
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60078/consoleFull)** for PR 13342 at commit [`6279486`](https://github.com/apache/spark/commit/6279486eca263bccc13284df8e1d9e6d286bf7a5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60254 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60254/consoleFull)** for PR 13342 at commit [`42286b0`](https://github.com/apache/spark/commit/42286b0a381cce2461d966d2dbad38b376a73b42).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59763 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59763/consoleFull)** for PR 13342 at commit [`56bf805`](https://github.com/apache/spark/commit/56bf805f3100b7af5dea96ecedb65aff5029c253).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66534481
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +abstract class ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param partitionId the partition id.
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    --- End diff --
    
    return true if the corresponding partition and version id should be processed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59768/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59763 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59763/consoleFull)** for PR 13342 at commit [`56bf805`](https://github.com/apache/spark/commit/56bf805f3100b7af5dea96ecedb65aff5029c253).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/60260/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59757/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60077 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60077/consoleFull)** for PR 13342 at commit [`529f3d4`](https://github.com/apache/spark/commit/529f3d4f9c8edffe1ec44e104892d6fb861ff305).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60274 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60274/consoleFull)** for PR 13342 at commit [`8cb7aa5`](https://github.com/apache/spark/commit/8cb7aa5593e0d845eaa92799f43792c5c9f1c02a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    LGTM, except a few minor nits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66533645
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    --- End diff --
    
    ...as new data arrives. The [[ForeachWriter]] can be used to send the data generated by the DataFrame/Dataset to an external system. The returned...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222229325
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64845888
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -358,6 +360,35 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val checkpointLocation = extraOptions.get("checkpointLocation")
    +      .orElse {
    +        df.sparkSession.sessionState.conf.checkpointLocation.map { l =>
    +          new Path(l, queryName).toUri.toString
    +        }
    +      }.getOrElse {
    +      throw new AnalysisException("checkpointLocation must be specified either " +
    +        "through option() or SQLConf")
    +    }
    +
    +    df.sparkSession.sessionState.continuousQueryManager.startQuery(
    --- End diff --
    
    We have to clean the closure of the Writer and check for its serializability. Otherwise it might capture unexpected stuff in the closure and fail later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59757 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59757/consoleFull)** for PR 13342 at commit [`d2b19b7`](https://github.com/apache/spark/commit/d2b19b72f5a5882641d80a6b8ffcf4fdf9ab974c).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66537635
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +abstract class ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param partitionId the partition id.
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(partitionId: Long, version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    --- End diff --
    
    Also say, 
    This method will be called only when open returns true.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/60254/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222035110
  
    **[Test build #59438 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59438/consoleFull)** for PR 13342 at commit [`cda6ca3`](https://github.com/apache/spark/commit/cda6ca329ec334083f5f2d0ac9b4e6fe76f0587d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait ForeachWriter[T] extends Serializable `
      * `class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66541011
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.ForeachWriter
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ForeachSinkSuite extends StreamTest with SharedSQLContext {
    +
    +  import testImplicits._
    +
    +  test("foreach") {
    +    ForeachWriterEvent.clear()
    +    withTempDir { checkpointDir =>
    --- End diff --
    
    test failures will not stop the query. might lead to cascading failures. adding a `after  {  ... } ` to stop all queries may be a good idea.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/60077/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66541219
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.ForeachWriter
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ForeachSinkSuite extends StreamTest with SharedSQLContext {
    +
    +  import testImplicits._
    +
    +  test("foreach") {
    +    ForeachWriterEvent.clear()
    +    withTempDir { checkpointDir =>
    +      val input = MemoryStream[Int]
    +      val query = input.toDS().repartition(2).write
    +        .option("checkpointLocation", checkpointDir.getCanonicalPath)
    +        .foreach(new ForeachWriter[Int] {
    +
    +          private val events = mutable.ArrayBuffer[ForeachWriterEvent.Event]()
    +
    +          override def open(partitionId: Long, version: Long): Boolean = {
    +            events += ForeachWriterEvent.Open(partition = partitionId, version = version)
    +            true
    +          }
    +
    +          override def process(value: Int): Unit = {
    +            events += ForeachWriterEvent.Process(value)
    +          }
    +
    +          override def close(errorOrNull: Throwable): Unit = {
    +            events += ForeachWriterEvent.Close(error = Option(errorOrNull))
    +            ForeachWriterEvent.addEvents(events)
    +          }
    +        })
    +      input.addData(1, 2, 3, 4)
    +      query.processAllAvailable()
    +
    +      val expectedEventsForPartition0 = Seq(
    +        ForeachWriterEvent.Open(partition = 0, version = 0),
    +        ForeachWriterEvent.Process(value = 1),
    +        ForeachWriterEvent.Process(value = 3),
    +        ForeachWriterEvent.Close(None)
    +      )
    +      val expectedEventsForPartition1 = Seq(
    +        ForeachWriterEvent.Open(partition = 1, version = 0),
    +        ForeachWriterEvent.Process(value = 2),
    +        ForeachWriterEvent.Process(value = 4),
    +        ForeachWriterEvent.Close(None)
    +      )
    +
    +      val allEvents = ForeachWriterEvent.allEvents()
    +      assert(allEvents.size === 2)
    +      assert {
    +        allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) ||
    +          allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0)
    +      }
    +      query.stop()
    +    }
    +  }
    +
    +  test("foreach error") {
    +    ForeachWriterEvent.clear()
    +    withTempDir { checkpointDir =>
    +      val input = MemoryStream[Int]
    +      val query = input.toDS().repartition(1).write
    +        .option("checkpointLocation", checkpointDir.getCanonicalPath)
    +        .foreach(new ForeachWriter[Int] {
    +
    +          private val events = mutable.ArrayBuffer[ForeachWriterEvent.Event]()
    +
    +          private var currentPartitionId = -1L
    --- End diff --
    
    Not used anywhere. 
    Cant this writer be deduped?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60274 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60274/consoleFull)** for PR 13342 at commit [`8cb7aa5`](https://github.com/apache/spark/commit/8cb7aa5593e0d845eaa92799f43792c5c9f1c02a).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222247009
  
    **[Test build #59511 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59511/consoleFull)** for PR 13342 at commit [`427f900`](https://github.com/apache/spark/commit/427f9003c218d7ee6baad6abadf56ba827432969).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59703 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59703/consoleFull)** for PR 13342 at commit [`ada4422`](https://github.com/apache/spark/commit/ada44222a4db4d68809cfcd4d645e72797c6f45e).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222035133
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66541111
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.ConcurrentLinkedQueue
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.ForeachWriter
    +import org.apache.spark.sql.streaming.StreamTest
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ForeachSinkSuite extends StreamTest with SharedSQLContext {
    +
    +  import testImplicits._
    +
    +  test("foreach") {
    +    ForeachWriterEvent.clear()
    +    withTempDir { checkpointDir =>
    +      val input = MemoryStream[Int]
    +      val query = input.toDS().repartition(2).write
    +        .option("checkpointLocation", checkpointDir.getCanonicalPath)
    +        .foreach(new ForeachWriter[Int] {
    +
    +          private val events = mutable.ArrayBuffer[ForeachWriterEvent.Event]()
    +
    +          override def open(partitionId: Long, version: Long): Boolean = {
    +            events += ForeachWriterEvent.Open(partition = partitionId, version = version)
    +            true
    +          }
    +
    +          override def process(value: Int): Unit = {
    +            events += ForeachWriterEvent.Process(value)
    +          }
    +
    +          override def close(errorOrNull: Throwable): Unit = {
    +            events += ForeachWriterEvent.Close(error = Option(errorOrNull))
    +            ForeachWriterEvent.addEvents(events)
    +          }
    +        })
    +      input.addData(1, 2, 3, 4)
    +      query.processAllAvailable()
    +
    +      val expectedEventsForPartition0 = Seq(
    +        ForeachWriterEvent.Open(partition = 0, version = 0),
    +        ForeachWriterEvent.Process(value = 1),
    +        ForeachWriterEvent.Process(value = 3),
    +        ForeachWriterEvent.Close(None)
    +      )
    +      val expectedEventsForPartition1 = Seq(
    +        ForeachWriterEvent.Open(partition = 1, version = 0),
    +        ForeachWriterEvent.Process(value = 2),
    +        ForeachWriterEvent.Process(value = 4),
    +        ForeachWriterEvent.Close(None)
    +      )
    +
    +      val allEvents = ForeachWriterEvent.allEvents()
    +      assert(allEvents.size === 2)
    +      assert {
    +        allEvents === Seq(expectedEventsForPartition0, expectedEventsForPartition1) ||
    +          allEvents === Seq(expectedEventsForPartition1, expectedEventsForPartition0)
    +      }
    +      query.stop()
    +    }
    +  }
    +
    +  test("foreach error") {
    --- End diff --
    
    nit: foreach with error


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222247242
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59511/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59754 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59754/consoleFull)** for PR 13342 at commit [`40b2508`](https://github.com/apache/spark/commit/40b2508a61cedff7a249146aed0de47d7b1d0263).
     * This patch **fails RAT tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ForeachSinkSuite extends StreamTest with SharedSQLContext `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r65412579
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param partitionId the partition id.
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(partitionId: Long, version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side.
    --- End diff --
    
    We should be clear here and above that there are several cases where this method will not be called (i.e. the stream dying, the jvm dying, etc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222229040
  
    Updated `ForeachWriter` as per discussion with @tdas. I will add more tests for `open` and `close` once we all agree with the ForeachWriter APIs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222031038
  
    /cc @rxin @marmbrus @tdas 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66538562
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import org.apache.spark.TaskContext
    +import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    +
    +class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    --- End diff --
    
    Add some scala docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64844799
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -358,6 +360,35 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val checkpointLocation = extraOptions.get("checkpointLocation")
    +      .orElse {
    +        df.sparkSession.sessionState.conf.checkpointLocation.map { l =>
    +          new Path(l, queryName).toUri.toString
    +        }
    +      }.getOrElse {
    +      throw new AnalysisException("checkpointLocation must be specified either " +
    --- End diff --
    
    incorrect indent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64954717
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side.
    +   *
    +   * @param isFailed Whether any error is thrown during processing data.
    +   * @param error The error thrown during processing data. if `isFailed` is `true`. Otherwise, it's
    +   *              undefined.
    +   */
    +  def close(isFailed: Boolean, error: Throwable): Unit
    --- End diff --
    
    Its kind of weird that we use `Option` in the listener but not here?  We should decide if thats okay for public APIs or not and be consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64845960
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * @param version a unique id for data deduplication.
    +   */
    +  def open(version: Long): Unit
    --- End diff --
    
    On the executor side, is a single deserialized instance of the ForeachWriter going to be reused? Or is each instance going to be discarded after close is called. 
    
    Its important to define this behavior, and make sure its tested. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222031090
  
    **[Test build #59438 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59438/consoleFull)** for PR 13342 at commit [`cda6ca3`](https://github.com/apache/spark/commit/cda6ca329ec334083f5f2d0ac9b4e6fe76f0587d).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/13342


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222036396
  
    Maybe we should do scala/java for now and think about Python later, since it is more complicated to do udfs in python.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59754/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59768 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59768/consoleFull)** for PR 13342 at commit [`d6bb884`](https://github.com/apache/spark/commit/d6bb884efa85edbe3dd4b377d41b07996c35a45e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66552040
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A class to consume data generated by a [[ContinuousQuery]]. Typically this is used to send the
    + * generated data to external systems. Each partition will use a new deserialized instance, so you
    + * usually should do all the initialization (e.g. opening a connection or initiating a transaction)
    + * in the open method
    --- End diff --
    
    nit: missing period.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60260 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60260/consoleFull)** for PR 13342 at commit [`2f2e9b3`](https://github.com/apache/spark/commit/2f2e9b3f322a7b49de81b9ad15e98f50b6ff58b4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64845561
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    @tdas `open` has a `version` parameter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merging this to master and 2.0


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66541395
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +383,91 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as as new data arrives. The [[ForeachWriter]] can be used to send the data
    +   * generated by the [[DataFrame]]/[[Dataset]] to an external system. The returned The returned
    +   * [[ContinuousQuery]] object can be used to interact with the stream.
    +   *
    +   * Scala example:
    +   * {{{
    +   *   datasetOfString.write.foreach(new ForeachWriter[String] {
    +   *     def open(partitionId: Long, version: Long): Boolean = {
    +  *        // open connection
    +   *     }
    +   *     def process(record: String) = {
    +  *        // write string to connection
    +   *     }
    +   *     def close(errorOrNull: Throwable): Unit = {
    --- End diff --
    
    check indents.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64845425
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    Not really. I want that every time open() is called for some initialization, i want to know where I am in the stream. So that accordingly I can decide whether to consider or ignore the records (as it is a re-execution).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64847013
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    I buy that. For that, the Sink interface actually allows driver side stuff, as it is tied to the batch idea. And we can think of making that public later. 
    
    A different idea I discussed with @zsxwing offline is that if the writer can identify that the same version is being rerun again, then it should be able to skip walking through the whole partition. How about `open(version)` returning a boolean to signify whether to process the partition or not?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r65981082
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,52 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    assertNotBucketed()
    +    assertStreaming("startStream() can only be called on continuous queries")
    --- End diff --
    
    startStream() --> foreach() on streaming datasets and dataframes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59713/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    I'm okay with this interface, though I think we will probably need to add a less batch oriented version in the future.  Can you update the description?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r65412780
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,52 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    assertNotBucketed()
    +    assertStreaming("startStream() can only be called on continuous queries")
    +
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.unresolvedTEncoder)
    +    df.sparkSession.sessionState.continuousQueryManager.startQuery(
    +      queryName,
    +      getCheckpointLocation(queryName, required = false),
    +      df,
    +      sink,
    +      outputMode,
    +      trigger)
    +  }
    +
    +  /**
    +   * Returns the checkpointLocation for a query. If `required` is `ture` but the checkpoint
    --- End diff --
    
    `true`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66532056
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    assertNotBucketed("foreach")
    +    assertStreaming(
    +      "foreach() on streaming Datasets and DataFrames can only be called on continuous queries")
    +
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
    +    df.sparkSession.sessionState.continuousQueryManager.startQuery(
    +      queryName,
    +      getCheckpointLocation(queryName, required = false),
    +      df,
    +      sink,
    +      outputMode,
    +      trigger)
    +  }
    +
    +  /**
    +   * Returns the checkpointLocation for a query. If `required` is `true` but the checkpoint
    +   * location is not set, [[AnalysisException]] will be thrown. If `required` is `false`, a temp
    +   * folder will be created if the checkpoint location is not set.
    +   */
    +  private def getCheckpointLocation(queryName: String, required: Boolean): String = {
    +    extraOptions.get("checkpointLocation").map { userSpecified =>
    +      new Path(userSpecified).toUri.toString
    +    }.orElse {
    +      df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
    +        new Path(location, queryName).toUri.toString
    +      }
    +    }.getOrElse {
    +      if (required) {
    +        throw new AnalysisException("checkpointLocation must be specified either " +
    +          "through option() or SQLConf")
    --- End diff --
    
    `option()` --> `option("checkpointLocation", ...)`
    `SQLConf` --> `sqlContext.conf......` (complete it)
    Makes it easier for the user



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64957648
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    I agree that we can't ourselves guarantee exactly once here, but we should at least give the user the tools to implement that themselves.  Kafka has some good thoughts [here](https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer).  In particular I think we are missing several pieces of data:
     - the partition
     - some id for the record (unless we are promising they always come in the same order? and they are responsible for counting on their own.)
    
    It also seems by having open return a boolean we don't have a great mechanism to handle partial failures.  An alternative proposal might be:
    
    ```scala
    trait ForeachWriter[T] {
      def open(pid: Long): Unit
      def process(recId: Long, value: T): Unit
      def close(error: Throwable): Unit
    }
    ```
    
     - `pid` can be a combination of the `partitionId` and the `batchId`, though in the future we could change it to partition + checkpoint id.  The only promise here is that once we call `close` where  `error != null` you have seen all of the tuples that you will ever see for a given `pid` and if we repeat, that set will not change.  This lets you do de-duplication without remembering an id for every record ever seen forever.
     - `recId` needs to be consistent (i.e. always be the same tuple) for any given `pid`.  For now we can enforce this by sorting each partition on all columns, but there are certainly other faster ways we can explore in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64845656
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    Oh, I see. Since `open` runs in the executor side, `ForeachWriter` will be created for every partition and it cannot report any progress back to the driver.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66532405
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    assertNotBucketed("foreach")
    +    assertStreaming(
    +      "foreach() on streaming Datasets and DataFrames can only be called on continuous queries")
    +
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
    +    df.sparkSession.sessionState.continuousQueryManager.startQuery(
    +      queryName,
    +      getCheckpointLocation(queryName, required = false),
    +      df,
    +      sink,
    +      outputMode,
    +      trigger)
    +  }
    +
    +  /**
    +   * Returns the checkpointLocation for a query. If `required` is `true` but the checkpoint
    +   * location is not set, [[AnalysisException]] will be thrown. If `required` is `false`, a temp
    +   * folder will be created if the checkpoint location is not set.
    +   */
    +  private def getCheckpointLocation(queryName: String, required: Boolean): String = {
    --- End diff --
    
    The semantics of this method is very confusing. `required` implies that it will throw error if there is not checkpoint location set. Its not intuitive that when it is not required it creates a temp checkpoint dir. Furthermore it creates temp one named on memory. Does not make sense. 
    
    Cleaner for this to return an Option[String] and `required` --> `failIfNotSet'.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59703 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59703/consoleFull)** for PR 13342 at commit [`ada4422`](https://github.com/apache/spark/commit/ada44222a4db4d68809cfcd4d645e72797c6f45e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59763/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222036087
  
    @zsxwing What about python API? How do we do call ForeachWriter in python API?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/60274/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60077 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60077/consoleFull)** for PR 13342 at commit [`529f3d4`](https://github.com/apache/spark/commit/529f3d4f9c8edffe1ec44e104892d6fb861ff305).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `abstract class ForeachWriter[T] extends Serializable `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222229327
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59510/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66551851
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -68,9 +71,9 @@ import org.apache.spark.sql.streaming.ContinuousQuery
     abstract class ForeachWriter[T] extends Serializable {
     
       /**
    -   * Called when starting to process one partition of new data in the executor side. `version` is
    -   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    -   * it's guarantee that they will be opened with the same "version".
    +   * Called when starting to process one partition of new data in the executor. The `version` is
    +   * for data deduplication when there are failures. When recovering from a failure, some data may
    +   * be generated multiple times but they will always have the same version.
        *
        * If this method finds this is a partition from a duplicated data set, it can return `false` to
    --- End diff --
    
    nit: finds using the `partitionId` and `version` that this partition has already been processed, it can return ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64953440
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -358,6 +360,35 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val checkpointLocation = extraOptions.get("checkpointLocation")
    +      .orElse {
    +        df.sparkSession.sessionState.conf.checkpointLocation.map { l =>
    +          new Path(l, queryName).toUri.toString
    +        }
    +      }.getOrElse {
    --- End diff --
    
    Will address this later


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64845746
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * @param version a unique id for data deduplication.
    +   */
    +  def open(version: Long): Unit
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side.
    +   */
    +  def close(): Unit
    --- End diff --
    
    yea that's a good idea


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66539666
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +abstract class ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param partitionId the partition id.
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(partitionId: Long, version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side. This is
    +   * guaranteed to be called when a `Throwable` is thrown during processing data. However,
    +   * `close` won't be called in the following cases:
    +   *  - JVM crashes without throwing a `Throwable`
    +   *  - `open` throws a `Throwable`.
    +   *
    +   * @param errorOrNull the error thrown during processing data or null if nothing is thrown.
    --- End diff --
    
    > This method will be called only if open has returned true.
    
    We should call `false` even if `open` returns `false`. The user may open some connections to check the existing data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66540602
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +abstract class ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    --- End diff --
    
    for data deduplication **when there are failures.**
    
    When recovering from a failure, some data may be **generated multiple times** but they will always have the same version. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342#issuecomment-222247240
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59754 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59754/consoleFull)** for PR 13342 at commit [`40b2508`](https://github.com/apache/spark/commit/40b2508a61cedff7a249146aed0de47d7b1d0263).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64846038
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -156,7 +156,7 @@ private[sql] object Dataset {
     class Dataset[T] private[sql](
         @transient val sparkSession: SparkSession,
         @DeveloperApi @transient val queryExecution: QueryExecution,
    -    encoder: Encoder[T])
    +    private[sql] val encoder: Encoder[T])
    --- End diff --
    
    should we use unresolvedTEncoder or resolvedTEncoder? @marmbrus mentioned in the past that we should probably get rid of some of these to make it more obvious what to use.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #60078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/60078/consoleFull)** for PR 13342 at commit [`6279486`](https://github.com/apache/spark/commit/6279486eca263bccc13284df8e1d9e6d286bf7a5).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    **[Test build #59768 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59768/consoleFull)** for PR 13342 at commit [`d6bb884`](https://github.com/apache/spark/commit/d6bb884efa85edbe3dd4b377d41b07996c35a45e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class DefaultSource extends StreamSourceProvider with StreamSinkProvider `
      * `class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66562319
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -232,7 +234,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
        * @since 1.4.0
        */
       @scala.annotation.varargs
    -  def partitionBy(colNames: String*): DataFrameWriter = {
    +  def partitionBy(colNames: String*): this.type = {
    --- End diff --
    
    Forgot to change it. No difference in byte codes but it will show `DataFrameWriter.this.type` in scala doc. I will update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64844941
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    @tdas do you mean we should provide a feature to run codes in the driver side?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66532833
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    --- End diff --
    
    A writer to **write** data generated 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66537102
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    +   * interact with the stream.
    +   *
    +   * @since 2.0.0
    +   */
    +  @Experimental
    +  def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
    +    assertNotBucketed("foreach")
    +    assertStreaming(
    +      "foreach() on streaming Datasets and DataFrames can only be called on continuous queries")
    +
    +    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
    +    val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
    +    df.sparkSession.sessionState.continuousQueryManager.startQuery(
    +      queryName,
    +      getCheckpointLocation(queryName, required = false),
    +      df,
    +      sink,
    +      outputMode,
    +      trigger)
    +  }
    +
    +  /**
    +   * Returns the checkpointLocation for a query. If `required` is `true` but the checkpoint
    +   * location is not set, [[AnalysisException]] will be thrown. If `required` is `false`, a temp
    +   * folder will be created if the checkpoint location is not set.
    +   */
    +  private def getCheckpointLocation(queryName: String, required: Boolean): String = {
    +    extraOptions.get("checkpointLocation").map { userSpecified =>
    +      new Path(userSpecified).toUri.toString
    +    }.orElse {
    +      df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
    +        new Path(location, queryName).toUri.toString
    +      }
    +    }.getOrElse {
    +      if (required) {
    +        throw new AnalysisException("checkpointLocation must be specified either " +
    +          "through option() or SQLConf")
    --- End diff --
    
    SQLConf is actually not a user facing concept.
    
    Just say `SparkSession.conf.set`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64846783
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    Yea this thing is not meant to be exactly once. I don't think you can do exactly once without synchronizing on the drivers. But that's probably OK here. We just want something simple that can be used to implement extra functionalities.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66538640
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +abstract class ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    --- End diff --
    
    in the executor ~side~.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13342
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66533974
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -401,6 +381,53 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       }
     
       /**
    +   * :: Experimental ::
    +   * Starts the execution of the streaming query, which will continually send results to the given
    +   * [[ForeachWriter]] as new data arrives. The returned [[ContinuousQuery]] object can be used to
    --- End diff --
    
    Also can you add a simple example code showing the use of ForeachWriter.
    ```
    datasetOfString.write.foreach(new ForeachWriter[String] {
        def open(): Boolean = {   // open connection }
        def process(record: String) = {   // write string to connection }
        def close(errorOrNull: Throwable): Unit = { // close the connection } 
    } 
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r64844061
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]].
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +trait ForeachWriter[T] extends Serializable {
    --- End diff --
    
    @zsxwing @rxin @marmbrus  I think this is significantly less useful if there is not sense progress in the form of batch id or time. Otherwise there is no way to identify re-executions of a partition. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13342: [SPARK-15593][SQL]Add DataFrameWriter.foreach to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13342#discussion_r66537560
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
    @@ -0,0 +1,63 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.sql.streaming.ContinuousQuery
    +
    +/**
    + * :: Experimental ::
    + * A writer to consume data generated by a [[ContinuousQuery]]. Each partition will use a new
    + * deserialized instance, so you usually should do the initialization work in the `open` method.
    + *
    + * @since 2.0.0
    + */
    +@Experimental
    +abstract class ForeachWriter[T] extends Serializable {
    +
    +  /**
    +   * Called when starting to process one partition of new data in the executor side. `version` is
    +   * for data deduplication. When recovering from a failure, some data may be processed twice. But
    +   * it's guarantee that they will be opened with the same "version".
    +   *
    +   * If this method finds this is a partition from a duplicated data set, it can return `false` to
    +   * skip the further data processing. However, `close` still will be called for cleaning up
    +   * resources.
    +   *
    +   * @param partitionId the partition id.
    +   * @param version a unique id for data deduplication.
    +   * @return a flat that indicates if the data should be processed.
    +   */
    +  def open(partitionId: Long, version: Long): Boolean
    +
    +  /**
    +   * Called to process the data in the executor side.
    +   */
    +  def process(value: T): Unit
    +
    +  /**
    +   * Called when stopping to process one partition of new data in the executor side. This is
    +   * guaranteed to be called when a `Throwable` is thrown during processing data. However,
    +   * `close` won't be called in the following cases:
    +   *  - JVM crashes without throwing a `Throwable`
    +   *  - `open` throws a `Throwable`.
    +   *
    +   * @param errorOrNull the error thrown during processing data or null if nothing is thrown.
    --- End diff --
    
    if nothing is thrown --> if there was no error.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-15593][SQL]Add DataFrameWriter.foreach to allow t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13342
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59703/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org