Posted to reviews@spark.apache.org by mukulmurthy <gi...@git.apache.org> on 2018/06/13 20:59:31 UTC
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
GitHub user mukulmurthy opened a pull request:
https://github.com/apache/spark/pull/21559
[SPARK-24525][SS] Provide an option to limit number of rows in a MemorySink
## What changes were proposed in this pull request?
Provide an option to limit the number of rows in a MemorySink. Currently, MemorySink and MemorySinkV2 have unbounded size, so if they are used on large data (including under the hood during display()), they can cause the stream to run out of memory (OOM). This change adds a maxMemorySinkRows option to limit how many rows MemorySink and MemorySinkV2 can hold. By default, they remain unbounded.
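A minimal sketch of the bounded behaviour described above, assuming a simplified, Spark-free sink. `BoundedMemorySink` and its methods are hypothetical names for illustration, not Spark's `MemorySink` API. The sketch keeps whole batches until the optional capacity is reached, truncates the batch that crosses the limit, and drops rows from later batches. `None` means unbounded, matching the default described above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Spark-free sketch of a row-limited in-memory sink.
// capacity = None means "unbounded", mirroring the default behaviour.
class BoundedMemorySink[T](capacity: Option[Int]) {
  // Each element is (batchId, rows kept from that batch).
  private val batches = ArrayBuffer.empty[(Long, Seq[T])]
  private var numRows = 0

  def write(batchId: Long, newRows: Seq[T]): Unit = synchronized {
    val kept = capacity match {
      case Some(limit) =>
        // Only the room left in the sink is available to this batch.
        val remaining = math.max(limit - numRows, 0)
        newRows.take(remaining)
      case None => newRows
    }
    batches += ((batchId, kept))
    numRows += kept.length
  }

  // All rows kept so far, in batch order.
  def allData: Seq[T] = synchronized { batches.flatMap(_._2).toList }

  def clear(): Unit = synchronized {
    batches.clear()
    numRows = 0 // reset the counter so the sink can fill up again
  }
}
```

With `capacity = Some(5)`, writing a 3-row batch and then a 4-row batch keeps all 3 rows of the first batch and only the first 2 rows of the second.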
## How was this patch tested?
Added new unit tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mukulmurthy/oss-spark SPARK-24525
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21559.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21559
----
commit ac7eb2f3cf4cca8ee5d64f90f71c6c0d14931c52
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T00:38:38Z
Add in logic to determine the max rows a sink can have
commit 8dc89cca9129b25ad8f5f4cda856e5b594f53e52
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T18:55:32Z
Make MemorySink and MemorySinkV2 respect row and byte limits
commit 8ddf566259016e4ce727eabb3206fd65303c5580
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T19:20:44Z
Make tests compile
commit d82c7d5ee84b25e968f705aded2f2c04edc5c140
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T20:26:56Z
Make microbatch memory writer work with limits
commit 7fefe877b03fe4ad522275780a64425b58bf5bb0
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T20:27:03Z
Test MemorySinkV2 with limits
commit 58c5044ca2e62ca825df3a4e88c4b4f6d697461e
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T22:08:49Z
Add MemorySink test with limit
commit 392f05f4c1d008493220f59ff7a4d4b948fdfc4b
Author: Mukul Murthy <mu...@...>
Date: 2018-06-12T22:23:27Z
rename method
commit 9097dd52bf654d7de059a0a0eaca961bd424f3cd
Author: Mukul Murthy <mu...@...>
Date: 2018-06-13T20:36:08Z
Don't use byte limit, and log if we truncate rows
commit a28fb38053395c04a72b5d79f1f12a3aa5d49972
Author: Mukul Murthy <mu...@...>
Date: 2018-06-13T20:36:21Z
Update tests
commit f981cb818ffc95ddce2b59fcd64142615037b6a3
Author: Mukul Murthy <mu...@...>
Date: 2018-06-13T20:50:43Z
minor refactor
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91868 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91868/testReport)** for PR 21559 at commit [`e5b6175`](https://github.com/apache/spark/commit/e5b6175f0b638dc7235e4d3b610284a761e01480).
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195268999
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
@@ -81,22 +84,35 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
     }.mkString("\n")
   }
-  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
+  def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row], sinkCapacity: Option[Int])
--- End diff --
nit: our style is more like
```scala
def write(
    batchId: Long,
    outputMode: OutputMode,
    newRows: Array[Row],
    sinkCapacity: Option[Int]): Unit = {
```
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195798990
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
 }
 /** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
   def allData: Seq[Row]
   def latestBatchData: Seq[Row]
   def dataSinceBatch(sinceBatchId: Long): Seq[Row]
   def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+      rows: Array[Row],
+      batchLimit: Int,
+      sinkLimit: Int,
+      batchId: Long): Array[Row] = {
+    if (rows.length > batchLimit && batchLimit >= 0) {
+      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
--- End diff --
nit: not sure if these sinks get used by continuous processing too. If so, I would rename `batch` to `trigger version`.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91926 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91926/testReport)** for PR 21559 at commit [`0402b60`](https://github.com/apache/spark/commit/0402b6042b6f0b773a17d2bc6d30eda1c46dd731).
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195268218
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
   def latestBatchId: Option[Long]
 }
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
--- End diff --
`maxRows` is sufficient
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:
https://github.com/apache/spark/pull/21559
jenkins add to whitelist
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91796 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91796/testReport)** for PR 21559 at commit [`f981cb8`](https://github.com/apache/spark/commit/f981cb818ffc95ddce2b59fcd64142615037b6a3).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195268299
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
   def latestBatchId: Option[Long]
 }
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows option
+   * @return The maximum number of rows a memorySink should store, or None for no limit.
+   */
+  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
+    val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
+    if (maxRows >= 0) Some(maxRows) else None
+  }
+}
+
+
--- End diff --
nit: remove extra line
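The `getMemorySinkCapacity` helper quoted in this diff turns the raw option into an `Option[Int]`: a negative value (the `-1` default) means no limit. Below is a Spark-free sketch of the same mapping. It assumes the options arrive as a plain `Map[String, String]` rather than `DataSourceOptions` (whose keys are case-insensitive, unlike this map). `CapacityOptions` is a hypothetical name, and the key is `maxRows`, the name this review settles on.

```scala
// Hypothetical sketch: options modelled as a plain, case-sensitive Map.
object CapacityOptions {
  val MaxRowsKey = "maxRows"
  val MaxRowsDefault = -1 // negative means "no limit"

  // Returns Some(limit) for a non-negative limit, or None for "no limit".
  def capacity(options: Map[String, String]): Option[Int] = {
    val maxRows = options.get(MaxRowsKey).map(_.trim.toInt).getOrElse(MaxRowsDefault)
    if (maxRows >= 0) Some(maxRows) else None
  }
}
```

Note that `0` is a valid limit and yields `Some(0)`, a sink that keeps nothing. Only negative values mean "unbounded".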
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Can one of the admins verify this patch?
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195516533
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -294,6 +333,16 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
   def clear(): Unit = synchronized {
     batches.clear()
+    numRows = 0
+  }
+
+  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
--- End diff --
nit: I'd document that maxRows is the remaining row capacity, not the maximum row limit defined in the options
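To make the distinction concrete: the limit set in the options is the sink's total capacity, while the value handed to per-batch truncation is what remains of it after counting the rows already stored. Below is a small sketch with hypothetical names (`SinkLimits`, `remainingCapacity`), not the PR's code, showing how such a parameter might be documented.

```scala
// Hypothetical helper illustrating "remaining capacity" vs. "configured limit".
object SinkLimits {
  /**
   * Rows that the next batch may still add.
   * @param sinkLimit  the total row limit configured for the sink
   * @param storedRows rows the sink already holds
   * @return the remaining row capacity, never negative
   */
  def remainingCapacity(sinkLimit: Int, storedRows: Int): Int =
    math.max(sinkLimit - storedRows, 0)
}
```

For a sink limited to 10 rows that already holds 7, `remainingCapacity(10, 7)` is 3, so an incoming 5-row batch keeps only its first 3 rows.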
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:
https://github.com/apache/spark/pull/21559
@jose-torres @brkyvz for review
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91861 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91861/testReport)** for PR 21559 at commit [`b2ef59c`](https://github.com/apache/spark/commit/b2ef59c40e58cdd6efdb0f5414f16ac5358bc99a).
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91861 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91861/testReport)** for PR 21559 at commit [`b2ef59c`](https://github.com/apache/spark/commit/b2ef59c40e58cdd6efdb0f5414f16ac5358bc99a).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195268130
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -294,6 +333,16 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink
   def clear(): Unit = synchronized {
     batches.clear()
+    numRows = 0
+  }
+
+  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
+    if (rows.length > maxRows) {
--- End diff --
I'm also adding a check here to make sure maxRows >= 0. It should never be negative, but it doesn't hurt to safeguard.
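The safeguard matters because of how Scala's `take` treats its argument. `take(n)` with `n <= 0` returns an empty collection rather than leaving the input untouched. Without the guard, a negative limit that reached the truncation step (for example, the `-1` "unbounded" default) would silently drop the whole batch. Below is a sketch of a guarded helper. `Truncation.truncateIfNeeded` is a hypothetical name, simplified from the diff.

```scala
// Hypothetical, simplified version of the guarded truncation.
object Truncation {
  // Without the `limit >= 0` check, rows.take(-1) would return an empty array.
  def truncateIfNeeded[T](rows: Array[T], limit: Int): Array[T] =
    if (limit >= 0 && rows.length > limit) rows.take(limit) else rows
}
```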
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195809395
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
 }
 /** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
   def allData: Seq[Row]
   def latestBatchData: Seq[Row]
   def dataSinceBatch(sinceBatchId: Long): Seq[Row]
   def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+      rows: Array[Row],
+      batchLimit: Int,
+      sinkLimit: Int,
+      batchId: Long): Array[Row] = {
+    if (rows.length > batchLimit && batchLimit >= 0) {
+      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
--- End diff --
This piece is shared by MemorySink and MemorySinkV2, and the MemorySinkV2 (continuous processing) sink still calls them batches.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195797571
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -221,26 +222,72 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
 }
 /** A common trait for MemorySinks with methods used for testing */
-trait MemorySinkBase extends BaseStreamingSink {
+trait MemorySinkBase extends BaseStreamingSink with Logging {
   def allData: Seq[Row]
   def latestBatchData: Seq[Row]
   def dataSinceBatch(sinceBatchId: Long): Seq[Row]
   def latestBatchId: Option[Long]
+
+  /**
+   * Truncates the given rows to return at most maxRows rows.
+   * @param rows The data that may need to be truncated.
+   * @param batchLimit Number of rows to keep in this batch; the rest will be truncated
+   * @param sinkLimit Total number of rows kept in this sink, for logging purposes.
+   * @param batchId The ID of the batch that sent these rows, for logging purposes.
+   * @return Truncated rows.
+   */
+  protected def truncateRowsIfNeeded(
+      rows: Array[Row],
+      batchLimit: Int,
+      sinkLimit: Int,
+      batchId: Long): Array[Row] = {
+    if (rows.length > batchLimit && batchLimit >= 0) {
+      logWarning(s"Truncating batch $batchId to $batchLimit rows because of sink limit $sinkLimit")
+      rows.take(batchLimit)
+    } else {
+      rows
+    }
+  }
+}
+
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows option
+   * @return The maximum number of rows a memorySink should store, or None for no limit.
--- End diff --
need to update docs
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Can one of the admins verify this patch?
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91864/testReport)** for PR 21559 at commit [`25d6de1`](https://github.com/apache/spark/commit/25d6de1db8223975ebd9b69c7ca77c26e3d8674c).
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:
https://github.com/apache/spark/pull/21559
Jenkins add to whitelist
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91868 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91868/testReport)** for PR 21559 at commit [`e5b6175`](https://github.com/apache/spark/commit/e5b6175f0b638dc7235e4d3b610284a761e01480).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195268861
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---
@@ -228,19 +229,45 @@ trait MemorySinkBase extends BaseStreamingSink {
   def latestBatchId: Option[Long]
 }
+/**
+ * Companion object to MemorySinkBase.
+ */
+object MemorySinkBase {
+  val MAX_MEMORY_SINK_ROWS = "maxMemorySinkRows"
+  val MAX_MEMORY_SINK_ROWS_DEFAULT = -1
+
+  /**
+   * Gets the max number of rows a MemorySink should store. This number is based on the memory
+   * sink row limit if it is set. If not, there is no limit.
+   * @param options Options for writing from which we get the max rows option
+   * @return The maximum number of rows a memorySink should store, or None for no limit.
+   */
+  def getMemorySinkCapacity(options: DataSourceOptions): Option[Int] = {
+    val maxRows = options.getInt(MAX_MEMORY_SINK_ROWS, MAX_MEMORY_SINK_ROWS_DEFAULT)
+    if (maxRows >= 0) Some(maxRows) else None
--- End diff --
Do you want to do `if (maxRows >= 0) maxRows else Int.MaxValue - 10`?
We can't exceed the runtime's maximum array size anyway.
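The suggestion trades the `Option[Int]` for a plain `Int` sentinel. Because a JVM array cannot hold anywhere near `Int.MaxValue` elements in practice, a capacity that large behaves like "no limit". Below is a sketch comparing the two encodings, with hypothetical names (`CapacityEncodings` and its methods). The `Int.MaxValue - 10` value is taken from the comment.

```scala
// Hypothetical comparison of the two ways to encode "no limit".
object CapacityEncodings {
  // Sentinel used by the suggested alternative for "unbounded".
  val Unbounded: Int = Int.MaxValue - 10

  // Option-based encoding, as in the diff: None means no limit.
  def asOption(maxRows: Int): Option[Int] = if (maxRows >= 0) Some(maxRows) else None

  // Sentinel-based encoding, as suggested: callers always get an Int.
  def asSentinel(maxRows: Int): Int = if (maxRows >= 0) maxRows else Unbounded

  // Truncation written against each encoding.
  def keepWithOption[T](rows: Seq[T], cap: Option[Int]): Seq[T] = cap.fold(rows)(n => rows.take(n))
  def keepWithSentinel[T](rows: Seq[T], cap: Int): Seq[T] = rows.take(cap)
}
```

Both encodings keep the same rows for any batch that fits in memory. The sentinel removes `Option` handling at each call site, while the `Option` version makes "no limit" explicit in the type.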
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91868/
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:
https://github.com/apache/spark/pull/21559
ok to test
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91926/
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91799 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91799/testReport)** for PR 21559 at commit [`4ab9bda`](https://github.com/apache/spark/commit/4ab9bdaea895f6d0c76ee9ddd44c131f499eaec5).
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91861/
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91796/
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195516859
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
@@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
   def clear(): Unit = synchronized {
     batches.clear()
+    numRows = 0
+  }
+
+  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
--- End diff --
Can this go in MemorySinkBase?
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Can one of the admins verify this patch?
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:
https://github.com/apache/spark/pull/21559
lgtm
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/21559
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91799 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91799/testReport)** for PR 21559 at commit [`4ab9bda`](https://github.com/apache/spark/commit/4ab9bdaea895f6d0c76ee9ddd44c131f499eaec5).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:
https://github.com/apache/spark/pull/21559
Thanks! Merging to master!
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91864 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91864/testReport)** for PR 21559 at commit [`25d6de1`](https://github.com/apache/spark/commit/25d6de1db8223975ebd9b69c7ca77c26e3d8674c).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `trait MemorySinkBase extends BaseStreamingSink with Logging `
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91864/
---
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/21559#discussion_r195269434
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
@@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
def clear(): Unit = synchronized {
batches.clear()
+ numRows = 0
+ }
+
+ private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
+ if (rows.length > maxRows) {
+ logWarning(s"Truncating batch $batchId to $maxRows rows")
--- End diff --
How does `take` behave with a negative number of rows? Printing a warning message with a negative value may be weird. I would also include the sink limit in the warning.
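A minimal Scala sketch of the behavior under discussion, assuming a simplified sink model. The class `BoundedMemorySink`, its `capacity` parameter, and the `addBatch`/`allRows` methods are hypothetical names for illustration, not the code in this PR or Spark's API. It mirrors the diff's idea of truncating each batch to the remaining capacity and resetting `numRows` in `clear()`. In Scala's standard collections, `take` with a negative count returns an empty array, so a negative limit would silently drop the whole batch; the sketch clamps the value before logging so the warning never reports a negative count, and it names the sink limit as suggested above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of a capacity-limited in-memory sink.
// Not Spark's MemorySink/MemorySinkV2; names are illustrative only.
class BoundedMemorySink[T](capacity: Int) {
  private val batches = ArrayBuffer.empty[Array[T]]
  private var numRows = 0

  // Store a batch, keeping only as many rows as the remaining capacity allows.
  def addBatch(batchId: Long, rows: Array[T]): Unit = synchronized {
    val kept = truncateRowsIfNeeded(rows, capacity - numRows, batchId)
    batches += kept
    numRows += kept.length
  }

  // All stored rows, in arrival order.
  def allRows: List[T] = synchronized { batches.flatMap(_.toList).toList }

  def clear(): Unit = synchronized {
    batches.clear()
    numRows = 0 // reset the running row count together with the stored batches
  }

  private def truncateRowsIfNeeded(rows: Array[T], maxRows: Int, batchId: Long): Array[T] = {
    if (rows.length > maxRows) {
      // Clamp so the message never shows a negative row count, and include the sink limit.
      val kept = math.max(maxRows, 0)
      Console.err.println(s"Truncating batch $batchId to $kept rows because of sink limit $capacity")
      rows.take(kept)
    } else {
      rows
    }
  }
}
```

With a capacity of 5, adding batches of 3 and then 4 rows keeps all of the first batch and only 2 rows of the second; any later batch is dropped until `clear()` resets the count.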
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91796 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91796/testReport)** for PR 21559 at commit [`f981cb8`](https://github.com/apache/spark/commit/f981cb818ffc95ddce2b59fcd64142615037b6a3).
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21559
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91799/
---
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21559
**[Test build #91926 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91926/testReport)** for PR 21559 at commit [`0402b60`](https://github.com/apache/spark/commit/0402b6042b6f0b773a17d2bc6d30eda1c46dd731).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---