Posted to reviews@spark.apache.org by jose-torres <gi...@git.apache.org> on 2018/02/08 20:29:42 UTC

[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

GitHub user jose-torres opened a pull request:

    https://github.com/apache/spark/pull/20552

    [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

    ## What changes were proposed in this pull request?
    
    Migrate the foreach sink to the DataSourceV2 API.
    
    ## How was this patch tested?
    
    Existing unit tests, plus a new test to verify an edge case.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jose-torres/spark foreach-sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20552
    
----
commit 44de1ea878fb65e4e04ac6cd594f2e7c72ea2d5e
Author: Jose Torres <jo...@...>
Date:   2018-02-08T20:28:03Z

    migrate foreach sink

----


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    /cc @tdas 


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167120724
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    --- End diff --
    
    Actually, we probably should not inline this; its outer closure may not be serializable in that case.
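    For illustration, a minimal sketch of the failure mode (class and member names here are invented, not this PR's code): an anonymous class that touches a member of its enclosing instance captures that instance, so serializing the inner object also tries to serialize the outer one.

    ```scala
    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    // The anonymous Serializable below references `handle`, so it captures
    // OuterWriter.this; OuterWriter itself is not serializable.
    class OuterWriter {
      private val handle = new Object // not Serializable

      def inlinedFactory(): Serializable = new Serializable {
        def describe(): String = handle.toString
      }
    }

    object ClosureDemo extends App {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(new OuterWriter().inlinedFactory())
      catch {
        case e: NotSerializableException =>
          println(s"Serialization failed as expected: ${e.getMessage}")
      }
    }
    ```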


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87231 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87231/testReport)** for PR 20552 at commit [`87d0bc8`](https://github.com/apache/spark/commit/87d0bc8ce23ab5a95ba0b5432d6b58042b32bdac).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167080181
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends StreamWriter with SupportsWriteInternalRow {
    +  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +
    +  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
    +    ForeachWriterFactory(writer, encoder)
    +  }
    +}
    +
    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends DataWriterFactory[InternalRow] {
    +  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
    +    new ForeachDataWriter(writer, encoder, partitionId)
    +  }
    +}
    +
    +class ForeachDataWriter[T : Encoder](
    +    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
    +    extends DataWriter[InternalRow] {
    +  private val initialEpochId: Long = {
    +    // Start with the microbatch ID. If it's not there, we're in continuous execution,
    +    // so get the start epoch.
    +    // This ID will be incremented as commits happen.
    +    TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match {
    +      case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +      case batch => batch.toLong
    +    }
    +  }
    +  private var currentEpochId = initialEpochId
    +
    +  // The lifecycle of the ForeachWriter is incompatible with the lifecycle of DataSourceV2 writers.
    +  // Unfortunately, we cannot migrate ForeachWriter, as its implementations live in user code. So
    +  // we need a small state machine to shim between them.
    +  //  * CLOSED means close() has been called.
    +  //  * OPENED
    +  private object WriterState extends Enumeration {
    +    type WriterState = Value
    +    val CLOSED, OPENED, OPENED_SKIP_PROCESSING = Value
    +  }
    +  import WriterState._
    +
    +  private var state = CLOSED
    +
    +  private def openAndSetState(epochId: Long) = {
    +    // Create a new writer by roundtripping through the serialization for compatibility.
    +    // In the old API, a writer instantiation would never get reused.
    +    val byteStream = new ByteArrayOutputStream()
    --- End diff --
    
    Why are you serializing and deserializing here? If you are re-serializing the ForeachWriter, doesn't this mean that you are going to retain state (of the non-transient fields) across epochs? Is that what you want?
    
    It seems the best thing to do is to serialize the writer at the driver, send the bytes to the task, and then deserialize repeatedly. Then you only incur the cost of deserialization between epochs, and you always start with a fresh copy of the ForeachWriter.
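    For reference, a rough sketch of that pattern (object and method names here are illustrative, not the PR's actual code):

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    object WriterSerialization {
      // Driver side: serialize the user's ForeachWriter once and ship the bytes
      // to the tasks (e.g. as a field of the DataWriterFactory).
      def serializeOnce(writer: Serializable): Array[Byte] = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(writer)
        out.close()
        bytes.toByteArray
      }

      // Executor side: deserialize a fresh copy at the start of every epoch,
      // so no non-transient state carries over from the previous epoch.
      def freshCopy[T](bytes: Array[Byte]): T = {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try in.readObject().asInstanceOf[T] finally in.close()
      }
    }
    ```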



---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    My intent is to say that other data sources built by third-party developers aren't supposed to use batch IDs in the executors for any purpose. In addition to the issue you mentioned, I don't think there's a compelling reason to do so in the DataSourceV2 model, and I worry it's easy to write implementations that seem correct but aren't.
    
    Since this interface is still evolving, I think it makes sense to revisit the question if we notice a scenario where it's infeasible to rewrite a piece of transactional logic to not use the batch ID in the executor.
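    To illustrate the model I mean (a sketch under assumptions; the class names and the ??? body are placeholders, not this PR's code), per-epoch transactional bookkeeping would live in the driver-side commit(), which already receives the epoch ID:

    ```scala
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.writer._
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

    // Executors only report what they wrote; they never look up a batch/epoch ID.
    case class PartitionRows(partitionId: Int, rowCount: Long) extends WriterCommitMessage

    class TransactionalWriter extends StreamWriter with SupportsWriteInternalRow {
      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
        // Driver side: key the idempotent/transactional commit on epochId,
        // e.g. skip the work if this epoch was already committed.
      }
      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
      override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
        ??? // a factory whose DataWriters buffer rows and emit PartitionRows
      }
    }
    ```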


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87230 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87230/testReport)** for PR 20552 at commit [`44de1ea`](https://github.com/apache/spark/commit/44de1ea878fb65e4e04ac6cd594f2e7c72ea2d5e).


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87231/
    Test FAILed.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167078671
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends StreamWriter with SupportsWriteInternalRow {
    +  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +
    +  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
    +    ForeachWriterFactory(writer, encoder)
    +  }
    +}
    +
    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends DataWriterFactory[InternalRow] {
    +  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
    +    new ForeachDataWriter(writer, encoder, partitionId)
    +  }
    +}
    +
    +class ForeachDataWriter[T : Encoder](
    --- End diff --
    
    Add docs describing the implementation of this DataWriter, especially the lifecycle of the ForeachWriter (these should go here rather than in inline comments).
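    A possible shape for that class-level doc, just as a starting point (the wording is only a suggestion, and the continuous-mode detail reflects the current state of this PR):

    ```scala
    /**
     * A [[DataWriter]] that feeds the rows of one partition to a user-supplied [[ForeachWriter]].
     *
     * Lifecycle of the wrapped ForeachWriter within a task:
     *  1. open(partitionId, epochId) is called before the first row of an epoch; if it
     *     returns false, the epoch's rows are skipped but close() is still called.
     *  2. process(row) is called for every row of the epoch.
     *  3. close(errorOrNull) is called when the epoch's data is exhausted, or with the
     *     error if processing failed.
     * In continuous processing the same DataWriter instance spans epochs, so the
     * ForeachWriter is re-opened with an incremented epoch ID after each commit.
     */
    ```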


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87231 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87231/testReport)** for PR 20552 at commit [`87d0bc8`](https://github.com/apache/spark/commit/87d0bc8ce23ab5a95ba0b5432d6b58042b32bdac).


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87230/
    Test FAILed.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167077542
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    --- End diff --
    
    nit: This is really a small class. Maybe inline it rather than defining it under the confusing name `...InternalWriter`.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167078037
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends StreamWriter with SupportsWriteInternalRow {
    +  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +
    +  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
    +    ForeachWriterFactory(writer, encoder)
    +  }
    +}
    +
    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    --- End diff --
    
    Similarly, maybe inline this class as well; it's very small.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres closed the pull request at:

    https://github.com/apache/spark/pull/20552


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167081693
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
    @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
           query.stop()
         }
       }
    +
    --- End diff --
    
    I think there should be a test with continuous processing + foreach.
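    A sketch of what such a test might look like (assuming it sits in ForeachSinkSuite with the usual imports in scope; the rate source, trigger interval, and timeouts are placeholders, not this PR's actual test):

    ```scala
    test("foreach sink with continuous trigger (sketch)") {
      val query = spark.readStream
        .format("rate")
        .option("rowsPerSecond", "5")
        .load()
        .select($"value")
        .writeStream
        .trigger(Trigger.Continuous("1 second"))
        .foreach(new ForeachWriter[Row] {
          override def open(partitionId: Long, epochId: Long): Boolean = true
          override def process(row: Row): Unit = { /* record row.getLong(0) somewhere */ }
          override def close(errorOrNull: Throwable): Unit = {}
        })
        .start()
      try {
        assert(query.isActive)        // starting should succeed now that foreach supports continuous mode
        query.awaitTermination(2000)  // let a couple of epochs run
      } finally {
        query.stop()
      }
    }
    ```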


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167076621
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    --- End diff --
    
    nit: params on different lines


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87417 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87417/testReport)** for PR 20552 at commit [`66270c5`](https://github.com/apache/spark/commit/66270c530ff7d7a82f9661b42d158d0b4544b8b7).


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87230 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87230/testReport)** for PR 20552 at commit [`44de1ea`](https://github.com/apache/spark/commit/44de1ea878fb65e4e04ac6cd594f2e7c72ea2d5e).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport `
      * `case class ForeachInternalWriter[T: Encoder](`
      * `case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])`
      * `class ForeachDataWriter[T : Encoder](`


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Filed SPARK-23416 for the unrelated failure in build 87241.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167126838
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends StreamWriter with SupportsWriteInternalRow {
    +  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +
    +  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
    +    ForeachWriterFactory(writer, encoder)
    +  }
    +}
    +
    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends DataWriterFactory[InternalRow] {
    +  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
    +    new ForeachDataWriter(writer, encoder, partitionId)
    +  }
    +}
    +
    +class ForeachDataWriter[T : Encoder](
    +    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
    +    extends DataWriter[InternalRow] {
    +  private val initialEpochId: Long = {
    +    // Start with the microbatch ID. If it's not there, we're in continuous execution,
    +    // so get the start epoch.
    +    // This ID will be incremented as commits happen.
    +    TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY) match {
    +      case null => TaskContext.get().getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +      case batch => batch.toLong
    +    }
    +  }
    +  private var currentEpochId = initialEpochId
    +
    +  // The lifecycle of the ForeachWriter is incompatible with the lifecycle of DataSourceV2 writers.
    +  // Unfortunately, we cannot migrate ForeachWriter, as its implementations live in user code. So
    +  // we need a small state machine to shim between them.
    +  //  * CLOSED means close() has been called.
    +  //  * OPENED
    +  private object WriterState extends Enumeration {
    +    type WriterState = Value
    +    val CLOSED, OPENED, OPENED_SKIP_PROCESSING = Value
    +  }
    +  import WriterState._
    +
    +  private var state = CLOSED
    +
    +  private def openAndSetState(epochId: Long) = {
    +    // Create a new writer by roundtripping through the serialization for compatibility.
    +    // In the old API, a writer instantiation would never get reused.
    +    val byteStream = new ByteArrayOutputStream()
    --- End diff --
    
    You're right; this suggestion is what we really want.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87241/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167080661
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends StreamWriter with SupportsWriteInternalRow {
    +  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +
    +  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
    +    ForeachWriterFactory(writer, encoder)
    +  }
    +}
    +
    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends DataWriterFactory[InternalRow] {
    +  override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
    +    new ForeachDataWriter(writer, encoder, partitionId)
    +  }
    +}
    +
    +class ForeachDataWriter[T : Encoder](
    +    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
    --- End diff --
    
    nit: put the params on separate lines.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    This is obsolete - we're changing the lifecycle of DataWriter.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87417/
    Test FAILed.


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167120763
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala ---
    @@ -17,52 +17,119 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    +
     import org.apache.spark.TaskContext
    -import org.apache.spark.sql.{DataFrame, Encoder, ForeachWriter}
    -import org.apache.spark.sql.catalyst.encoders.encoderFor
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
    +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
    +import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.writer._
    +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.streaming.OutputMode
    +import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[Sink]] that forwards all data into [[ForeachWriter]] according to the contract defined by
    - * [[ForeachWriter]].
    - *
    - * @param writer The [[ForeachWriter]] to process all data.
    - * @tparam T The expected type of the sink.
    - */
    -class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Serializable {
    -
    -  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    -    // This logic should've been as simple as:
    -    // ```
    -    //   data.as[T].foreachPartition { iter => ... }
    -    // ```
    -    //
    -    // Unfortunately, doing that would just break the incremental planing. The reason is,
    -    // `Dataset.foreachPartition()` would further call `Dataset.rdd()`, but `Dataset.rdd()` will
    -    // create a new plan. Because StreamExecution uses the existing plan to collect metrics and
    -    // update watermark, we should never create a new plan. Otherwise, metrics and watermark are
    -    // updated in the new plan, and StreamExecution cannot retrieval them.
    -    //
    -    // Hence, we need to manually convert internal rows to objects using encoder.
    +
    +case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends StreamWriteSupport {
    +  override def createStreamWriter(
    +      queryId: String,
    +      schema: StructType,
    +      mode: OutputMode,
    +      options: DataSourceOptions): StreamWriter = {
         val encoder = encoderFor[T].resolveAndBind(
    -      data.logicalPlan.output,
    -      data.sparkSession.sessionState.analyzer)
    -    data.queryExecution.toRdd.foreachPartition { iter =>
    -      if (writer.open(TaskContext.getPartitionId(), batchId)) {
    -        try {
    -          while (iter.hasNext) {
    -            writer.process(encoder.fromRow(iter.next()))
    -          }
    -        } catch {
    -          case e: Throwable =>
    -            writer.close(e)
    -            throw e
    -        }
    -        writer.close(null)
    -      } else {
    -        writer.close(null)
    +      schema.toAttributes,
    +      SparkSession.getActiveSession.get.sessionState.analyzer)
    +    ForeachInternalWriter(writer, encoder)
    +  }
    +}
    +
    +case class ForeachInternalWriter[T: Encoder](
    +    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    +    extends StreamWriter with SupportsWriteInternalRow {
    +  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +
    +  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
    +    ForeachWriterFactory(writer, encoder)
    +  }
    +}
    +
    +case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
    --- End diff --
    
    Actually, we probably should not inline this; its outer closure may not be serializable in that case.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87241 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport)** for PR 20552 at commit [`a33a35c`](https://github.com/apache/spark/commit/a33a35ccbae7350519a3faf8d5d3d6f35692feb3).


---



[GitHub] spark pull request #20552: [SPARK-23099][SS] Migrate foreach sink to DataSou...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20552#discussion_r167126862
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---
    @@ -255,6 +255,32 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
           query.stop()
         }
       }
    +
    --- End diff --
    
    Good instinct: it didn't quite work, so I added the test.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87417 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87417/testReport)** for PR 20552 at commit [`66270c5`](https://github.com/apache/spark/commit/66270c530ff7d7a82f9661b42d158d0b4544b8b7).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20552: [SPARK-23099][SS] Migrate foreach sink to DataSourceV2

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20552
  
    **[Test build #87241 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87241/testReport)** for PR 20552 at commit [`a33a35c`](https://github.com/apache/spark/commit/a33a35ccbae7350519a3faf8d5d3d6f35692feb3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org