Posted to commits@spark.apache.org by zs...@apache.org on 2019/01/16 18:03:43 UTC

[spark] branch branch-2.3 updated: Revert "[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream"

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new c0fc6d0  Revert "[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream"
c0fc6d0 is described below

commit c0fc6d0d8dbd890a817176eb1da6e98252c2e0c0
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Wed Jan 16 10:03:21 2019 -0800

    Revert "[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream"
    
    This reverts commit 5a50ae37f4c41099c174459603966ee25f21ac75.
---
 .../execution/streaming/FileStreamSourceLog.scala  |  4 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  3 +-
 .../execution/streaming/HDFSMetadataLogSuite.scala |  6 --
 .../sql/streaming/FileStreamSourceSuite.scala      | 75 ++--------------------
 4 files changed, 8 insertions(+), 80 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 7b2ea96..8628471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -117,9 +117,7 @@ class FileStreamSourceLog(
 
     val batches =
       (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1)
-    if (startBatchId <= endBatchId) {
-      HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
-    }
+    HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId)
     batches
   }
 }
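
For context on the hunk above: the guard being removed skipped batch-id verification when the requested range was empty (startBatchId > endBatchId), a situation a file stream with no new data for a batch can produce on restart. A minimal numeric illustration of that case, hedged: the values are made up, and the post-revert outcome is the one pinned down by the test removed from HDFSMetadataLogSuite below.

    // Hedged illustration with made-up values, not the real FileStreamSourceLog
    // internals. A restart on a batch where this source saw no data can ask for
    // an empty range:
    val startBatchId = 2L            // first batch the restarted query requests
    val endBatchId = 1L              // last batch this source actually has
    val batchIds: Seq[Long] = Seq()  // nothing falls inside the empty range

    // Pre-revert (guarded): startBatchId <= endBatchId is false, verification
    // is skipped, and the empty result is returned.
    // Post-revert (unconditional): verifyBatchIds(batchIds, Some(2L), Some(1L))
    // throws IllegalStateException, as the removed suite test below asserted.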
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index d4cfbb3..00bc215 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -457,8 +457,7 @@ object HDFSMetadataLog {
   }
 
   /**
-   * Verify if batchIds are continuous and between `startId` and `endId` (both inclusive and
-   * startId assumed to be <= endId).
+   * Verify if batchIds are continuous and between `startId` and `endId`.
    *
    * @param batchIds the sorted ids to verify.
    * @param startId the start id. If it's set, batchIds should start with this id.
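
The restored comment states the contract only informally. A hedged stand-in sketch of that contract, consistent with the assertions kept in HDFSMetadataLogSuite below (the real implementation in HDFSMetadataLog differs in detail):

    // Stand-in sketch, not the Spark implementation: verify that the sorted ids
    // form one contiguous run bounded by startId/endId when those are set.
    def verifyBatchIds(batchIds: Seq[Long], startId: Option[Long], endId: Option[Long]): Unit = {
      if (batchIds.isEmpty) {
        if (startId.isDefined || endId.isDefined) {
          throw new IllegalStateException(s"Empty batch ids, but bounds $startId/$endId were requested")
        }
      } else {
        startId.foreach { s =>
          if (batchIds.head != s) throw new IllegalStateException(s"$batchIds does not start at $s")
        }
        endId.foreach { e =>
          if (batchIds.last != e) throw new IllegalStateException(s"$batchIds does not end at $e")
        }
        // A contiguous run of n distinct ids spans exactly head .. head + n - 1.
        if (batchIds.last - batchIds.head + 1 != batchIds.size) {
          throw new IllegalStateException(s"$batchIds is not continuous")
        }
      }
    }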
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 57a0343..4677769 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -275,12 +275,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L)))
     intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L)))
-
-    // Related to SPARK-26629, this captures the behavior for verifyBatchIds when startId > endId
-    intercept[IllegalStateException](verifyBatchIds(Seq(), Some(2L), Some(1L)))
-    intercept[AssertionError](verifyBatchIds(Seq(2), Some(2L), Some(1L)))
-    intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L)))
-    intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L)))
   }
 }
 
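As a quick usage check, the stand-in sketch above can be run against the assertions the suite keeps (ScalaTest's intercept is approximated with a plain try/catch helper here):

    // Each call must throw IllegalStateException per the kept suite assertions.
    def expectIllegalState(body: => Unit): Unit =
      try { body; sys.error("expected IllegalStateException") }
      catch { case _: IllegalStateException => () }  // behaved as expected

    expectIllegalState(verifyBatchIds(Seq(2L, 3L, 4L), None, Some(5L)))          // does not end at 5
    expectIllegalState(verifyBatchIds(Seq(2L, 3L, 4L), Some(1L), Some(5L)))      // does not start at 1
    expectIllegalState(verifyBatchIds(Seq(1L, 2L, 4L, 5L), Some(1L), Some(5L)))  // gap at 3
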
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fb0b365..d4bd9c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -48,33 +48,21 @@ abstract class FileStreamSourceTest
    * `FileStreamSource` actually being used in the execution.
    */
   abstract class AddFileData extends AddData {
-    private val _qualifiedBasePath = PrivateMethod[Path]('qualifiedBasePath)
-
-    private def isSamePath(fileSource: FileStreamSource, srcPath: File): Boolean = {
-      val path = (fileSource invokePrivate _qualifiedBasePath()).toString.stripPrefix("file:")
-      path == srcPath.getCanonicalPath
-    }
-
     override def addData(query: Option[StreamExecution]): (Source, Offset) = {
       require(
         query.nonEmpty,
         "Cannot add data when there is no query for finding the active file stream source")
 
       val sources = getSourcesFromStreamingQuery(query.get)
-      val source = if (sources.isEmpty) {
+      if (sources.isEmpty) {
         throw new Exception(
           "Could not find file source in the StreamExecution logical plan to add data to")
-      } else if (sources.size == 1) {
-        sources.head
-      } else {
-        val matchedSources = sources.filter(isSamePath(_, src))
-        if (matchedSources.size != 1) {
-          throw new Exception(
-            "Could not select the file source in StreamExecution as there are multiple" +
-              s" file sources and none / more than one matches $src:\n" + sources.mkString("\n"))
-        }
-        matchedSources.head
+      } else if (sources.size > 1) {
+        throw new Exception(
+          "Could not select the file source in the StreamExecution logical plan as there " +
+            "are multiple file sources:\n\t" + sources.mkString("\n\t"))
       }
+      val source = sources.head
       val newOffset = source.withBatchingLocked {
         addData(source)
         new FileStreamSourceOffset(source.currentLogOffset + 1)
@@ -83,9 +71,6 @@ abstract class FileStreamSourceTest
       (source, newOffset)
     }
 
-    /** Source directory to add file data to */
-    protected def src: File
-
     protected def addData(source: FileStreamSource): Unit
   }
 
@@ -1509,54 +1494,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       newSource.getBatch(None, FileStreamSourceOffset(1))
     }
   }
-
-  test("SPARK-26629: multiple file sources work with restarts when a source does not have data") {
-    withTempDirs { case (dir, tmp) =>
-      val sourceDir1 = new File(dir, "source1")
-      val sourceDir2 = new File(dir, "source2")
-      sourceDir1.mkdirs()
-      sourceDir2.mkdirs()
-
-      val source1 = createFileStream("text", s"${sourceDir1.getCanonicalPath}")
-      val source2 = createFileStream("text", s"${sourceDir2.getCanonicalPath}")
-      val unioned = source1.union(source2)
-
-      def addMultiTextFileData(
-          source1Content: String,
-          source2Content: String): StreamAction = {
-        val actions = Seq(
-          AddTextFileData(source1Content, sourceDir1, tmp),
-          AddTextFileData(source2Content, sourceDir2, tmp)
-        ).filter(_.content != null)  // don't write to a source dir if no content specified
-        StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]"))
-      }
-
-      testStream(unioned)(
-        StartStream(),
-        addMultiTextFileData(source1Content = "source1_0", source2Content = "source2_0"),
-        CheckNewAnswer("source1_0", "source2_0"),
-        StopStream,
-
-        StartStream(),
-        addMultiTextFileData(source1Content = "source1_1", source2Content = null),
-        CheckNewAnswer("source1_1"),
-        StopStream,
-
-        // Restart after a batch with one file source having no new data.
-        // This restart is needed to hit the issue in SPARK-26629.
-
-        StartStream(),
-        addMultiTextFileData(source1Content = null, source2Content = "source2_2"),
-        CheckNewAnswer("source2_2"),
-        StopStream,
-
-        StartStream(),
-        addMultiTextFileData(source1Content = "source1_3", source2Content = "source2_3"),
-        CheckNewAnswer("source1_3", "source2_3"),
-        StopStream
-      )
-    }
-  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

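The removed regression test exercised a union of two file streams where one side may have no data in a batch, followed by a restart. For the same shape outside the test harness, here is a minimal standalone sketch using the public Structured Streaming API; the session setup and all paths are illustrative placeholders, not taken from the commit.

    import org.apache.spark.sql.SparkSession

    // Hedged sketch of the union-of-file-streams pattern from the removed test.
    val spark = SparkSession.builder()
      .master("local[2]")            // placeholder local session
      .appName("union-sketch")
      .getOrCreate()

    val source1 = spark.readStream.text("/tmp/source1")  // first file stream
    val source2 = spark.readStream.text("/tmp/source2")  // second file stream
    val unioned = source1.union(source2)

    // Checkpointing is what makes the stop/restart cycle in the test possible;
    // restarting after a batch where only one source had data was the trigger
    // for the behavior SPARK-26629 addressed.
    val query = unioned.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint")   // placeholder path
      .start()
    query.awaitTermination()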
