You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2016/04/22 03:06:15 UTC

[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/12592

    [SPARK-14833][SQL][STREAMING][TEST] Refactor StreamTests to test for source fault-tolerance correctly.

    ## What changes were proposed in this pull request?
    
    Current StreamTest allows testing of a streaming Dataset generated explicitly wraps a source. This is different from the actual production code path where the source object is dynamically created through a DataSource object every time a query is started. So all the fault-tolerance testing in FileSourceSuite and FileSourceStressSuite is not really testing the actual code path as they are just reusing the FileStreamSource object. This PR fixes StreamTest and the FileSource***Suite to test this correctly. Instead of maintaining a mapping of source --> expected offset in StreamTest (which requires reuse of source object), it now maintains a mapping of source index --> offset, so that it is independent of the source object.
    
    ## How was this patch tested?
    
    Refactored unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-14833

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12592.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12592
    
----
commit ef65ad768d6e6ca70b6339c43b57ee61576f77ec
Author: Tathagata Das <ta...@gmail.com>
Date:   2016-04-22T00:33:06Z

    Refactored test code for better testing

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213190118
  
    **[Test build #56621 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56621/consoleFull)** for PR 12592 at commit [`ef65ad7`](https://github.com/apache/spark/commit/ef65ad768d6e6ca70b6339c43b57ee61576f77ec).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213653499
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213570739
  
    LGTM except some nits and the style issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60679490
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -67,12 +67,6 @@ import org.apache.spark.util.Utils
      */
     trait StreamTest extends QueryTest with Timeouts {
     
    -  implicit class RichSource(s: Source) {
    -    def toDF(): DataFrame = Dataset.ofRows(sqlContext, StreamingExecutionRelation(s))
    -
    -    def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingExecutionRelation(s))
    -  }
    -
       /** How long to wait for an active stream to catch up when checking a result. */
    --- End diff --
    
    Removed the above lines to make sure that in future we do not use this pattern of creating DF/DS on source for testing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213201183
  
    **[Test build #56625 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56625/consoleFull)** for PR 12592 at commit [`48f1e6a`](https://github.com/apache/spark/commit/48f1e6a6cea7ac30de3c13e3e9dd4c9762fb83be).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213201198
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56625/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60679517
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -95,20 +89,18 @@ trait StreamTest extends QueryTest with Timeouts {
     
       /** A trait that can be extended when testing other sources. */
       trait AddData extends StreamAction {
    -    def source: Source
    -
         /**
          * Called to trigger adding the data.  Should return the offset that will denote when this
          * new data has been processed.
          */
    -    def addData(): Offset
    +    def addData(query: Option[StreamExecution]): (Source, Offset)
    --- End diff --
    
    note to self: update docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213653449
  
    **[Test build #56762 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56762/consoleFull)** for PR 12592 at commit [`532a69c`](https://github.com/apache/spark/commit/532a69ccd6938f3d3e4ddbd8ec44d8c2962afe80).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60793404
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -205,165 +262,139 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       }
     
       test("read from text files") {
    -    val src = Utils.createTempDir(namePrefix = "streaming.src")
    -    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
    -
    -    val textSource = createFileStreamSource("text", src.getCanonicalPath)
    -    val filtered = textSource.toDF().filter($"value" contains "keep")
    -
    -    testStream(filtered)(
    -      AddTextFileData(textSource, "drop1\nkeep2\nkeep3", src, tmp),
    -      CheckAnswer("keep2", "keep3"),
    -      StopStream,
    -      AddTextFileData(textSource, "drop4\nkeep5\nkeep6", src, tmp),
    -      StartStream,
    -      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
    -      AddTextFileData(textSource, "drop7\nkeep8\nkeep9", src, tmp),
    -      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
    -    )
    -
    -    Utils.deleteRecursively(src)
    -    Utils.deleteRecursively(tmp)
    +    withTempDirs { case (src, tmp) =>
    +      val textStream = createFileStream("text", src.getCanonicalPath)
    +      val filtered = textStream.filter($"value" contains "keep")
    +
    +      testStream(filtered)(
    +        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
    +        CheckAnswer("keep2", "keep3"),
    +        StopStream,
    +        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
    +        StartStream,
    +        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
    +        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
    +        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
    +      )
    +    }
       }
     
       test("read from json files") {
    -    val src = Utils.createTempDir(namePrefix = "streaming.src")
    -    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
    -
    -    val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
    -    val filtered = textSource.toDF().filter($"value" contains "keep")
    -
    -    testStream(filtered)(
    -      AddTextFileData(
    -        textSource,
    -        "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
    -        src,
    -        tmp),
    -      CheckAnswer("keep2", "keep3"),
    -      StopStream,
    -      AddTextFileData(
    -        textSource,
    -        "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
    -        src,
    -        tmp),
    -      StartStream,
    -      CheckAnswer("keep2", "keep3", "keep5", "keep6"),
    -      AddTextFileData(
    -        textSource,
    -        "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
    -        src,
    -        tmp),
    -      CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
    -    )
    -
    -    Utils.deleteRecursively(src)
    -    Utils.deleteRecursively(tmp)
    +    withTempDirs { case (src, tmp) =>
    +      val fileStream = createFileStream("json", src.getCanonicalPath, Some(valueSchema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +
    +      testStream(filtered)(
    +        AddTextFileData(
    +          "{'value': 'drop1'}\n{'value': 'keep2'}\n{'value': 'keep3'}",
    +          src,
    +          tmp),
    +        CheckAnswer("keep2", "keep3"),
    +        StopStream,
    +        AddTextFileData(
    +          "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
    +          src,
    +          tmp),
    +        StartStream,
    +        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
    +        AddTextFileData(
    +          "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
    +          src,
    +          tmp),
    +        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
    +      )
    +    }
       }
     
       test("read from json files with inferring schema") {
    -    val src = Utils.createTempDir(namePrefix = "streaming.src")
    -    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
    -
    -    // Add a file so that we can infer its schema
    -    stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
    +    withTempDirs { case (src, tmp) =>
     
    -    val textSource = createFileStreamSource("json", src.getCanonicalPath)
    +      // Add a file so that we can infer its schema
    +      stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
     
    -    // FileStreamSource should infer the column "c"
    -    val filtered = textSource.toDF().filter($"c" contains "keep")
    +      val fileStream = createFileStream("json", src.getCanonicalPath)
    +      require(fileStream.schema === StructType(Seq(StructField("c", StringType))))
    --- End diff --
    
    nit: require -> assert


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213889929
  
    LGTM. Thanks, merging to master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213653500
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56762/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213189615
  
    **[Test build #56621 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56621/consoleFull)** for PR 12592 at commit [`ef65ad7`](https://github.com/apache/spark/commit/ef65ad768d6e6ca70b6339c43b57ee61576f77ec).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213643497
  
    **[Test build #56762 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56762/consoleFull)** for PR 12592 at commit [`532a69c`](https://github.com/apache/spark/commit/532a69ccd6938f3d3e4ddbd8ec44d8c2962afe80).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213202897
  
    @zsxwing 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60679327
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -369,6 +369,7 @@ class StreamExecution(
       def awaitOffset(source: Source, newOffset: Offset): Unit = {
         def notDone = {
           val localCommittedOffsets = committedOffsets
    +      logInfo(s"Current offset for $source = " + localCommittedOffsets.get(source))
    --- End diff --
    
    note to self: remove this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213200949
  
    **[Test build #56625 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56625/consoleFull)** for PR 12592 at commit [`48f1e6a`](https://github.com/apache/spark/commit/48f1e6a6cea7ac30de3c13e3e9dd4c9762fb83be).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/12592


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213201193
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60793400
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -375,10 +406,10 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
         val src = Utils.createTempDir(namePrefix = "streaming.src")
    --- End diff --
    
    nit: This test can use `withTempDirs`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12592#discussion_r60793559
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -19,72 +19,129 @@ package org.apache.spark.sql.streaming
     
     import java.io.File
     
    -import org.apache.spark.sql.{AnalysisException, StreamTest}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType}
     import org.apache.spark.sql.catalyst.util._
     import org.apache.spark.sql.execution.streaming._
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.sql.types.{StringType, StructType}
    +import org.apache.spark.sql.{AnalysisException, DataFrame, StreamTest}
     import org.apache.spark.util.Utils
     
     class FileStreamSourceTest extends StreamTest with SharedSQLContext {
     
       import testImplicits._
     
    -  case class AddTextFileData(source: FileStreamSource, content: String, src: File, tmp: File)
    -    extends AddData {
    +  /**
    +   * A subclass [[AddData]] for adding data to files. This is meant to use the
    +   * [[FileStreamSource]] actually being used in the execution.
    +   */
    +  abstract class AddFileData extends AddData {
    +    override def addData(query: Option[StreamExecution]): (Source, Offset) = {
    +      require(
    +        query.nonEmpty,
    +        "Cannot add data when there is no query for finding the active file stream source")
    +
    +      val sources = query.get.logicalPlan.collect {
    +        case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
    +          source.asInstanceOf[FileStreamSource]
    +      }
    +      if (sources.isEmpty) {
    +        throw new Exception(
    +          "Could not find file source in the StreamExecution logical plan to add data to")
    +      } else if (sources.size > 1) {
    +        throw new Exception(
    +          "Could not select the file source in the StreamExecution logical plan as there" +
    +            "are multiple file sources:\n\t" + sources.mkString("\n\t"))
    +      }
    +      val source = sources.head
    +      val newOffset = source.withBatchingLocked {
    +        addData(source)
    +        source.currentOffset + 1
    +      }
    +      logInfo(s"Added data to $source at offset $newOffset")
    +      (source, newOffset)
    +    }
    +
    +    protected def addData(source: FileStreamSource): Unit
    +  }
     
    -    override def addData(): Offset = {
    -      source.withBatchingLocked {
    -        val file = Utils.tempFileWith(new File(tmp, "text"))
    -        stringToFile(file, content).renameTo(new File(src, file.getName))
    -        source.currentOffset
    -      } + 1
    +  case class AddTextFileData(content: String, src: File, tmp: File)
    +    extends AddFileData {
    +
    +    override def addData(source: FileStreamSource): Unit = {
    +      val file = Utils.tempFileWith(new File(tmp, "text"))
    +      stringToFile(file, content).renameTo(new File(src, file.getName))
         }
       }
     
    -  case class AddParquetFileData(
    -      source: FileStreamSource,
    -      content: Seq[String],
    -      src: File,
    -      tmp: File) extends AddData {
    -
    -    override def addData(): Offset = {
    -      source.withBatchingLocked {
    -        val file = Utils.tempFileWith(new File(tmp, "parquet"))
    -        content.toDS().toDF().write.parquet(file.getCanonicalPath)
    -        file.renameTo(new File(src, file.getName))
    -        source.currentOffset
    -      } + 1
    +  case class AddParquetFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
    +    override def addData(source: FileStreamSource): Unit = {
    +      AddParquetFileData.writeToFile(data, src, tmp)
         }
       }
     
    -  /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
    -  def createFileStreamSource(
    +  object AddParquetFileData {
    +    def apply(seq: Seq[String], src: File, tmp: File): AddParquetFileData = {
    +      AddParquetFileData(seq.toDS().toDF(), src, tmp)
    +    }
    +
    +    def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
    +      val file = Utils.tempFileWith(new File(tmp, "parquet"))
    +      df.write.parquet(file.getCanonicalPath)
    +      file.renameTo(new File(src, file.getName))
    +    }
    +  }
    +
    +  def createFileStream(
           format: String,
           path: String,
    -      schema: Option[StructType] = None): FileStreamSource = {
    -    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
    +      schema: Option[StructType] = None): DataFrame = {
    +
         val reader =
           if (schema.isDefined) {
             sqlContext.read.format(format).schema(schema.get)
           } else {
             sqlContext.read.format(format)
           }
         reader.stream(path)
    -      .queryExecution.analyzed
    +  }
    +
    +  protected def getSourceFromFileStream(df: DataFrame): FileStreamSource = {
    +    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
    +    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
    --- End diff --
    
    nit: unused line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213190125
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56621/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14833][SQL][STREAMING][TEST] Refactor S...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12592#issuecomment-213190121
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org