You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/09/02 09:11:59 UTC

[spark] branch branch-3.0 updated: [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8506728  [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation
8506728 is described below

commit 850672870faed6677d166c0029335600a15f4153
Author: liwensun <li...@databricks.com>
AuthorDate: Wed Sep 2 18:05:06 2020 +0900

    [SPARK-32776][SS] Limit in streaming should not be optimized away by PropagateEmptyRelation
    
    PropagateEmptyRelation will not be applied to LIMIT operators in streaming queries.
    
    Right now, the limit operator in a streaming query may get optimized away when its input relation is empty. This is problematic for stateful streaming: the empty batch then writes no state store files, so the next batch fails with a file-not-found error when it tries to read them.
    
    We should not let PropagateEmptyRelation optimize away the Limit operator for streaming queries.
    
    This PR is intended as a small and safe fix for PropagateEmptyRelation. A fundamental fix that can prevent this from happening again in the future and in other optimizer rules is more desirable, but that's a much larger task.
    
    No user-facing change.
    
    Tested with new unit tests in PropagateEmptyRelationSuite and StreamingQuerySuite.
    
    Closes #29623 from liwensun/spark-32776.
    
    Authored-by: liwensun <li...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit f0851e95c6a2c05716a32c961bf2842e18fe1bc7)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../optimizer/PropagateEmptyRelation.scala         |  4 +--
 .../optimizer/PropagateEmptyRelationSuite.scala    |  6 ++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 38 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index b19e138..7535f9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -85,8 +85,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
       case _: Filter => empty(p)
       case _: Sample => empty(p)
       case _: Sort => empty(p)
-      case _: GlobalLimit => empty(p)
-      case _: LocalLimit => empty(p)
+      case _: GlobalLimit if !p.isStreaming => empty(p)
+      case _: LocalLimit if !p.isStreaming => empty(p)
       case _: Repartition => empty(p)
       case _: RepartitionByExpression => empty(p)
       // An aggregate with non-empty group expression will return one output row per group when the
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 9c7d4c7..535be12 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -221,4 +221,10 @@ class PropagateEmptyRelationSuite extends PlanTest {
     val optimized = Optimize.execute(query.analyze)
     assert(optimized.resolved)
   }
+
+  test("should not optimize away limit if streaming") {
+    val query = LocalRelation(Nil, Nil, isStreaming = true).limit(1).analyze
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, query)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 434b6a2..1f408d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -34,7 +34,9 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
@@ -1141,6 +1143,42 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  testQuietly("limit on empty batch should not cause state store error") {
+    // The source only produces two batches, the first batch is empty and the second batch has data.
+    val source = new Source {
+      var batchId = 0
+      override def stop(): Unit = {}
+      override def getOffset: Option[Offset] = {
+        Some(LongOffset(batchId + 1))
+      }
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        if (batchId == 0) {
+          batchId += 1
+          Dataset.ofRows(spark, LocalRelation(schema.toAttributes, Nil, isStreaming = true))
+        } else {
+          Dataset.ofRows(spark,
+            LocalRelation(schema.toAttributes, InternalRow(10) :: Nil, isStreaming = true))
+        }
+      }
+      override def schema: StructType = MockSourceProvider.fakeSchema
+    }
+
+    MockSourceProvider.withMockSources(source) {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
+        .load()
+        .limit(1)
+
+      testStream(df)(
+        StartStream(),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(10))
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org