Posted to commits@spark.apache.org by zs...@apache.org on 2020/01/31 17:35:02 UTC
[spark] branch master updated: [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 481e521 [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
481e521 is described below
commit 481e5211d237173ea0fb7c0b292eb7abd2b8a3fe
Author: Tathagata Das <ta...@gmail.com>
AuthorDate: Fri Jan 31 09:26:03 2020 -0800
[SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
This PR solves two bugs related to streaming limits.
**Bug 1 (SPARK-30658)**: A limit before a streaming aggregate (e.g. `df.limit(5).groupBy().count()`) in complete mode was not being planned as a stateful streaming limit. The planner rule planned a logical limit with a stateful streaming limit plan only if the query was in append mode. As a result, instead of allowing at most 5 rows across all batches, the planned streaming query allowed 5 rows in every batch, thus producing incorrect results.
**Solution**: Change the planner rule to plan the logical limit with a stateful streaming limit plan even when the query is in complete mode, as long as there is no streaming aggregate before the limit in the plan (in that case the limit's input is still effectively append-only).
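For concreteness, here is a minimal end-to-end sketch of the Bug 1 query shape, mirroring the updated `StreamSuite` test; the local SparkSession, memory sink, and query name are illustrative choices for this sketch, not part of the patch:

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().master("local[2]").appName("limit-before-agg").getOrCreate()
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Limit placed *before* a streaming aggregate, run in Complete mode.
val inputData = MemoryStream[Int]
val counts = inputData.toDF().limit(5).groupBy("value").count()

val query = counts.writeStream
  .format("memory")
  .queryName("limit_before_agg")        // query/sink name chosen for this sketch
  .outputMode(OutputMode.Complete())
  .start()

// With the fix, at most 5 rows are admitted across *all* batches: the first
// batch admits 1, 2, 3 and the second batch admits only 2 more rows.
inputData.addData(1 to 3)
query.processAllAvailable()
inputData.addData(1 to 9)
query.processAllAvailable()
spark.table("limit_before_agg").show()  // per the updated test: (1,2), (2,2), (3,1)
```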
**Bug 2 (SPARK-30657)**: `LocalLimitExec` does not fully consume the iterator of its child plan. So if there is a limit after a stateful operator like streaming dedup in append mode (e.g. `df.dropDuplicates().limit(5)`), the state changes of the streaming dedup may not be committed (most stateful operators commit state changes only after the generated iterator is fully consumed).
**Solution**: Change the planner rule to always use a new `StreamingLocalLimitExec` which always fully consumes the iterator. This is the safest thing to do. However, this will introduce a performance regression as consuming the iterator is extra work. To minimize this performance impact, add an additional post-planner optimization rule to replace `StreamingLocalLimitExec` with `LocalLimitExec` when there is no stateful operator before the limit that could be affected by it.
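And a corresponding sketch of the Bug 2 query shape, reusing the SparkSession, implicits, and imports from the sketch above (sink and query names are again only illustrative):

```scala
// Limit placed *after* a stateful operator (streaming dedup) in Append mode.
// With the fix, StreamingLocalLimitExec drains the child iterator, so the dedup
// operator can commit its state store changes every batch even though the limit
// emits only one row.
val dedupInput = MemoryStream[Int]
val deduped = dedupInput.toDF().dropDuplicates().limit(1)

val dedupQuery = deduped.writeStream
  .format("memory")
  .queryName("dedup_then_limit")        // query/sink name chosen for this sketch
  .outputMode(OutputMode.Append())
  .start()

dedupInput.addData(1, 2)
dedupQuery.processAllAvailable()
dedupInput.addData(3, 4)
dedupQuery.processAllAvailable()
spark.table("dedup_then_limit").show()  // a single row in total; the new test checks Row(1)
```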
Does this PR introduce any user-facing change? No.

How was this patch tested? Updated incorrect unit tests and added new ones.
Closes #27373 from tdas/SPARK-30657.
Authored-by: Tathagata Das <ta...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../spark/sql/execution/SparkStrategies.scala | 38 ++++---
.../execution/streaming/IncrementalExecution.scala | 34 ++++++-
...GlobalLimitExec.scala => streamingLimits.scala} | 55 ++++++++--
.../apache/spark/sql/streaming/StreamSuite.scala | 112 ++++++++++++++++++++-
4 files changed, 211 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 00ad4e0..bd2684d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -451,21 +451,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Used to plan the streaming global limit operator for streams in append mode.
* We need to check for either a direct Limit or a Limit wrapped in a ReturnAnswer operator,
* following the example of the SpecialLimits Strategy above.
- * Streams with limit in Append mode use the stateful StreamingGlobalLimitExec.
- * Streams with limit in Complete mode use the stateless CollectLimitExec operator.
- * Limit is unsupported for streams in Update mode.
*/
case class StreamingGlobalLimitStrategy(outputMode: OutputMode) extends Strategy {
- override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case ReturnAnswer(rootPlan) => rootPlan match {
- case Limit(IntegerLiteral(limit), child)
- if plan.isStreaming && outputMode == InternalOutputModes.Append =>
- StreamingGlobalLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
- case _ => Nil
+
+ private def generatesStreamingAppends(plan: LogicalPlan): Boolean = {
+
+ /** Ensures that this plan does not have a streaming aggregate in it. */
+ def hasNoStreamingAgg: Boolean = {
+ plan.collectFirst { case a: Aggregate if a.isStreaming => a }.isEmpty
}
- case Limit(IntegerLiteral(limit), child)
- if plan.isStreaming && outputMode == InternalOutputModes.Append =>
- StreamingGlobalLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+
+ // The following cases of limits on a streaming plan have to be executed with a stateful
+ // streaming plan.
+ // 1. When the query is in append mode (that is, all logical plan operators operate on appended data).
+ // 2. When the plan does not contain any streaming aggregate (that is, plan has only
+ // operators that operate on appended data). This must be executed with a stateful
+ // streaming plan even if the query is in complete mode because of a later streaming
+ // aggregation (e.g., `streamingDf.limit(5).groupBy().count()`).
+ plan.isStreaming && (
+ outputMode == InternalOutputModes.Append ||
+ outputMode == InternalOutputModes.Complete && hasNoStreamingAgg)
+ }
+
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case ReturnAnswer(Limit(IntegerLiteral(limit), child)) if generatesStreamingAppends(child) =>
+ StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil
+
+ case Limit(IntegerLiteral(limit), child) if generatesStreamingAppends(child) =>
+ StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil
+
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bf80a0b..09ae769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Express
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
+import org.apache.spark.sql.execution.{LeafExecNode, LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
@@ -105,6 +105,32 @@ class IncrementalExecution(
/** Locates save/restore pairs surrounding aggregation. */
val state = new Rule[SparkPlan] {
+ /**
+ * Ensures that this plan DOES NOT have any stateful operation in it whose pipelined execution
+ * depends on this plan. In other words, this function returns true if this plan does
+ * not have a narrow dependency on a stateful subplan.
+ */
+ private def hasNoStatefulOp(plan: SparkPlan): Boolean = {
+ var statefulOpFound = false
+
+ def findStatefulOp(planToCheck: SparkPlan): Unit = {
+ planToCheck match {
+ case s: StatefulOperator =>
+ statefulOpFound = true
+
+ case e: ShuffleExchangeExec =>
+ // Don't search recursively any further, as we are only looking for stateful
+ // subplans that this plan has narrow dependencies on; a shuffle breaks that
+ // narrow dependency.
+
+ case p: SparkPlan =>
+ p.children.foreach(findStatefulOp)
+ }
+ }
+
+ findStatefulOp(plan)
+ !statefulOpFound
+ }
+
override def apply(plan: SparkPlan): SparkPlan = plan transform {
case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
UnaryExecNode(agg,
@@ -149,6 +175,12 @@ class IncrementalExecution(
l.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
outputMode = Some(outputMode))
+
+ case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
+ // Optimize limit execution by replacing StreamingLocalLimitExec (which consumes the
+ // iterator completely) with LocalLimitExec (which does not) when the child plan has
+ // no stateful operator, i.e., when fully consuming the iterator is not needed.
+ LocalLimitExec(limit, child)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
similarity index 68%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
index bf4af60..b195402 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
@@ -20,21 +20,21 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.state.StateStoreOps
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, NextIterator}
/**
* A physical operator for executing a streaming limit, which makes sure no more than streamLimit
- * rows are returned. This operator is meant for streams in Append mode only.
+ * rows are returned. This physical operator is only meant for logical limit operations that
+ * will get an input stream of rows that are effectively appends. For example,
+ * - limit on any query in append mode
+ * - limit before the aggregation in a streaming aggregation query in complete mode
*/
case class StreamingGlobalLimitExec(
streamLimit: Long,
@@ -49,9 +49,6 @@ case class StreamingGlobalLimitExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
- assert(outputMode.isDefined && outputMode.get == InternalOutputModes.Append,
- "StreamingGlobalLimitExec is only valid for streams in Append output mode")
-
child.execute().mapPartitionsWithStateStore(
getStateInfo,
keySchema,
@@ -100,3 +97,41 @@ case class StreamingGlobalLimitExec(
UnsafeProjection.create(valueSchema)(new GenericInternalRow(Array[Any](value)))
}
}
+
+
+/**
+ * A physical operator for executing limits locally on each partition. The main difference from
+ * LocalLimitExec is that this will fully consume `child` plan's iterators to ensure that any
+ * stateful operation within `child` commits all the state changes (many stateful operations
+ * commit state changes only after the iterator is consumed).
+ */
+case class StreamingLocalLimitExec(limit: Int, child: SparkPlan)
+ extends LimitExec {
+
+ override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
+
+ var generatedCount = 0
+
+ new NextIterator[InternalRow]() {
+ override protected def getNext(): InternalRow = {
+ if (generatedCount < limit && iter.hasNext) {
+ generatedCount += 1
+ iter.next()
+ } else {
+ finished = true
+ null
+ }
+ }
+
+ override protected def close(): Unit = {
+ while (iter.hasNext) iter.next() // consume the iterator completely
+ }
+ }
+ }
+
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override def output: Seq[Attribute] = child.output
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index bf80962..b661882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.SimpleMode
+import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
@@ -976,24 +976,50 @@ class StreamSuite extends StreamTest {
CheckAnswer(1 to 3: _*))
}
- test("streaming limit in complete mode") {
+ test("SPARK-30658: streaming limit before agg in complete mode") {
val inputData = MemoryStream[Int]
val limited = inputData.toDF().limit(5).groupBy("value").count()
testStream(limited, OutputMode.Complete())(
AddData(inputData, 1 to 3: _*),
CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)),
AddData(inputData, 1 to 9: _*),
- CheckAnswer(Row(1, 2), Row(2, 2), Row(3, 2), Row(4, 1), Row(5, 1)))
+ CheckAnswer(Row(1, 2), Row(2, 2), Row(3, 1)))
}
- test("streaming limits in complete mode") {
+ test("SPARK-30658: streaming limits before and after agg in complete mode " +
+ "(after limit < before limit)") {
val inputData = MemoryStream[Int]
val limited = inputData.toDF().limit(4).groupBy("value").count().orderBy("value").limit(3)
testStream(limited, OutputMode.Complete())(
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
AddData(inputData, 1 to 9: _*),
+ // only 1 to 4 should be allowed to aggregate, and counts for only 1 to 3 should be output
CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)),
AddData(inputData, 2 to 6: _*),
- CheckAnswer(Row(1, 1), Row(2, 2), Row(3, 2)))
+ // None of the new values should be allowed to aggregate, same 3 counts should be output
+ CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)))
+ }
+
+ test("SPARK-30658: streaming limits before and after agg in complete mode " +
+ "(before limit < after limit)") {
+ val inputData = MemoryStream[Int]
+ val limited = inputData.toDF().limit(2).groupBy("value").count().orderBy("value").limit(3)
+ testStream(limited, OutputMode.Complete())(
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+ AddData(inputData, 1 to 9: _*),
+ CheckAnswer(Row(1, 1), Row(2, 1)),
+ AddData(inputData, 2 to 6: _*),
+ CheckAnswer(Row(1, 1), Row(2, 1)))
+ }
+
+ test("SPARK-30657: streaming limit after streaming dedup in append mode") {
+ val inputData = MemoryStream[Int]
+ val limited = inputData.toDF().dropDuplicates().limit(1)
+ testStream(limited)(
+ AddData(inputData, 1, 2),
+ CheckAnswer(Row(1)),
+ AddData(inputData, 3, 4),
+ CheckAnswer(Row(1)))
}
test("streaming limit in update mode") {
@@ -1034,6 +1060,82 @@ class StreamSuite extends StreamTest {
false))
}
+ test("SPARK-30657: streaming limit should not apply on limits on state subplans") {
+ val streanData = MemoryStream[Int]
+ val streamingDF = streanData.toDF().toDF("value")
+ val staticDF = spark.createDataset(Seq(1)).toDF("value").orderBy("value")
+ testStream(streamingDF.join(staticDF.limit(1), "value"))(
+ AddData(streanData, 1, 2, 3),
+ CheckAnswer(Row(1)),
+ AddData(streanData, 1, 3, 5),
+ CheckAnswer(Row(1), Row(1)))
+ }
+
+ test("SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec") {
+ val inputData = MemoryStream[Int]
+ val inputDF = inputData.toDF()
+
+ /** Verify whether the local limit in the plan is a streaming limit or a simple LocalLimitExec. */
+ def verifyLocalLimit(
+ df: DataFrame,
+ expectStreamingLimit: Boolean,
+ outputMode: OutputMode = OutputMode.Append): Unit = {
+
+ var execPlan: SparkPlan = null
+ testStream(df, outputMode)(
+ AddData(inputData, 1),
+ AssertOnQuery { q =>
+ q.processAllAvailable()
+ execPlan = q.lastExecution.executedPlan
+ true
+ }
+ )
+ require(execPlan != null)
+
+ val localLimits = execPlan.collect {
+ case l: LocalLimitExec => l
+ case l: StreamingLocalLimitExec => l
+ }
+
+ require(
+ localLimits.size == 1,
+ s"Cant verify local limit optimization with this plan:\n$execPlan")
+
+ if (expectStreamingLimit) {
+ assert(
+ localLimits.head.isInstanceOf[StreamingLocalLimitExec],
+ s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
+ } else {
+ assert(
+ localLimits.head.isInstanceOf[LocalLimitExec],
+ s"Local limit was not LocalLimitExec:\n$execPlan")
+ }
+ }
+
+ // Should not be optimized, so StreamingLocalLimitExec should be present
+ verifyLocalLimit(inputDF.dropDuplicates().limit(1), expectStreamingLimit = true)
+
+ // Should be optimized from StreamingLocalLimitExec to LocalLimitExec
+ verifyLocalLimit(inputDF.limit(1), expectStreamingLimit = false)
+ verifyLocalLimit(
+ inputDF.limit(1).groupBy().count(),
+ expectStreamingLimit = false,
+ outputMode = OutputMode.Complete())
+
+ // Should be optimized, as the repartition is sufficient to ensure that the iterators of
+ // StreamingDeduplicationExec are consumed completely by the repartition exchange.
+ verifyLocalLimit(inputDF.dropDuplicates().repartition(1).limit(1), expectStreamingLimit = false)
+
+ // Should be LocalLimitExec in the first place, not from optimization of StreamingLocalLimitExec
+ val staticDF = spark.range(1).toDF("value").limit(1)
+ verifyLocalLimit(inputDF.toDF("value").join(staticDF, "value"), expectStreamingLimit = false)
+
+ verifyLocalLimit(
+ inputDF.groupBy().count().limit(1),
+ expectStreamingLimit = false,
+ outputMode = OutputMode.Complete())
+ }
+
test("is_continuous_processing property should be false for microbatch processing") {
val input = MemoryStream[Int]
val df = input.toDS()