Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/01/18 09:09:23 UTC

[GitHub] spark pull request #20311: [SPARK-23144][SS] Added console sink for continuous processing

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/20311

    [SPARK-23144][SS] Added console sink for continuous processing

    ## What changes were proposed in this pull request?
    Refactored the console sink's ConsoleWriter into a shared ConsoleWriter trait with two concrete implementations, ConsoleMicroBatchWriter and ConsoleContinuousWriter, so the sink works in both micro-batch and continuous processing mode.
    
    ## How was this patch tested?
    A new unit test.
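
    For reference, here is a minimal usage sketch of the console sink under
    continuous processing. This is not code from the PR: the "rate" source,
    Trigger.Continuous, and the local master setting are assumed from the
    Spark 2.3-era streaming APIs; numRows and truncate are the sink options
    defined in ConsoleWriter.

        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.streaming.Trigger

        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("console-sink-sketch")
          .getOrCreate()

        // The rate source emits (timestamp, value) rows and supports continuous mode.
        val query = spark.readStream
          .format("rate")
          .load()
          .writeStream
          .format("console")            // the sink this PR extends
          .option("numRows", 10)        // rows to print per epoch (default 20)
          .option("truncate", "false")  // keep long values intact (default true)
          .trigger(Trigger.Continuous("1 second"))
          .start()

        query.awaitTermination()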


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-23144

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20311
    
----
commit 6f69669c6b34a6d6bbcd11c3fb635262fe802d28
Author: Tathagata Das <ta...@...>
Date:   2018-01-18T09:07:00Z

    added console sink for continuous processing

----


---



[GitHub] spark issue #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20311
  
    **[Test build #86331 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86331/testReport)** for PR 20311 at commit [`6f69669`](https://github.com/apache/spark/commit/6f69669c6b34a6d6bbcd11c3fb635262fe802d28).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait ConsoleWriter extends Logging `
      * `class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)`
      * `class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)`
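
    The review diff below shows only the micro-batch writer, so for context
    here is a hedged sketch of what the continuous counterpart plausibly looks
    like, based on the class signature listed above. The
    commit(epochId, messages) signature is assumed from the ContinuousWriter
    interface and may differ in detail from the actual PR:

        // Sketch only: epochId plays the role that batchId plays in micro-batch mode.
        class ConsoleContinuousWriter(schema: StructType, val options: DataSourceV2Options)
          extends ContinuousWriter with ConsoleWriter {

          override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
            printRows(messages, schema, s"Continuous processing epoch: $epochId")
          }
        }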


---



[GitHub] spark issue #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20311
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed):
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86331/


---



[GitHub] spark issue #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20311
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20311
  
    **[Test build #86331 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86331/testReport)** for PR 20311 at commit [`6f69669`](https://github.com/apache/spark/commit/6f69669c6b34a6d6bbcd11c3fb635262fe802d28).


---



[GitHub] spark issue #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20311
  
    LGTM


---



[GitHub] spark pull request #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20311#discussion_r162408245
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
    @@ -20,45 +20,85 @@ package org.apache.spark.sql.execution.streaming.sources
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.{Row, SparkSession}
     import org.apache.spark.sql.sources.v2.DataSourceV2Options
    +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
     import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
     import org.apache.spark.sql.types.StructType
     
    -/**
    - * A [[DataSourceV2Writer]] that collects results to the driver and prints them in the console.
    - * Generated by [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
    - *
    - * This sink should not be used for production, as it requires sending all rows to the driver
    - * and does not support recovery.
    - */
    -class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
    -    extends DataSourceV2Writer with Logging {
    +/** Common methods used to create writers for the console sink. */
    +trait ConsoleWriter extends Logging {
    +
    +  def options: DataSourceV2Options
    +
       // Number of rows to display, by default 20 rows
    -  private val numRowsToShow = options.getInt("numRows", 20)
    +  protected val numRowsToShow = options.getInt("numRows", 20)
     
       // Truncate the displayed data if it is too long, by default it is true
    -  private val isTruncated = options.getBoolean("truncate", true)
    +  protected val isTruncated = options.getBoolean("truncate", true)
     
       assert(SparkSession.getActiveSession.isDefined)
    -  private val spark = SparkSession.getActiveSession.get
    +  protected val spark = SparkSession.getActiveSession.get
    +
    +  def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
     
    -  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
    +  def abort(messages: Array[WriterCommitMessage]): Unit = {}
     
    -  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
    -    val batch = messages.collect {
    +  protected def printRows(
    +      commitMessages: Array[WriterCommitMessage],
    +      schema: StructType,
    +      printMessage: String): Unit = {
    +    val rows = commitMessages.collect {
           case PackedRowCommitMessage(rows) => rows
         }.flatten
     
         // scalastyle:off println
         println("-------------------------------------------")
    -    println(s"Batch: $batchId")
    +    println(printMessage)
         println("-------------------------------------------")
         // scalastyle:on println
    -    spark.createDataFrame(
    -      spark.sparkContext.parallelize(batch), schema)
    +    spark
    +      .createDataFrame(spark.sparkContext.parallelize(rows), schema)
           .show(numRowsToShow, isTruncated)
       }
    +}
    +
    +
    +/**
    + * A [[DataSourceV2Writer]] that collects results from a micro-batch query to the driver and
    + * prints them in the console. Created by
    + * [[org.apache.spark.sql.execution.streaming.ConsoleSinkProvider]].
    + *
    + * This sink should not be used for production, as it requires sending all rows to the driver
    + * and does not support recovery.
    + */
    +class ConsoleMicroBatchWriter(batchId: Long, schema: StructType, val options: DataSourceV2Options)
    +  extends DataSourceV2Writer with ConsoleWriter {
    +
    +  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    +    printRows(messages, schema, s"Batch: $batchId")
    --- End diff --
    
    Not part of this PR, but can you envision scenarios where microbatch.commit(messages) would be implemented differently than continuous.commit(batchId, messages)? Seeing this in context made me realize there's an obvious strategy to unify the streaming writer APIs.
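
    One possible shape for that unification, sketched here only as an
    illustration (StreamWriter is a hypothetical name, not an API in this PR):
    give the micro-batch path an epoch id too, so both modes share a single
    commit signature.

        // Hypothetical unified trait: the micro-batch batchId and the
        // continuous epochId collapse into one epochId parameter.
        trait StreamWriter extends DataSourceV2Writer {
          def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
          def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
        }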


---



[GitHub] spark pull request #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20311


---



[GitHub] spark issue #20311: [SPARK-23144][SS] Added console sink for continuous processing

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/20311
  
    @jose-torres PTAL


---
