You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2020/01/31 17:35:02 UTC

[spark] branch master updated: [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 481e521  [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
481e521 is described below

commit 481e5211d237173ea0fb7c0b292eb7abd2b8a3fe
Author: Tathagata Das <ta...@gmail.com>
AuthorDate: Fri Jan 31 09:26:03 2020 -0800

    [SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
    
    This PR solves two bugs related to streaming limits
    
    **Bug 1 (SPARK-30658)**: Limit before a streaming aggregate (i.e. `df.limit(5).groupBy().count()`) in complete mode was not being planned as a stateful streaming limit. The planner rule planned a logical limit with a stateful streaming limit plan only if the query is in append mode. As a result, instead of allowing max 5 rows across batches, the planned streaming query was allowing 5 rows in every batch thus producing incorrect results.
    
    **Solution**: Change the planner rule to plan the logical limit with a streaming limit plan even when the query is in complete mode if the logical limit has no stateful operator before it.
    
    **Bug 2 (SPARK-30657)**: `LocalLimitExec` does not fully consume the iterator of the child plan. So if there is a limit after a stateful operator like streaming dedup in append mode (e.g. `df.dropDuplicates().limit(5)`), the state changes of the streaming deduplication may not be committed (most stateful ops commit state changes only after the generated iterator is fully consumed).
    
    **Solution**: Change the planner rule to always use a new `StreamingLocalLimitExec` which always fully consumes the iterator. This is the safest thing to do. However, this will introduce a performance regression as consuming the iterator is extra work. To minimize this performance impact, add an additional post-planner optimization rule to replace `StreamingLocalLimitExec` with `LocalLimitExec` when there is no stateful operator before the limit that could be affected by it.
    
    No
    
    Updated incorrect unit tests and added new ones
    
    Closes #27373 from tdas/SPARK-30657.
    
    Authored-by: Tathagata Das <ta...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../spark/sql/execution/SparkStrategies.scala      |  38 ++++---
 .../execution/streaming/IncrementalExecution.scala |  34 ++++++-
 ...GlobalLimitExec.scala => streamingLimits.scala} |  55 ++++++++--
 .../apache/spark/sql/streaming/StreamSuite.scala   | 112 ++++++++++++++++++++-
 4 files changed, 211 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 00ad4e0..bd2684d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -451,21 +451,35 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Used to plan the streaming global limit operator for streams in append mode.
    * We need to check for either a direct Limit or a Limit wrapped in a ReturnAnswer operator,
    * following the example of the SpecialLimits Strategy above.
-   * Streams with limit in Append mode use the stateful StreamingGlobalLimitExec.
-   * Streams with limit in Complete mode use the stateless CollectLimitExec operator.
-   * Limit is unsupported for streams in Update mode.
    */
   case class StreamingGlobalLimitStrategy(outputMode: OutputMode) extends Strategy {
-    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), child)
-            if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-          StreamingGlobalLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
-        case _ => Nil
+
+    private def generatesStreamingAppends(plan: LogicalPlan): Boolean = {
+
+      /** Ensures that this plan does not have a streaming aggregate in it. */
+      def hasNoStreamingAgg: Boolean = {
+        plan.collectFirst { case a: Aggregate if a.isStreaming => a }.isEmpty
       }
-      case Limit(IntegerLiteral(limit), child)
-          if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-        StreamingGlobalLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+
+      // The following cases of limits on a streaming plan have to be executed with a stateful
+      // streaming plan.
+      // 1. When the query is in append mode (that is, all logical plans operate on appended data).
+      // 2. When the plan does not contain any streaming aggregate (that is, plan has only
+      //    operators that operate on appended data). This must be executed with a stateful
+      //    streaming plan even if the query is in complete mode because of a later streaming
+      //    aggregation (e.g., `streamingDf.limit(5).groupBy().count()`).
+      plan.isStreaming && (
+        outputMode == InternalOutputModes.Append ||
+        outputMode == InternalOutputModes.Complete && hasNoStreamingAgg)
+    }
+
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case ReturnAnswer(Limit(IntegerLiteral(limit), child)) if generatesStreamingAppends(child) =>
+        StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil
+
+      case Limit(IntegerLiteral(limit), child) if generatesStreamingAppends(child) =>
+        StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, planLater(child))) :: Nil
+
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index bf80a0b..09ae769 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Express
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
+import org.apache.spark.sql.execution.{LeafExecNode, LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
@@ -105,6 +105,32 @@ class IncrementalExecution(
   /** Locates save/restore pairs surrounding aggregation. */
   val state = new Rule[SparkPlan] {
 
+    /**
+     * Ensures that this plan DOES NOT have any stateful operation in it whose pipelined execution
+     * depends on this plan. In other words, this function returns true if this plan does
+     * NOT have a narrow dependency on a stateful subplan.
+     */
+    private def hasNoStatefulOp(plan: SparkPlan): Boolean = {
+      var statefulOpFound = false
+
+      def findStatefulOp(planToCheck: SparkPlan): Unit = {
+        planToCheck match {
+          case s: StatefulOperator =>
+            statefulOpFound = true
+
+          case e: ShuffleExchangeExec =>
+            // Don't search recursively any further for child stateful operators, as we
+            // are only looking for stateful subplans that this plan has narrow dependencies on.
+
+          case p: SparkPlan =>
+            p.children.foreach(findStatefulOp)
+        }
+      }
+
+      findStatefulOp(plan)
+      !statefulOpFound
+    }
+
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
       case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
              UnaryExecNode(agg,
@@ -149,6 +175,12 @@ class IncrementalExecution(
         l.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           outputMode = Some(outputMode))
+
+      case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
+        // Optimize limit execution by replacing StreamingLocalLimitExec (consumes the iterator
+        // completely) with LocalLimitExec (does not consume the iterator) when the child plan has
+        // no stateful operator (i.e., consuming the iterator is not needed).
+        LocalLimitExec(limit, child)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
similarity index 68%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
index bf4af60..b195402 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingGlobalLimitExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala
@@ -20,21 +20,21 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, SortOrder, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, Partitioning}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LimitExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.state.StateStoreOps
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{LongType, NullType, StructField, StructType}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, NextIterator}
 
 /**
  * A physical operator for executing a streaming limit, which makes sure no more than streamLimit
- * rows are returned. This operator is meant for streams in Append mode only.
+ * rows are returned. This physical operator is only meant for logical limit operations that
+ * will get an input stream of rows that are effectively appends. For example,
+ * - limit on any query in append mode
+ * - limit before the aggregation in a streaming aggregation query in complete mode
  */
 case class StreamingGlobalLimitExec(
     streamLimit: Long,
@@ -49,9 +49,6 @@ case class StreamingGlobalLimitExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    assert(outputMode.isDefined && outputMode.get == InternalOutputModes.Append,
-      "StreamingGlobalLimitExec is only valid for streams in Append output mode")
-
     child.execute().mapPartitionsWithStateStore(
         getStateInfo,
         keySchema,
@@ -100,3 +97,41 @@ case class StreamingGlobalLimitExec(
     UnsafeProjection.create(valueSchema)(new GenericInternalRow(Array[Any](value)))
   }
 }
+
+
+/**
+ * A physical operator for executing limits locally on each partition. The main difference from
+ * LocalLimitExec is that this will fully consume `child` plan's iterators to ensure that any
+ * stateful operation within `child` commits all the state changes (many stateful operations
+ * commit state changes only after the iterator is consumed).
+ */
+case class StreamingLocalLimitExec(limit: Int, child: SparkPlan)
+  extends LimitExec {
+
+  override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
+
+    var generatedCount = 0
+
+    new NextIterator[InternalRow]() {
+      override protected def getNext(): InternalRow = {
+        if (generatedCount < limit && iter.hasNext) {
+          generatedCount += 1
+          iter.next()
+        } else {
+          finished = true
+          null
+        }
+      }
+
+      override protected def close(): Unit = {
+        while (iter.hasNext) iter.next() // consume the iterator completely
+      }
+    }
+  }
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def output: Seq[Attribute] = child.output
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index bf80962..b661882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.SimpleMode
+import org.apache.spark.sql.execution.{LocalLimitExec, SimpleMode, SparkPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{ContinuousMemoryStream, MemorySink}
@@ -976,24 +976,50 @@ class StreamSuite extends StreamTest {
       CheckAnswer(1 to 3: _*))
   }
 
-  test("streaming limit in complete mode") {
+  test("SPARK-30658: streaming limit before agg in complete mode") {
     val inputData = MemoryStream[Int]
     val limited = inputData.toDF().limit(5).groupBy("value").count()
     testStream(limited, OutputMode.Complete())(
       AddData(inputData, 1 to 3: _*),
       CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)),
       AddData(inputData, 1 to 9: _*),
-      CheckAnswer(Row(1, 2), Row(2, 2), Row(3, 2), Row(4, 1), Row(5, 1)))
+      CheckAnswer(Row(1, 2), Row(2, 2), Row(3, 1)))
   }
 
-  test("streaming limits in complete mode") {
+  test("SPARK-30658: streaming limits before and after agg in complete mode " +
+    "(after limit < before limit)") {
     val inputData = MemoryStream[Int]
     val limited = inputData.toDF().limit(4).groupBy("value").count().orderBy("value").limit(3)
     testStream(limited, OutputMode.Complete())(
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
       AddData(inputData, 1 to 9: _*),
+      // only 1 to 4 should be allowed to aggregate, and counts for only 1 to 3 should be output
       CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)),
       AddData(inputData, 2 to 6: _*),
-      CheckAnswer(Row(1, 1), Row(2, 2), Row(3, 2)))
+      // None of the new values should be allowed to aggregate, same 3 counts should be output
+      CheckAnswer(Row(1, 1), Row(2, 1), Row(3, 1)))
+  }
+
+  test("SPARK-30658: streaming limits before and after agg in complete mode " +
+    "(before limit < after limit)") {
+    val inputData = MemoryStream[Int]
+    val limited = inputData.toDF().limit(2).groupBy("value").count().orderBy("value").limit(3)
+    testStream(limited, OutputMode.Complete())(
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      AddData(inputData, 1 to 9: _*),
+      CheckAnswer(Row(1, 1), Row(2, 1)),
+      AddData(inputData, 2 to 6: _*),
+      CheckAnswer(Row(1, 1), Row(2, 1)))
+  }
+
+  test("SPARK-30657: streaming limit after streaming dedup in append mode") {
+    val inputData = MemoryStream[Int]
+    val limited = inputData.toDF().dropDuplicates().limit(1)
+    testStream(limited)(
+      AddData(inputData, 1, 2),
+      CheckAnswer(Row(1)),
+      AddData(inputData, 3, 4),
+      CheckAnswer(Row(1)))
   }
 
   test("streaming limit in update mode") {
@@ -1034,6 +1060,82 @@ class StreamSuite extends StreamTest {
         false))
   }
 
+  test("SPARK-30657: streaming limit should not apply on limits on state subplans") {
+    val streanData = MemoryStream[Int]
+    val streamingDF = streanData.toDF().toDF("value")
+    val staticDF = spark.createDataset(Seq(1)).toDF("value").orderBy("value")
+    testStream(streamingDF.join(staticDF.limit(1), "value"))(
+      AddData(streanData, 1, 2, 3),
+      CheckAnswer(Row(1)),
+      AddData(streanData, 1, 3, 5),
+      CheckAnswer(Row(1), Row(1)))
+  }
+
+  test("SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec") {
+    val inputData = MemoryStream[Int]
+    val inputDF = inputData.toDF()
+
+    /** Verify whether the local limit in the plan is a streaming limit or a simple limit */
+    def verifyLocalLimit(
+        df: DataFrame,
+        expectStreamingLimit: Boolean,
+      outputMode: OutputMode = OutputMode.Append): Unit = {
+
+      var execPlan: SparkPlan = null
+      testStream(df, outputMode)(
+        AddData(inputData, 1),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          execPlan = q.lastExecution.executedPlan
+          true
+        }
+      )
+      require(execPlan != null)
+
+      val localLimits = execPlan.collect {
+        case l: LocalLimitExec => l
+        case l: StreamingLocalLimitExec => l
+      }
+
+      require(
+        localLimits.size == 1,
+        s"Cant verify local limit optimization with this plan:\n$execPlan")
+
+      if (expectStreamingLimit) {
+        assert(
+          localLimits.head.isInstanceOf[StreamingLocalLimitExec],
+          s"Local limit was not StreamingLocalLimitExec:\n$execPlan")
+      } else {
+        assert(
+          localLimits.head.isInstanceOf[LocalLimitExec],
+          s"Local limit was not LocalLimitExec:\n$execPlan")
+      }
+    }
+
+    // Should not be optimized, so StreamingLocalLimitExec should be present
+    verifyLocalLimit(inputDF.dropDuplicates().limit(1), expectStreamingLimit = true)
+
+    // Should be optimized from StreamingLocalLimitExec to LocalLimitExec
+    verifyLocalLimit(inputDF.limit(1), expectStreamingLimit = false)
+    verifyLocalLimit(
+      inputDF.limit(1).groupBy().count(),
+      expectStreamingLimit = false,
+      outputMode = OutputMode.Complete())
+
+    // Should be optimized as repartition is sufficient to ensure that the iterators of
+    // StreamingDeduplicationExec are consumed completely by the repartition exchange.
+    verifyLocalLimit(inputDF.dropDuplicates().repartition(1).limit(1), expectStreamingLimit = false)
+
+    // Should be LocalLimitExec in the first place, not from optimization of StreamingLocalLimitExec
+    val staticDF = spark.range(1).toDF("value").limit(1)
+    verifyLocalLimit(inputDF.toDF("value").join(staticDF, "value"), expectStreamingLimit = false)
+
+    verifyLocalLimit(
+      inputDF.groupBy().count().limit(1),
+      expectStreamingLimit = false,
+      outputMode = OutputMode.Complete())
+  }
+
   test("is_continuous_processing property should be false for microbatch processing") {
     val input = MemoryStream[Int]
     val df = input.toDS()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org