Posted to reviews@spark.apache.org by lw-lin <gi...@git.apache.org> on 2017/02/19 07:56:52 UTC

[GitHub] spark pull request #16987: [WIP][SPARK-][SS] FileSource read from FileSink

GitHub user lw-lin opened a pull request:

    https://github.com/apache/spark/pull/16987

    [WIP][SPARK-][SS] FileSource read from FileSink

    ## What changes were proposed in this pull request?
    
    (Please fill in changes proposed in this fix)
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16987
    
----
commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin <lw...@gmail.com>
Date:   2017-02-18T01:20:18Z

    File Source reads from File Sink

----



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603705
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contain "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    --- End diff --
    
    fixed



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603922
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contain "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    --- End diff --
    
    understood & fixed



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test PASSed.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103404486
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -159,28 +161,64 @@ class FileStreamSource(
     
       /**
        * If the source has a metadata log indicating which files should be read, then we should use it.
    -   * We figure out whether there exists some metadata log only when user gives a non-glob path.
    +   * Only when the user gives a non-glob path will we figure out whether the source has a
    +   * metadata log.
    +   *
    +   * None        means we don't know at the moment
    +   * Some(true)  means we know for sure the source DOES have metadata
    +   * Some(false) means we know for sure the source DOES NOT have metadata
        */
    -  private val sourceHasMetadata: Boolean =
    -    !SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
    -      FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf())
    +  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
    +    if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
    +
    +  private def allFilesUsingInMemoryFileIndex() = {
    +    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
    +    val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
    +    fileIndex.allFiles()
    +  }
    +
    +  private def allFilesUsingMetadataLogFileIndex() = {
    +    // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
    +    // non-glob path
    +    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
    +  }
     
       /**
        * Returns a list of files found, sorted by their timestamp.
        */
       private def fetchAllFiles(): Seq[(String, Long)] = {
         val startTime = System.nanoTime
    -    val catalog =
    -      if (sourceHasMetadata) {
    -        // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
    -        // non-glob path
    -        new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
    +
    --- End diff --
    
    Then, based on `sourceHasMetadata`'s value, we can choose which `FileIndex` should be used. As shown below, the `case None` branch needs the most care.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Reopening :-)



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73653/
    Test PASSed.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73653 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73653/testReport)** for PR 16987 at commit [`62fd518`](https://github.com/apache/spark/commit/62fd5189d9ba27ed3ed20cc103252aa9fdac052a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73124/
    Test FAILed.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Thanks for working on this; however, I'm not sure we want to go with this approach. In Spark 2.2, I think we should consider deprecating the manifest files and instead using deterministic file names to get exactly-once semantics.
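
    For illustration, a minimal sketch of what deterministic naming could look like (a hypothetical helper, not the actual FileStreamSink code): derive the file name purely from the batch and partition ids, so a re-executed task overwrites the same file instead of adding a duplicate.

        // Hypothetical sketch only -- not the actual FileStreamSink implementation.
        // A name that is a pure function of (batchId, partitionId) makes re-runs idempotent:
        // the same batch always writes to the same file, so readers never see duplicates.
        def deterministicFileName(batchId: Long, partitionId: Int, extension: String): String =
          f"part-$batchId%05d-$partitionId%05d$extension"

        // e.g. deterministicFileName(2, 0, ".txt") == "part-00002-00000.txt"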



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73492 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73492/testReport)** for PR 16987 at commit [`d31cb76`](https://github.com/apache/spark/commit/d31cb76756f7aa2c9c3c803d263ae81f5f509ff2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Rebased to master and tests updated. @zsxwing would you take another look when you've got a minute?



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73374/
    Test FAILed.



[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Jenkins retest this please



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Using deterministic file names sounds great. Thanks! I'm closing this.



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103404962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -159,28 +161,64 @@ class FileStreamSource(
     
       /**
        * If the source has a metadata log indicating which files should be read, then we should use it.
    -   * We figure out whether there exists some metadata log only when user gives a non-glob path.
    +   * Only when the user gives a non-glob path will we figure out whether the source has a
    +   * metadata log.
    +   *
    +   * None        means we don't know at the moment
    +   * Some(true)  means we know for sure the source DOES have metadata
    +   * Some(false) means we know for sure the source DOES NOT have metadata
        */
    -  private val sourceHasMetadata: Boolean =
    -    !SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
    -      FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf())
    +  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
    +    if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
    +
    +  private def allFilesUsingInMemoryFileIndex() = {
    +    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
    +    val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
    +    fileIndex.allFiles()
    +  }
    +
    +  private def allFilesUsingMetadataLogFileIndex() = {
    +    // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
    +    // non-glob path
    +    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
    +  }
     
       /**
        * Returns a list of files found, sorted by their timestamp.
        */
       private def fetchAllFiles(): Seq[(String, Long)] = {
         val startTime = System.nanoTime
    -    val catalog =
    -      if (sourceHasMetadata) {
    -        // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
    -        // non-glob path
    -        new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
    +
    --- End diff --
    
    seems like `sourceHasMetadata match { case ... }` is more appropriate here
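
    For reference, a rough sketch of how that match could look inside `fetchAllFiles()`, assuming the `allFilesUsingMetadataLogFileIndex()` / `allFilesUsingInMemoryFileIndex()` helpers from this diff (the `None` branch is simplified here; see the empty-folder discussion further down):

        // Sketch only -- the None branch needs additional care in the real patch.
        val allFiles = sourceHasMetadata match {
          case Some(true)  => allFilesUsingMetadataLogFileIndex()   // sink metadata log exists
          case Some(false) => allFilesUsingInMemoryFileIndex()      // plain directory / glob path
          case None        =>
            // Not yet known whether the path is a FileStreamSink output directory;
            // fall back to a plain listing until that can be decided.
            allFilesUsingInMemoryFileIndex()
        }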



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103562238
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    --- End diff --
    
    `tmp` is not used. Why not just name them as `(outputDir, checkpointDir)`? Same for other tests.



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103014293
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  testWithUninterruptibleThread("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        val sinkLogDir = new File(dir, FileStreamSink.metadataDir).getCanonicalPath
    +        val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, sinkLogDir)
    +
    +        val fileStream = createFileStream("text", dir.getCanonicalPath)
    +        val filtered = fileStream.filter($"value" contains "keep")
    +
    +        def addIntoSinkLog(batch: Int, fileName: String): Boolean = {
    +          val unqualifiedDirPath = new Path(new File(dir, fileName).getCanonicalPath)
    +          val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
    +          val sinkFileStatus = SinkFileStatus(fs.getFileStatus(unqualifiedDirPath))
    +          sinkLog.add(batch, Array(sinkFileStatus))
    +        }
    +
    +        testStream(filtered)(
    +          // Create new dir and write to it, should read
    +          AddTextFileData("drop1\nkeep2", dir, tmp, Some("file_1")),
    +          Assert { addIntoSinkLog(0, "file_1") },
    +          CheckAnswer("keep2"),
    +
    +          // Create "file_2" but DO NOT add it to the log intentionally
    +          AddTextFileData("should_not_keep!!!", dir, tmp, Some("file_2")),
    +          Assert { new File(dir, "file_2").exists() },
    +          AddTextFileData("keep3", dir, tmp, Some("file_3")),
    +          Assert { addIntoSinkLog(1, "file_3") },
    +          // Should NOT read "file_2"; should read "file_3"
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // Check that things work well when the sink log gets compacted
    +          AddTextFileData("keep4", dir, tmp, Some("file_4")),
    +          Assert { addIntoSinkLog(2, "file_4") },
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(sinkLogDir, "2" + CompactibleFileStreamLog.COMPACT_FILE_SUFFIX).exists()
    +          },
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          AddTextFileData("keep5", dir, tmp, Some("file_5")),
    +          Assert { addIntoSinkLog(3, "file_5") },
    +          CheckAnswer("keep2", "keep3", "keep4", "keep5")
    +        )
    +      }
    +    }
    +  }
    +
    +  testWithUninterruptibleThread("read partitioned data from outputs of another streaming query") {
    --- End diff --
    
    nit: same as above



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103565647
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contain "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    +      val q1_write = q1_source
    +        .toDF()
    +        .writeStream
    +        .option("checkpointLocation", q1_checkpointDir)
    +        .format("text")                               // define q1, but don't start it for now
    +      var q1: StreamingQuery = null
    +
    +      val q2 =
    +        createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep")
    +
    +      testStream(q2)(
    +        AssertOnQuery { q2 =>
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === None       // q1 has not started yet, verify that q2
    +                                                      // doesn't know whether q1 has metadata
    +        },
    +        Execute { _ =>
    +          q1 = q1_write.start(q1_outputDir.getCanonicalPath)  // start q1 !!!
    --- End diff --
    
    nit: `// start q1 !!!` is obvious. Don't add such comments.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73374 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73374/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103403986
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -159,28 +161,64 @@ class FileStreamSource(
     
       /**
        * If the source has a metadata log indicating which files should be read, then we should use it.
    -   * We figure out whether there exists some metadata log only when user gives a non-glob path.
    +   * Only when the user gives a non-glob path will we figure out whether the source has a
    +   * metadata log.
    +   *
    +   * None        means we don't know at the moment
    +   * Some(true)  means we know for sure the source DOES have metadata
    +   * Some(false) means we know for sure the source DOES NOT have metadata
    --- End diff --
    
    (Some notes here, since the changes are not trivial.)
    
    Here we're using `sourceHasMetadata` to indicate whether we know for sure that the source has metadata, as stated in the source file comments:
    - `None`        means we don't know at the moment
    - `Some(true)`  means we know for sure the source DOES have metadata
    - `Some(false)` means we know for sure the source DOES NOT have metadata




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103405331
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
             query.nonEmpty,
             "Cannot add data when there is no query for finding the active file stream source")
     
    -      val sources = query.get.logicalPlan.collect {
    -        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
    -          source.asInstanceOf[FileStreamSource]
    -      }
    --- End diff --
    
    this common logic is extracted out



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103565167
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contain "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    --- End diff --
    
    nit: just put the comment after the statement, separated by a single space. The current format is hard to maintain because it requires aligning the comments. Same for the other comments.



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103151253
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -158,12 +158,28 @@ class FileStreamSource(
       }
     
       /**
    +   * If the source has a metadata log indicating which files should be read, then we should use it.
    +   * We figure out whether there exists some metadata log only when user gives a non-glob path.
    +   */
    +  private val sourceHasMetadata: Boolean =
    --- End diff --
    
    Just found one corner case: if the query that writes the files has not yet started, the folder will contain no files even if it's an output folder of the file sink. I think we should keep re-evaluating `sourceHasMetadata` until the folder is no longer empty.
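
    A minimal sketch of that idea, reusing the calls already shown in this diff (the exact resolution logic in the final patch may differ):

        // Resolve sourceHasMetadata lazily: while the output folder is still empty, the
        // writing query may simply not have started yet, so keep the answer undecided.
        sourceHasMetadata match {
          case None =>
            if (FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf())) {
              sourceHasMetadata = Some(true)
            } else if (allFilesUsingInMemoryFileIndex().nonEmpty) {
              // Data files exist but there is no sink metadata log, so this is not a sink output.
              sourceHasMetadata = Some(false)
            }
            // otherwise: the folder is empty, keep None and re-check on the next fetch
          case _ => // already decided, nothing to do
        }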



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73374 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73374/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103014135
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  testWithUninterruptibleThread("read data from outputs of another streaming query") {
    --- End diff --
    
    nit: you can merge the latest master and use `test` directly. No need to use `testWithUninterruptibleThread` after #16947.



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    retest this please



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603671
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         }
       }
     
    +  /** Execute arbitrary code */
    +  case class Execute(val func: StreamExecution => Any) extends StreamAction {
    --- End diff --
    
    fixed, thanks!




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103538636
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         }
       }
     
    +  /** Execute arbitrary code */
    +  case class Execute(val func: StreamExecution => Any) extends StreamAction {
    --- End diff --
    
    How about just making this extend `AssertOnQuery` to avoid adding a new case clause to `testStream`, which is already pretty long?
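    
    One way to read this suggestion (a rough sketch, assuming the `AssertOnQuery(condition, message)` factory already in `StreamTest`; not necessarily the final shape): keep `Execute` as a thin factory that returns an `AssertOnQuery`, so `testStream` itself needs no new case clause.
    
    ```scala
    /** Execute arbitrary code; expressed as an AssertOnQuery that always "passes" (sketch). */
    object Execute {
      def apply(func: StreamExecution => Any): AssertOnQuery =
        AssertOnQuery(query => { func(query); true }, "Execute")
    }
    ```
    
    Call sites such as `Execute { _ => q1.stop() }` in the quoted tests would keep working unchanged.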




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103104036
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  testWithUninterruptibleThread("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        val sinkLogDir = new File(dir, FileStreamSink.metadataDir).getCanonicalPath
    +        val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, sinkLogDir)
    +
    +        val fileStream = createFileStream("text", dir.getCanonicalPath)
    +        val filtered = fileStream.filter($"value" contains "keep")
    +
    +        def addIntoSinkLog(batch: Int, fileName: String): Boolean = {
    +          val unqualifiedDirPath = new Path(new File(dir, fileName).getCanonicalPath)
    +          val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
    +          val sinkFileStatus = SinkFileStatus(fs.getFileStatus(unqualifiedDirPath))
    +          sinkLog.add(batch, Array(sinkFileStatus))
    +        }
    +
    +        testStream(filtered)(
    +          // Create new dir and write to it, should read
    +          AddTextFileData("drop1\nkeep2", dir, tmp, Some("file_1")),
    +          Assert { addIntoSinkLog(0, "file_1") },
    +          CheckAnswer("keep2"),
    +
    +          // Create "file_2" but DO NOT add it to the log intentionally
    +          AddTextFileData("should_not_keep!!!", dir, tmp, Some("file_2")),
    +          Assert { new File(dir, "file_2").exists() },
    +          AddTextFileData("keep3", dir, tmp, Some("file_3")),
    +          Assert { addIntoSinkLog(1, "file_3") },
    +          // Should NOT read "file_2"; should read "file_3"
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // Check that things work well when the sink log gets compacted
    +          AddTextFileData("keep4", dir, tmp, Some("file_4")),
    +          Assert { addIntoSinkLog(2, "file_4") },
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(sinkLogDir, "2" + CompactibleFileStreamLog.COMPACT_FILE_SUFFIX).exists()
    +          },
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          AddTextFileData("keep5", dir, tmp, Some("file_5")),
    +          Assert { addIntoSinkLog(3, "file_5") },
    +          CheckAnswer("keep2", "keep3", "keep4", "keep5")
    +        )
    +      }
    +    }
    +  }
    +
    +  testWithUninterruptibleThread("read partitioned data from outputs of another streaming query") {
    --- End diff --
    
    done




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103013496
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -243,13 +243,20 @@ case class DataSource(
             val path = caseInsensitiveOptions.getOrElse("path", {
               throw new IllegalArgumentException("'path' is not specified")
             })
    +        // If we're reading files from outputs of another streaming query, then it does not make
    +        // sense to glob files since we would get files from the metadata log.
    +        // Thus we would figure out whether there exists some metadata log only when user gives a
    +        // non-glob path.
    +        val sourceHasMetadata =
    +          !SparkHadoopUtil.get.isGlobPath(new Path(path)) && hasMetadata(Seq(path))
    --- End diff --
    
    I guess `sourceHasMetadata` is generated here because of `hasMetadata`. Could you move `hasMetadata` to `object FileStreamSink`? Then you can do it inside `FileStreamSource`.
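    
    A rough sketch of where that helper could live (illustrative only; the real method may add error handling and logging): a `hasMetadata` check on `object FileStreamSink` that looks for the sink's `_spark_metadata` directory under a single, non-glob path, so both `DataSource` and `FileStreamSource` can call it.
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    
    object FileStreamSink {
      // Directory a FileStreamSink writes its metadata log into, relative to the output path.
      val metadataDir = "_spark_metadata"
    
      /**
       * Returns true if `path` is a single (non-glob) path whose `_spark_metadata`
       * subdirectory exists, i.e. it looks like the output of another streaming query.
       */
      def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = path match {
        case Seq(singlePath) =>
          val metadataPath = new Path(singlePath, metadataDir)
          val fs = metadataPath.getFileSystem(hadoopConf)
          fs.exists(metadataPath)
        case _ =>
          false
      }
    }
    ```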




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603687
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    --- End diff --
    
    fixed




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103366158
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -158,12 +158,28 @@ class FileStreamSource(
       }
     
       /**
    +   * If the source has a metadata log indicating which files should be read, then we should use it.
    +   * We figure out whether there exists some metadata log only when user gives a non-glob path.
    +   */
    +  private val sourceHasMetadata: Boolean =
    --- End diff --
    
    and add a dedicated test case of course




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    LGTM. Thanks! Merging to master.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603693
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    --- End diff --
    
    fixed




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603898
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    --- End diff --
    
    test removed -- Let me think about this write partition information thing :)
    thanks!




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603942
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    +      val q1_write = q1_source
    +        .toDF()
    +        .writeStream
    +        .option("checkpointLocation", q1_checkpointDir)
    +        .format("text")                               // define q1, but don't start it for now
    +      var q1: StreamingQuery = null
    +
    +      val q2 =
    +        createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep")
    +
    +      testStream(q2)(
    +        AssertOnQuery { q2 =>
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === None       // q1 has not started yet, verify that q2
    +                                                      // doesn't know whether q1 has metadata
    +        },
    +        Execute { _ =>
    +          q1 = q1_write.start(q1_outputDir.getCanonicalPath)  // start q1 !!!
    --- End diff --
    
    fixed




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103565336
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    +      val q1_write = q1_source
    +        .toDF()
    +        .writeStream
    +        .option("checkpointLocation", q1_checkpointDir)
    +        .format("text")                               // define q1, but don't start it for now
    +      var q1: StreamingQuery = null
    +
    +      val q2 =
    +        createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep")
    +
    +      testStream(q2)(
    +        AssertOnQuery { q2 =>
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === None       // q1 has not started yet, verify that q2
    --- End diff --
    
    nit: put the comment above this line. Same for other comments
    ```
    // q1 has not started yet, verify that q2 doesn't know whether q1 has metadata
    fileSource.sourceHasMetadata === None 
    ```




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103104024
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  testWithUninterruptibleThread("read data from outputs of another streaming query") {
    --- End diff --
    
    done; thanks! And good job on https://github.com/apache/spark/pull/16947!




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73578 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73578/testReport)** for PR 16987 at commit [`eed1c04`](https://github.com/apache/spark/commit/eed1c049c7cc955085c701f43f0d461e86aba328).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin closed the pull request at:

    https://github.com/apache/spark/pull/16987




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103363591
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -158,12 +158,28 @@ class FileStreamSource(
       }
     
       /**
    +   * If the source has a metadata log indicating which files should be read, then we should use it.
    +   * We figure out whether there exists some metadata log only when user gives a non-glob path.
    +   */
    +  private val sourceHasMetadata: Boolean =
    --- End diff --
    
    Actually, why not just change `sourceHasMetadata` to a method? `sparkSession.sessionState.newHadoopConf()` seems expensive but we can save it into a field.
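    
    A sketch of that idea (names are illustrative; it assumes the `Path`, `SparkHadoopUtil`, and `FileStreamSink.hasMetadata` imports/helpers already available in `FileStreamSource`, and the merged change ultimately tracked an `Option[Boolean]` instead, as other snippets quoted in this thread show): cache the Hadoop configuration once in a field and re-evaluate the metadata check on demand, so a sink that starts writing after this source is created can still be detected.
    
    ```scala
    // Built once per source; sessionState.newHadoopConf() is relatively expensive.
    private val hadoopConf = sparkSession.sessionState.newHadoopConf()
    
    // A method rather than a val: re-checked on demand instead of fixed at construction time.
    private def sourceHasMetadata(): Boolean =
      !SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
        FileStreamSink.hasMetadata(Seq(path), hadoopConf)
    ```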




[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603935
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    +      val q1_write = q1_source
    +        .toDF()
    +        .writeStream
    +        .option("checkpointLocation", q1_checkpointDir)
    +        .format("text")                               // define q1, but don't start it for now
    +      var q1: StreamingQuery = null
    +
    +      val q2 =
    +        createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep")
    +
    +      testStream(q2)(
    +        AssertOnQuery { q2 =>
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === None       // q1 has not started yet, verify that q2
    --- End diff --
    
    fixed




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103565523
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    +      val q1_write = q1_source
    +        .toDF()
    +        .writeStream
    +        .option("checkpointLocation", q1_checkpointDir)
    +        .format("text")                               // define q1, but don't start it for now
    +      var q1: StreamingQuery = null
    +
    +      val q2 =
    +        createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep")
    +
    +      testStream(q2)(
    +        AssertOnQuery { q2 =>
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === None       // q1 has not started yet, verify that q2
    +                                                      // doesn't know whether q1 has metadata
    +        },
    +        Execute { _ =>
    +          q1 = q1_write.start(q1_outputDir.getCanonicalPath)  // start q1 !!!
    +          q1_source.addData("drop1", "keep2")
    +          q1.processAllAvailable()
    +        },
    +        AssertOnQuery { q2 =>
    +          q2.processAllAvailable()
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === Some(true) // q1 has started, verify that q2 knows q1 has
    +                                                      // metadata by now
    +        },
    +        CheckAnswer("keep2"),                         // answer should be correct
    --- End diff --
    
    nit: `// answer should be correct` is obvious. Don't add such comments.




[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73124 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73124/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73653 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73653/testReport)** for PR 16987 at commit [`62fd518`](https://github.com/apache/spark/commit/62fd5189d9ba27ed3ed20cc103252aa9fdac052a).




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103567273
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its contents contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    --- End diff --
    
    nit: `// stop q1 manually` is obvious. Don't add such comments.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73382 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73382/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73382/
    Test PASSed.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73492/
    Test PASSed.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103567860
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its content contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    --- End diff --
    
    This test seems unnecessary. It will pass even if the source doesn't use the partition information.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103104003
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -243,13 +243,20 @@ case class DataSource(
             val path = caseInsensitiveOptions.getOrElse("path", {
               throw new IllegalArgumentException("'path' is not specified")
             })
    +        // If we're reading files from outputs of another streaming query, then it does not make
    +        // sense to glob files since we would get files from the metadata log.
    +        // Thus we would figure out whether there exists some metadata log only when the user gives a
    +        // non-glob path.
    +        val sourceHasMetadata =
    +          !SparkHadoopUtil.get.isGlobPath(new Path(path)) && hasMetadata(Seq(path))
    --- End diff --
    
    Yeah, `hasMetadata` was the reason! Now it lives in `object FileStreamSink` :-D
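
    For reference, a rough sketch of what such a check could look like (illustration only,
    not necessarily the exact code in this patch; it assumes the sink log lives in a
    `_spark_metadata` subdirectory and that a Hadoop `Configuration` is at hand):

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object FileStreamSink {
      val metadataDir = "_spark_metadata"

      /** True iff the single, non-glob `path` contains a file sink metadata log. */
      def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = path match {
        case Seq(singlePath) =>
          val metadataPath = new Path(singlePath, metadataDir)
          val fs = metadataPath.getFileSystem(hadoopConf)
          fs.exists(metadataPath)
        case _ => false
      }
    }
    ```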




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16987




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    I spoke too soon, sorry! Thinking about it more, the deterministic filename solution is not great, as the number of partitions could change for several reasons.
    
    Given that, would you mind reopening this?
    
    /cc @zsxwing do you have time to review?




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103366082
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -158,12 +158,28 @@ class FileStreamSource(
       }
     
       /**
    +   * If the source has a metadata log indicating which files should be read, then we should use it.
    +   * We figure out whether there exists some metadata log only when the user gives a non-glob path.
    +   */
    +  private val sourceHasMetadata: Boolean =
    --- End diff --
    
    Ah, thanks! I was about to change it to a method that stops detecting once we know for sure whether to use a `MetadataFileIndex` or an `InMemoryFileIndex`, and saves that information. Will update with this code soon.
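
    To sketch the idea (not the exact final code): keep the decision in an `Option` and
    latch it once it is known, so later batches skip the detection entirely.

    ```scala
    // None        -> cannot decide yet (e.g. the other query has not written anything)
    // Some(true)  -> found a FileStreamSink metadata log; list files from the sink log
    // Some(false) -> plain directory with data files; fall back to listing the directory
    private var sourceHasMetadata: Option[Boolean] = None
    ```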




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
GitHub user lw-lin reopened a pull request:

    https://github.com/apache/spark/pull/16987

    [SPARK-19633][SS] FileSource read from FileSink

    ## What changes were proposed in this pull request?
    
    Right now, the file source always uses `InMemoryFileIndex` to scan files from a given path.
    
    But when reading the outputs from another streaming query, the file source should use `MetadataFileIndex` to list files from the sink log. This patch adds that support.
    
    ## `MetadataFileIndex` or `InMemoryFileIndex`
    ```scala
    spark
      .readStream
      .format(...)
      .load("/some/path") // for a non-glob path:
                          //   - use `MetadataFileIndex` when `/some/path/_spark_metadata` exists
                          //   - fall back to `InMemoryFileIndex` otherwise
    ```
    ```scala
    spark
      .readStream
      .format(...)
      .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
    ```
    
    ## How was this patch tested?
    
    two newly added tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16987
    
----
commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin <lw...@gmail.com>
Date:   2017-02-18T01:20:18Z

    File Source reads from File Sink

----




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103604050
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -190,32 +190,31 @@ class FileStreamSource(
         val startTime = System.nanoTime
     
         var allFiles: Seq[FileStatus] = null
    -    if (sourceHasMetadata.isEmpty) {
    -      if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
    -        sourceHasMetadata = Some(true)
    -        allFiles = allFilesUsingMetadataLogFileIndex()
    -      } else {
    -        allFiles = allFilesUsingInMemoryFileIndex()
    -        if (allFiles.isEmpty) {
    -          // we still cannot decide
    +    sourceHasMetadata match {
    --- End diff --
    
    Simply switched to `sourceHasMetadata match { case ... case ... case ... }`; the
    actual diff is quite small.
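
    For context, a rough sketch of the match-based structure (illustration only; the helper
    names and fields follow the diff above, so this is not self-contained):

    ```scala
    var allFiles: Seq[FileStatus] = null
    sourceHasMetadata match {
      case None =>
        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
          sourceHasMetadata = Some(true)
          allFiles = allFilesUsingMetadataLogFileIndex()
        } else {
          allFiles = allFilesUsingInMemoryFileIndex()
          if (allFiles.nonEmpty) {
            // data files exist but no sink log was found, so this is not a FileStreamSink output
            sourceHasMetadata = Some(false)
          }
          // if allFiles is empty we still cannot decide; leave sourceHasMetadata as None
        }
      case Some(true) =>
        allFiles = allFilesUsingMetadataLogFileIndex()
      case Some(false) =>
        allFiles = allFilesUsingInMemoryFileIndex()
    }
    ```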




[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73126 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73126/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r101917807
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -76,12 +76,13 @@ abstract class FileStreamSourceTest
         protected def addData(source: FileStreamSource): Unit
       }
     
    -  case class AddTextFileData(content: String, src: File, tmp: File)
    -    extends AddFileData {
    +  case class AddTextFileData (
    +      content: String, src: File, tmp: File, finalFileName: Option[String] = None
    +    ) extends AddFileData {
     
         override def addData(source: FileStreamSource): Unit = {
           val tempFile = Utils.tempFileWith(new File(tmp, "text"))
    -      val finalFile = new File(src, tempFile.getName)
    +      val finalFile = new File(src, finalFileName.getOrElse(tempFile.getName))
    --- End diff --
    
    This is to keep track of the file name for later checking.
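
    A hypothetical usage, just to illustrate the new parameter (the file name below is made up):

    ```scala
    // write the batch under a fixed name so a later assertion can refer to that exact file
    AddTextFileData("keep2", src, tmp, finalFileName = Some("file_1.txt"))
    ```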




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73578/
    Test PASSed.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73578 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73578/testReport)** for PR 16987 at commit [`eed1c04`](https://github.com/apache/spark/commit/eed1c049c7cc955085c701f43f0d461e86aba328).




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103569221
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its content contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    --- End diff --
    
    In the long term, we should write the partition information to the file sink log so that we can read it back in the file source. However, that's out of scope for this PR. If you have time, you can think about it and submit a new PR after this one.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73382 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73382/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73126/
    Test PASSed.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    @marmbrus @zsxwing would you take a look at this? thanks!




[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73126 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73126/testReport)** for PR 16987 at commit [`b66d2cc`](https://github.com/apache/spark/commit/b66d2ccabcae41973bd8af4ed406567dc071ff67).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    **[Test build #73492 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73492/testReport)** for PR 16987 at commit [`d31cb76`](https://github.com/apache/spark/commit/d31cb76756f7aa2c9c3c803d263ae81f5f509ff2).




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103603945
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    +        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +        val q1 =
    +          q1_source
    +            .toDF()
    +            .writeStream
    +            .option("checkpointLocation", q1_checkpointDir)
    +            .format("text")
    +            .start(q1_outputDir)
    +
    +        // q2 is a streaming query that reads q1's text outputs
    +        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
    +
    +        def q1AddData(data: String*): StreamAction =
    +          Execute { _ =>
    +            q1_source.addData(data)
    +            q1.processAllAvailable()
    +          }
    +        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +        testStream(q2)(
    +          // batch 0
    +          q1AddData("drop1", "keep2"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2"),
    +
    +          // batch 1
    +          Assert {
    +            // create a text file that won't be on q1's sink log
    +            // thus even if its content contains "keep", it should NOT appear in q2's answer
    +            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
    +            stringToFile(shouldNotKeep, "should_not_keep!!!")
    +            shouldNotKeep.exists()
    +          },
    +          q1AddData("keep3"),
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3"),
    +
    +          // batch 2: check that things work well when the sink log gets compacted
    +          q1AddData("keep4"),
    +          Assert {
    +            // compact interval is 3, so file "2.compact" should exist
    +            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
    +          },
    +          q2ProcessAllAvailable(),
    +          CheckAnswer("keep2", "keep3", "keep4"),
    +
    +          // stop q1 manually
    +          Execute { _ => q1.stop() }
    +        )
    +      }
    +    }
    +  }
    +
    +  test("read partitioned data from outputs of another streaming query") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to partitioned json files
    +      val q1_source = MemoryStream[(String, String)]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
    +      val q1 =
    +        q1_source
    +          .toDF()
    +          .select($"_1" as "partition", $"_2" as "value")
    +          .writeStream
    +          .option("checkpointLocation", q1_checkpointDir)
    +          .partitionBy("partition")
    +          .format("json")
    +          .start(q1_outputDir)
    +
    +      // q2 is a streaming query that reads q1's partitioned json outputs
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val q2 = createFileStream("json", q1_outputDir, Some(schema)).filter($"value" contains "keep")
    +
    +      def q1AddData(data: (String, String)*): StreamAction =
    +        Execute { _ =>
    +          q1_source.addData(data)
    +          q1.processAllAvailable()
    +        }
    +      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
    +
    +      testStream(q2)(
    +        // batch 0: append to a new partition=foo, should read value and partition
    +        q1AddData(("foo", "drop1"), ("foo", "keep2")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo")),
    +
    +        // batch 1: append to same partition=foo, should read value and partition
    +        q1AddData(("foo", "keep3")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
    +
    +        // batch 2: append to a different partition=bar, should read value and partition
    +        q1AddData(("bar", "keep4")),
    +        q2ProcessAllAvailable(),
    +        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
    +
    +        // stop q1 manually
    +        Execute { _ => q1.stop() }
    +      )
    +    }
    +  }
    +
    +  test("start before another streaming query, and read its output") {
    +    withTempDirs { case (dir, tmp) =>
    +      // q1 is a streaming query that reads from memory and writes to text files
    +      val q1_source = MemoryStream[String]
    +      val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    +      val q1_outputDir = new File(dir, "q1_outputDir")
    +      assert(q1_outputDir.mkdir())                    // prepare the output dir for q2 to read
    +      val q1_write = q1_source
    +        .toDF()
    +        .writeStream
    +        .option("checkpointLocation", q1_checkpointDir)
    +        .format("text")                               // define q1, but don't start it for now
    +      var q1: StreamingQuery = null
    +
    +      val q2 =
    +        createFileStream("text", q1_outputDir.getCanonicalPath).filter($"value" contains "keep")
    +
    +      testStream(q2)(
    +        AssertOnQuery { q2 =>
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === None       // q1 has not started yet, verify that q2
    +                                                      // doesn't know whether q1 has metadata
    +        },
    +        Execute { _ =>
    +          q1 = q1_write.start(q1_outputDir.getCanonicalPath)  // start q1 !!!
    +          q1_source.addData("drop1", "keep2")
    +          q1.processAllAvailable()
    +        },
    +        AssertOnQuery { q2 =>
    +          q2.processAllAvailable()
    +          val fileSource = getSourcesFromStreamingQuery(q2).head
    +          fileSource.sourceHasMetadata === Some(true) // q1 has started, verify that q2 knows q1 has
    +                                                      // metadata by now
    +        },
    +        CheckAnswer("keep2"),                         // answer should be correct
    --- End diff --
    
    fixed




[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16987
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16987#discussion_r103538967
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         }
       }
     
    +  test("read data from outputs of another streaming query") {
    +    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
    +      withTempDirs { case (dir, tmp) =>
    +        // q1 is a streaming query that reads from memory and writes to text files
    +        val q1_source = MemoryStream[String]
    --- End diff --
    
    nit: please don't use `_` in a variable name.
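
    For example, one possible renaming (sketch only):

    ```scala
    val q1Source        = MemoryStream[String]
    val q1CheckpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
    val q1OutputDir     = new File(dir, "q1_outputDir").getCanonicalPath
    ```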

