Posted to reviews@spark.apache.org by mukulmurthy <gi...@git.apache.org> on 2018/06/13 20:59:31 UTC

[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

GitHub user mukulmurthy opened a pull request:

    https://github.com/apache/spark/pull/21559

    [SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink

    ## What changes were proposed in this pull request?
    
    Provide an option to limit the number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, meaning that if they're used on big data (including under the hood during display()), they can OOM the stream. This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they are still unbounded.
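    
    A minimal usage sketch (the option name is taken from this description; `streamingDF` and `limited_table` are placeholder names, not part of the change):
    
    ```scala
    // Cap the in-memory sink at 100 rows instead of letting it grow without bound.
    val query = streamingDF.writeStream
      .format("memory")
      .queryName("limited_table")
      .option("maxMemorySinkRows", 100)
      .outputMode("append")
      .start()
    ```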
    
    ## How was this patch tested?
    
    Added new unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mukulmurthy/oss-spark SPARK-24525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21559
    
----
commit ac7eb2f3cf4cca8ee5d64f90f71c6c0d14931c52
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T00:38:38Z

    Add in logic to determine the max rows a sink can have

commit 8dc89cca9129b25ad8f5f4cda856e5b594f53e52
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T18:55:32Z

    Make MemorySink and MemorySinkV2 respect row and byte limits

commit 8ddf566259016e4ce727eabb3206fd65303c5580
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T19:20:44Z

    Make tests compile

commit d82c7d5ee84b25e968f705aded2f2c04edc5c140
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T20:26:56Z

    Make microbatch memory writer work with limits

commit 7fefe877b03fe4ad522275780a64425b58bf5bb0
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T20:27:03Z

    Test MemorySinkV2 with limits

commit 58c5044ca2e62ca825df3a4e88c4b4f6d697461e
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T22:08:49Z

    Add MemorySink test with limit

commit 392f05f4c1d008493220f59ff7a4d4b948fdfc4b
Author: Mukul Murthy <mu...@...>
Date:   2018-06-12T22:23:27Z

    rename method

commit 9097dd52bf654d7de059a0a0eaca961bd424f3cd
Author: Mukul Murthy <mu...@...>
Date:   2018-06-13T20:36:08Z

    Don't use byte limit, and log if we truncate rows

commit a28fb38053395c04a72b5d79f1f12a3aa5d49972
Author: Mukul Murthy <mu...@...>
Date:   2018-06-13T20:36:21Z

    Update tests

commit f981cb818ffc95ddce2b59fcd64142615037b6a3
Author: Mukul Murthy <mu...@...>
Date:   2018-06-13T20:50:43Z

    minor refactor

----


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91868 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91868/testReport)** for PR 21559 at commit [`e5b6175`](https://github.com/apache/spark/commit/e5b6175f0b638dc7235e4d3b610284a761e01480).


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195268999
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -81,22 +84,35 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
         }.mkString("\n")
       }
     
    -  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
    +  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row], sinkCapacity: Option[Int])
    --- End diff --
    
    nit: our style is more like
    ```scala
      def write(
        batchId: Long,
        outputMode: OutputMode,
        newRows: Array[Row],
        sinkCapacity: Option[Int]): Unit = {
    ```


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195798990
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
     }
     
     /** A common trait for MemorySinks with methods used for testing */
    -trait MemorySinkBase extends BaseStreamingSink {
    +trait MemorySinkBase extends BaseStreamingSink with Logging {
       def allData: Seq[Row]
       def latestBatchData: Seq[Row]
       def dataSinceBatch(sinceBatchId: Long): Seq[Row]
       def latestBatchId: Option[Long]
    +
    +  /**
    +   * Truncates the given rows to return at most maxRows rows.
    +   * @param rows The data that may need to be truncated.
    +   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
    +   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
    +   * @param batchId The ID of the batch that sent these rows, for logging purposes.
    +   * @return Truncated rows.
    +   */
    +  protected def truncateRowsIfNeeded(
    +      rows: Array[Row],
    +      batchLimit: Int,
    +      sinkLimit: Int,
    +      batchId: Long): Array[Row] = {
    +    if (rows.length > batchLimit && batchLimit >= 0) {
    +      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
    --- End diff --
    
    nit: not sure if these sinks get used by continuous processing too. If so, I would rename `batch` to `trigger version`.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91926 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91926/testReport)** for PR 21559 at commit [`0402b60`](https://github.com/apache/spark/commit/0402b6042b6f0b773a17d2bc6d30eda1c46dd731).


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195268218
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
       def latestBatchId: Option[Long]
     }
     
    +/**
    + * Companion object to MemorySinkBase.
    + */
    +object MemorySinkBase {
    +  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
    --- End diff --
    
    `maxRows` is sufficient


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    jenkins add to whitelist


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91796 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91796/testReport)** for PR 21559 at commit [`f981cb8`](https://github.com/apache/spark/commit/f981cb818ffc95ddce2b59fcd64142615037b6a3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195268299
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
       def latestBatchId: Option[Long]
     }
     
    +/**
    + * Companion object to MemorySinkBase.
    + */
    +object MemorySinkBase {
    +  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
    +  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
    +
    +  /**
    +   * Gets the max number of rows a MemorySink should store. This number is based on the memory
    +   * sink row limit if it is set. If not, there is no limit.
    +   * @param options Options for writing from which we get the max rows option
    +   * @return The maximum number of rows a memorySink should store, or None for no limit.
    +   */
    +  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
    +    val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
    +    if (maxRows >= 0) Some(maxRows) else None
    +  }
    +}
    +
    +
    --- End diff --
    
    nit: remove extra line


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195516533
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -294,6 +333,16 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
     
       def clear(): Unit = synchronized {
         batches.clear()
    +    numRows = 0
    +  }
    +
    +  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
    --- End diff --
    
    nit: I'd document that maxRows is the remaining row capacity, not the maximum row limit defined in the options
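    
    A wording sketch of that doc (phrasing assumed; the signature comes from the diff above, and the body here is only illustrative):
    
    ```scala
      /**
       * Truncates `rows` so the sink stays within its remaining capacity.
       *
       * @param maxRows the number of rows the sink can still accept before the
       *                configured limit is reached (remaining capacity), not the
       *                maximum row limit defined in the options.
       */
      private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
        if (rows.length > maxRows) rows.take(maxRows) else rows
      }
    ```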


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    @jose-torres @brkyvz for review


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91861 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91861/testReport)** for PR 21559 at commit [`b2ef59c`](https://github.com/apache/spark/commit/b2ef59c40e58cdd6efdb0f5414f16ac5358bc99a).


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91861 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91861/testReport)** for PR 21559 at commit [`b2ef59c`](https://github.com/apache/spark/commit/b2ef59c40e58cdd6efdb0f5414f16ac5358bc99a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195268130
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -294,6 +333,16 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
     
       def clear(): Unit = synchronized {
         batches.clear()
    +    numRows = 0
    +  }
    +
    +  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
    +    if (rows.length > maxRows) {
    --- End diff --
    
    Also adding a check here to make sure maxRows >= 0. It shouldn't ever be negative, but it doesn't hurt to safeguard.
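    
    Concretely, the guarded version would look something like this (a sketch matching the shape of the revised diff quoted later in the thread):
    
    ```scala
      private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
        // Only truncate for a non-negative limit; a negative maxRows leaves the rows untouched.
        if (rows.length > maxRows && maxRows >= 0) {
          logWarning(s"Truncating batch $batchId to $maxRows rows")
          rows.take(maxRows)
        } else {
          rows
        }
      }
    ```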


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195809395
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
     }
     
     /** A common trait for MemorySinks with methods used for testing */
    -trait MemorySinkBase extends BaseStreamingSink {
    +trait MemorySinkBase extends BaseStreamingSink with Logging {
       def allData: Seq[Row]
       def latestBatchData: Seq[Row]
       def dataSinceBatch(sinceBatchId: Long): Seq[Row]
       def latestBatchId: Option[Long]
    +
    +  /**
    +   * Truncates the given rows to return at most maxRows rows.
    +   * @param rows The data that may need to be truncated.
    +   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
    +   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
    +   * @param batchId The ID of the batch that sent these rows, for logging purposes.
    +   * @return Truncated rows.
    +   */
    +  protected def truncateRowsIfNeeded(
    +      rows: Array[Row],
    +      batchLimit: Int,
    +      sinkLimit: Int,
    +      batchId: Long): Array[Row] = {
    +    if (rows.length > batchLimit && batchLimit >= 0) {
    +      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
    --- End diff --
    
    This piece is shared by MemorySink and MemorySinkV2, and the MemorySinkV2 (continuous processing) sink still calls them batches.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195797571
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
     }
     
     /** A common trait for MemorySinks with methods used for testing */
    -trait MemorySinkBase extends BaseStreamingSink {
    +trait MemorySinkBase extends BaseStreamingSink with Logging {
       def allData: Seq[Row]
       def latestBatchData: Seq[Row]
       def dataSinceBatch(sinceBatchId: Long): Seq[Row]
       def latestBatchId: Option[Long]
    +
    +  /**
    +   * Truncates the given rows to return at most maxRows rows.
    +   * @param rows The data that may need to be truncated.
    +   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
    +   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
    +   * @param batchId The ID of the batch that sent these rows, for logging purposes.
    +   * @return Truncated rows.
    +   */
    +  protected def truncateRowsIfNeeded(
    +      rows: Array[Row],
    +      batchLimit: Int,
    +      sinkLimit: Int,
    +      batchId: Long): Array[Row] = {
    +    if (rows.length > batchLimit && batchLimit >= 0) {
    +      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
    +      rows.take(batchLimit)
    +    } else {
    +      rows
    +    }
    +  }
    +}
    +
    +/**
    + * Companion object to MemorySinkBase.
    + */
    +object MemorySinkBase {
    +  val MAX_MEMORY_SINK_ROWS = "maxRows"
    +  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
    +
    +  /**
    +   * Gets the max number of rows a MemorySink should store. This number is based on the memory
    +   * sink row limit if it is set. If not, there is no limit.
    +   * @param options Options for writing from which we get the max rows option
    +   * @return The maximum number of rows a memorySink should store, or None for no limit.
    --- End diff --
    
    need to update docs


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91864/testReport)** for PR 21559 at commit [`25d6de1`](https://github.com/apache/spark/commit/25d6de1db8223975ebd9b69c7ca77c26e3d8674c).


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Jenkins add to whitelist


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91868 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91868/testReport)** for PR 21559 at commit [`e5b6175`](https://github.com/apache/spark/commit/e5b6175f0b638dc7235e4d3b610284a761e01480).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195268861
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
    @@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
       def latestBatchId: Option[Long]
     }
     
    +/**
    + * Companion object to MemorySinkBase.
    + */
    +object MemorySinkBase {
    +  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
    +  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
    +
    +  /**
    +   * Gets the max number of rows a MemorySink should store. This number is based on the memory
    +   * sink row limit if it is set. If not, there is no limit.
    +   * @param options Options for writing from which we get the max rows option
    +   * @return The maximum number of rows a memorySink should store, or None for no limit.
    +   */
    +  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
    +    val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
    +    if (maxRows >= 0) Some(maxRows) else None
    --- End diff --
    
    Do you want to do `if (maxRows >= 0) maxRows else Int.MaxValue - 10`?
    We can't exceed the runtime array max size anyway.
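    
    As a sketch, that suggestion would turn the helper into the following (names taken from the diff above; the `- 10` head-room is the reviewer's, since JVM arrays cannot hold quite `Int.MaxValue` elements):
    
    ```scala
      def getMemorySinkCapacity(options: DataSourceOptions): Int = {
        val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
        if (maxRows >= 0) maxRows else Int.MaxValue - 10
      }
    ```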


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91868/
    Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    ok to test


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91926/
    Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91799 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91799/testReport)** for PR 21559 at commit [`4ab9bda`](https://github.com/apache/spark/commit/4ab9bdaea895f6d0c76ee9ddd44c131f499eaec5).


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91861/
    Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91796/
    Test PASSed.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195516859
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
     
       def clear(): Unit = synchronized {
         batches.clear()
    +    numRows = 0
    +  }
    +
    +  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
    --- End diff --
    
    Can this go in MemorySinkBase?


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    lgtm


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21559


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91799 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91799/testReport)** for PR 21559 at commit [`4ab9bda`](https://github.com/apache/spark/commit/4ab9bdaea895f6d0c76ee9ddd44c131f499eaec5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Thanks! Merging to master!


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91864 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91864/testReport)** for PR 21559 at commit [`25d6de1`](https://github.com/apache/spark/commit/25d6de1db8223975ebd9b69c7ca77c26e3d8674c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait MemorySinkBase extends BaseStreamingSink with Logging `


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91864/
    Test PASSed.


---



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21559#discussion_r195269434
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
     
       def clear(): Unit = synchronized {
         batches.clear()
    +    numRows = 0
    +  }
    +
    +  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
    +    if (rows.length > maxRows) {
    +      logWarning(s"Truncating batch $batchId to $maxRows rows")
    --- End diff --
    
    How does `take` behave with a negative row count? Printing a warning message with negative values may be weird. I would also include the sink limit in the warning.
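    
    For reference, `Array.take` with a negative argument simply returns an empty array rather than throwing, so only the log message would look odd:
    
    ```scala
      val rows = Array("a", "b", "c")
      rows.take(-1)  // Array()
      rows.take(2)   // Array(a, b)
    ```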


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91796 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91796/testReport)** for PR 21559 at commit [`f981cb8`](https://github.com/apache/spark/commit/f981cb818ffc95ddce2b59fcd64142615037b6a3).


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91799/
    Test PASSed.


---



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21559
  
    **[Test build #91926 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91926/testReport)** for PR 21559 at commit [`0402b60`](https://github.com/apache/spark/commit/0402b6042b6f0b773a17d2bc6d30eda1c46dd731).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
