Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2016/04/22 20:44:16 UTC

[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/12616

    [SPARK-14837][SQL][STREAMING] Added support in file stream source for reading new files added to subdirs

    ## What changes were proposed in this pull request?
    Currently, the file stream source finds new files only if they appear directly in the directory given to the source, not if they appear in subdirectories. This PR adds support for specifying the nesting depth to which the source scans for new files.
    
    ## How was this patch tested?
    
    Unit tests that check when new files are discovered.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-14837

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12616
    
----
commit a2cbc810bc80890c77fa416921858e3cee39271f
Author: Tathagata Das <ta...@gmail.com>
Date:   2016-04-22T18:30:48Z

    Added support for file depth in file stream source

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213559435
  
    Why not just support globbing like we do in batch?
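
To illustrate what a glob such as `src/*/*` selects compared with a fixed scan depth, here is a minimal sketch. It uses the JDK's `java.nio` path matcher as a stand-in, not the Hadoop globbing that Spark uses, so details (for example, how matched directories are expanded) may differ. The object name, the `/src` paths, and the assumption of a Unix-like file system are hypothetical.

```scala
import java.nio.file.{FileSystems, Paths}

object GlobDepthSketch {
  // In a java.nio glob, `*` matches within a single path component only,
  // so "/src/*/*" selects entries exactly two levels below /src.
  private val matcher = FileSystems.getDefault.getPathMatcher("glob:/src/*/*")

  def matches(path: String): Boolean = matcher.matches(Paths.get(path))

  def main(args: Array[String]): Unit = {
    // Hypothetical paths at depth one, two, and three below /src.
    Seq("/src/file.txt", "/src/sub/file.txt", "/src/sub/subsub/file.txt")
      .foreach(p => println(s"$p -> ${matches(p)}"))
  }
}
```

Only the matcher is exercised here; no files are read, so the paths do not need to exist.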


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62737506
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -97,21 +102,30 @@ class FileStreamSource(
         assert(startId <= endId)
         val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    -    logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
    +    val newOptions = options.filterKeys(_ != "path")
    --- End diff --
    
    I think our `options` map is case-preserving, so when we compare a key with `path` we need to convert the key to lowercase first.
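
The concern can be shown with a small, self-contained sketch. It assumes a plain Scala `Map` whose keys keep the casing the user typed; the object name, the helper `withoutPath`, and the sample options are hypothetical and are not part of the patch.

```scala
import java.util.Locale

object OptionKeyFilter {
  // Drops the "path" entry whatever casing the user typed for the key.
  def withoutPath(options: Map[String, String]): Map[String, String] =
    options.filter { case (key, _) => key.toLowerCase(Locale.ROOT) != "path" }

  def main(args: Array[String]): Unit = {
    // Hypothetical user-supplied options; note the capitalised "Path".
    val options = Map("Path" -> "/data/in", "header" -> "true")

    // Case-sensitive comparison, as in the diff: "Path" survives the filter.
    val caseSensitive = options.filter { case (key, _) => key != "path" }
    println(caseSensitive.keySet)

    // Lowercasing the key before comparing removes it.
    println(withoutPath(options).keySet)
  }
}
```

Lowercasing with `Locale.ROOT` keeps the comparison independent of the JVM's default locale.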


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213550362
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/56719/


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498403
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -98,20 +109,49 @@ class FileStreamSource(
         val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
         logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +
    +    val newOptions = options.filterKeys(_ != "path") + ("basePath" -> basePath.toUri.toString)
    +    val newDataSource =
    +      DataSource(
    +        sparkSession,
    +        paths = files,
    +        userSpecifiedSchema = Some(schema),
    +        className = fileFormatClassName,
    +        options = new CaseInsensitiveMap(newOptions))
    +    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
       }
     
       private def fetchAllFiles(): Seq[String] = {
    -    val startTime = System.nanoTime()
    -    val files = fs.listStatus(new Path(path))
    -      .filterNot(_.getPath.getName.startsWith("_"))
    -      .map(_.getPath.toUri.toString)
    -    val endTime = System.nanoTime()
    -    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    -    files
    +    logInfo("Path to glob " + pathGlobPattern)
    +    val startTime = System.nanoTime
    +    val files = SparkHadoopUtil.get.globPath(pathGlobPattern)
    +    val endTime = System.nanoTime
    +    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    --- End diff --
    
    Make this logDebug.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62737973
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -33,12 +35,14 @@ import org.apache.spark.util.collection.OpenHashSet
      */
     class FileStreamSource(
         sparkSession: SparkSession,
    -    metadataPath: String,
         path: String,
    +    fileFormatClassName: String,
         override val schema: StructType,
    -    dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
    +    metadataPath: String,
    +    options: Map[String, String]) extends Source with Logging {
     
       private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
    +  private val qualifiedBasePath = fs.makeQualified(new Path(path))
    --- End diff --
    
    Maybe add a comment to mention that this `qualifiedBasePath` can contain glob patterns?


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62737571
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -97,21 +102,30 @@ class FileStreamSource(
         assert(startId <= endId)
         val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    -    logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
    +    val newOptions = options.filterKeys(_ != "path")
    +    val newDataSource =
    +      DataSource(
    +        sparkSession,
    +        paths = files,
    +        userSpecifiedSchema = Some(schema),
    +        className = fileFormatClassName,
    +        options = new CaseInsensitiveMap(newOptions))
    --- End diff --
    
    do we still need to add `basePath` to this `newOptions`?


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498063
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -175,25 +175,11 @@ case class DataSource(
             s.createSource(sparkSession.wrapped, metadataPath, userSpecifiedSchema, className, options)
     
           case format: FileFormat =>
    -        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
    -        val path = caseInsensitiveOptions.getOrElse("path", {
    +        val path = new CaseInsensitiveMap(options).getOrElse("path", {
               throw new IllegalArgumentException("'path' is not specified")
             })
    -
    -        def dataFrameBuilder(files: Array[String]): DataFrame = {
    -          val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
    -          val newDataSource =
    -            DataSource(
    -              sparkSession,
    -              paths = files,
    -              userSpecifiedSchema = Some(sourceInfo.schema),
    -              className = className,
    -              options = new CaseInsensitiveMap(newOptions))
    -          Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
    -        }
    -
             new FileStreamSource(
    -          sparkSession, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
    --- End diff --
    
    All this code has been moved into `FileStreamSource`. It does not make sense for file-stream-specific code to live in the `DataSource` file.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215297170
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57200/


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62740742
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -97,21 +102,30 @@ class FileStreamSource(
         assert(startId <= endId)
         val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    -    logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
    +    val newOptions = options.filterKeys(_ != "path")
    --- End diff --
    
    Aah right.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62738019
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -97,21 +102,30 @@ class FileStreamSource(
         assert(startId <= endId)
         val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    -    logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
    +    val newOptions = options.filterKeys(_ != "path")
    +    val newDataSource =
    +      DataSource(
    +        sparkSession,
    +        paths = files,
    +        userSpecifiedSchema = Some(schema),
    +        className = fileFormatClassName,
    +        options = new CaseInsensitiveMap(newOptions))
    +    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
       }
     
       private def fetchAllFiles(): Seq[String] = {
    -    val startTime = System.nanoTime()
    -    val files = fs.listStatus(new Path(path))
    -      .filterNot(_.getPath.getName.startsWith("_"))
    -      .map(_.getPath.toUri.toString)
    -    val endTime = System.nanoTime()
    -    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    +    val startTime = System.nanoTime
    +    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
    +    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
    +    val files = catalog.allFiles().map(_.getPath.toUri.toString)
    +    val endTime = System.nanoTime
    +    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    --- End diff --
    
    Nice!


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498520
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -439,6 +442,105 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
         }
       }
     
    +  test("read new files in nested directories with globbing") {
    +    withTempDirs { case (src, tmp) =>
    +
    +      val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*")
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val srcSubDir = new File(src, "newSubDir")
    +      val srcSubSubDir = new File(srcSubDir, "newSubSubDir")
    +
    +      testStream(filtered)(
    +        // Create new dir/subdir and write to it, should read
    +        AddTextFileData("drop1\nkeep2", srcSubDir, tmp),
    +        CheckAnswer("keep2"),
    +
    +        // Append to dir/subdir, should read
    +        AddTextFileData("keep3", srcSubDir, tmp),
    +        CheckAnswer("keep2", "keep3"),
    +
    +        // Create new dir/subDir/subsubdir and write to it, should read
    +        AddTextFileData("keep4", srcSubSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4"),
    +
    +        // Append to src dir, should not read as globbing src/*/* does not capture files in dir,
    +        // only captures files in dir/subdirs/
    +        AddTextFileData("keep5", src, tmp),
    +
    +        // Append to dir/subDir/subsubdir/, should not read
    +        AddTextFileData("keep6", srcSubSubDir, tmp),
    +        AddTextFileData("keep7", srcSubDir, tmp), // needed to make query detect new data
    +        CheckAnswer("keep2", "keep3", "keep4", "keep7")
    +      )
    +    }
    +  }
    +
    +  test("read new parquet files in partitioned directories with globbing") {
    +    withTempDirs { case (src, tmp) =>
    +      val partition1SubDir = new File(src, "partition=1")
    +      val partition2SubDir = new File(src, "partition=2")
    +
    +      val schema = new StructType().add("value", StringType).add("partition", IntegerType)
    +      val fileStream = createFileStream("parquet", s"${src.getCanonicalPath}/*/*", Some(schema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +
    +      testStream(filtered)(
    +        // Create new partition=1 sub dir and write to it, should read
    +        AddParquetFileData(Seq("drop1", "keep2"), partition1SubDir, tmp),
    +        CheckAnswer(("keep2", 1)),
    +
    +        // Append to same partition=1 sub dir, should read
    +        AddParquetFileData(Seq("keep3"), partition1SubDir, tmp),
    +        CheckAnswer(("keep2", 1), ("keep3", 1)),
    +
    +        // Create new partition sub dir and write to it, should read
    +        AddParquetFileData(Seq("keep4"), partition2SubDir, tmp),
    +        CheckAnswer(("keep2", 1), ("keep3", 1), ("keep4", 2)),
    +
    +        // Append to same partition=2 sub dir, should read
    +        AddParquetFileData(Seq("keep5"), partition2SubDir, tmp),
    +        CheckAnswer(("keep2", 1), ("keep3", 1), ("keep4", 2), ("keep5", 2))
    +      )
    +    }
    +  }
    +
    +  test("read new json files in multi-level partitioned dirs with complex globbing") {
    +    withTempDirs { case (src, tmp) =>
    +      val level1Dir = new File(src, "level1=xyz")
    +      val level2Dir = new File(level1Dir, "level2=00")
    +
    +      val schema =
    +        new StructType()
    +          .add("value", StringType)
    +          .add("level2", IntegerType)
    +
    +      // Stream from src/level1=xyz/level2=0*/* , which should not pick up data from column level1
    +      val fileStream = createFileStream(
    +        "json", s"${level1Dir.getCanonicalPath}/level2=0*/*", Some(schema))
    +
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val nullStr = null.asInstanceOf[String]
    +      testStream(filtered)(
    +        // Create new src/level1=xyz/level2=0/ and write to it, should read value for level2 data
    +        // but not level1 as it is not in scope of the provided glob pattern src/level1=xyz/*/*
    +        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", level2Dir, tmp),
    +        CheckAnswer(("keep2", 0)),
    +
    +        AddTextFileData("{'value': 'keep3'}", level2Dir, tmp),
    +        CheckAnswer(("keep2", 0), ("keep3", 0)),
    +
    +        // Create new dir/subDir/subsubdir and write to it, should read
    --- End diff --
    
    Update comments.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215281707
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215281481
  
    **[Test build #57199 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57199/consoleFull)** for PR 12616 at commit [`2404dc3`](https://github.com/apache/spark/commit/2404dc381c720b6403e3833641a03d8b7fe19e17).


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218307642
  
    **[Test build #58285 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58285/consoleFull)** for PR 12616 at commit [`08fd2b4`](https://github.com/apache/spark/commit/08fd2b4c77f5cc3a12cc190d7dd686c6d78c4153).


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218323375
  
    LGTM pending jenkins


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62741347
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -33,12 +35,14 @@ import org.apache.spark.util.collection.OpenHashSet
      */
     class FileStreamSource(
         sparkSession: SparkSession,
    -    metadataPath: String,
         path: String,
    +    fileFormatClassName: String,
         override val schema: StructType,
    -    dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
    +    metadataPath: String,
    +    options: Map[String, String]) extends Source with Logging {
     
       private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
    +  private val qualifiedBasePath = fs.makeQualified(new Path(path))
    --- End diff --
    
    okay.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498450
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -98,20 +109,49 @@ class FileStreamSource(
         val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
         logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +
    +    val newOptions = options.filterKeys(_ != "path") + ("basePath" -> basePath.toUri.toString)
    +    val newDataSource =
    +      DataSource(
    +        sparkSession,
    +        paths = files,
    +        userSpecifiedSchema = Some(schema),
    +        className = fileFormatClassName,
    +        options = new CaseInsensitiveMap(newOptions))
    +    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
       }
     
       private def fetchAllFiles(): Seq[String] = {
    -    val startTime = System.nanoTime()
    -    val files = fs.listStatus(new Path(path))
    -      .filterNot(_.getPath.getName.startsWith("_"))
    -      .map(_.getPath.toUri.toString)
    -    val endTime = System.nanoTime()
    -    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    -    files
    +    logInfo("Path to glob " + pathGlobPattern)
    +    val startTime = System.nanoTime
    +    val files = SparkHadoopUtil.get.globPath(pathGlobPattern)
    +    val endTime = System.nanoTime
    +    logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    +    files.map(_.toUri.toString)
    +  }
    +
    +  private def findBasePath(): Path = {
    --- End diff --
    
    Add docs.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218041367
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213550360
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498117
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -55,10 +62,12 @@ class FileStreamSource(
        */
       private def fetchMaxOffset(): LongOffset = synchronized {
         val filesPresent = fetchAllFiles()
    +    logInfo("All files:\n\t" + filesPresent.mkString("\n\t"))
    --- End diff --
    
    Remove this line. 




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498250
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -55,10 +62,12 @@ class FileStreamSource(
        */
       private def fetchMaxOffset(): LongOffset = synchronized {
         val filesPresent = fetchAllFiles()
    +    logInfo("All files:\n\t" + filesPresent.mkString("\n\t"))
    +
         val newFiles = new ArrayBuffer[String]()
         filesPresent.foreach { file =>
           if (!seenFiles.contains(file)) {
    -        logDebug(s"new file: $file")
    +        logInfo(s"new file: $file")
    --- End diff --
    
    Change this back to `logDebug`.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62739836
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -444,6 +445,79 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
         }
       }
     
    +  test("read new files in nested directories with globbing") {
    +    withTempDirs { case (dir, tmp) =>
    +
    +      // src/*/* should consider all the files and directories that matches that glob.
    +      // So any files that matches the glob as well as any files in directories that matches
    +      // this glob should be read.
    +      val fileStream = createFileStream("text", s"${dir.getCanonicalPath}/*/*")
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val subDir = new File(dir, "subdir")
    +      val subSubDir = new File(subDir, "subsubdir")
    +      val subSubSubDir = new File(subSubDir, "subsubsubdir")
    +
    +      require(!subDir.exists())
    +      require(!subSubDir.exists())
    +
    +      testStream(filtered)(
    +        // Create new dir/subdir and write to it, should read
    +        AddTextFileData("drop1\nkeep2", subDir, tmp),
    +        CheckAnswer("keep2"),
    +
    +        // Add files to dir/subdir, should read
    +        AddTextFileData("keep3", subDir, tmp),
    +        CheckAnswer("keep2", "keep3"),
    +
    +        // Create new dir/subdir/subsubdir and write to it, should read
    +        AddTextFileData("keep4", subSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4"),
    +
    +        // Add files to dir/subdir/subsubdir, should read
    +        AddTextFileData("keep5", subSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4", "keep5"),
    +
    +        // 1. Add file to src dir, should not read as globbing src/*/* does not capture files in
    +        //    dir, only captures files in dir/subdir/
    +        // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not read as src/*/* should
    +        //    not capture those files
    +        AddTextFileData("keep6", dir, tmp),
    +        AddTextFileData("keep7", subSubSubDir, tmp),
    +        AddTextFileData("keep8", subDir, tmp), // needed to make query detect new data
    +        CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8")
    +      )
    +    }
    +  }
    +
    +  test("read new files in partitioned table with globbing, should not read partition data") {
    +    withTempDirs { case (dir, tmp) =>
    +      val partitionFooSubDir = new File(dir, "partition=foo")
    +      val partitionBarSubDir = new File(dir, "partition=bar")
    +
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/*/*", Some(schema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val nullStr = null.asInstanceOf[String]
    +      testStream(filtered)(
    +        // Create new partition=foo sub dir and write to it, should read only value, not partition
    +        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
    +        CheckAnswer(("keep2", nullStr)),
    --- End diff --
    
    Why do we have a `nullStr` here?




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62741272
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
    @@ -444,6 +445,79 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
         }
       }
     
    +  test("read new files in nested directories with globbing") {
    +    withTempDirs { case (dir, tmp) =>
    +
    +      // src/*/* should consider all the files and directories that matches that glob.
    +      // So any files that matches the glob as well as any files in directories that matches
    +      // this glob should be read.
    +      val fileStream = createFileStream("text", s"${dir.getCanonicalPath}/*/*")
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val subDir = new File(dir, "subdir")
    +      val subSubDir = new File(subDir, "subsubdir")
    +      val subSubSubDir = new File(subSubDir, "subsubsubdir")
    +
    +      require(!subDir.exists())
    +      require(!subSubDir.exists())
    +
    +      testStream(filtered)(
    +        // Create new dir/subdir and write to it, should read
    +        AddTextFileData("drop1\nkeep2", subDir, tmp),
    +        CheckAnswer("keep2"),
    +
    +        // Add files to dir/subdir, should read
    +        AddTextFileData("keep3", subDir, tmp),
    +        CheckAnswer("keep2", "keep3"),
    +
    +        // Create new dir/subdir/subsubdir and write to it, should read
    +        AddTextFileData("keep4", subSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4"),
    +
    +        // Add files to dir/subdir/subsubdir, should read
    +        AddTextFileData("keep5", subSubDir, tmp),
    +        CheckAnswer("keep2", "keep3", "keep4", "keep5"),
    +
    +        // 1. Add file to src dir, should not read as globbing src/*/* does not capture files in
    +        //    dir, only captures files in dir/subdir/
    +        // 2. Add files to dir/subDir/subsubdir/subsubsubdir, should not read as src/*/* should
    +        //    not capture those files
    +        AddTextFileData("keep6", dir, tmp),
    +        AddTextFileData("keep7", subSubSubDir, tmp),
    +        AddTextFileData("keep8", subDir, tmp), // needed to make query detect new data
    +        CheckAnswer("keep2", "keep3", "keep4", "keep5", "keep8")
    +      )
    +    }
    +  }
    +
    +  test("read new files in partitioned table with globbing, should not read partition data") {
    +    withTempDirs { case (dir, tmp) =>
    +      val partitionFooSubDir = new File(dir, "partition=foo")
    +      val partitionBarSubDir = new File(dir, "partition=bar")
    +
    +      val schema = new StructType().add("value", StringType).add("partition", StringType)
    +      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/*/*", Some(schema))
    +      val filtered = fileStream.filter($"value" contains "keep")
    +      val nullStr = null.asInstanceOf[String]
    +      testStream(filtered)(
    +        // Create new partition=foo sub dir and write to it, should read only value, not partition
    +        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
    +        CheckAnswer(("keep2", nullStr)),
    --- End diff --
    
    In this test I want to make sure that the partition directories are never parsed, even if the user tries to extract the partition column by specifying it in the user-defined schema.
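
For readers unfamiliar with partition discovery, the following is a minimal sketch of the kind of `name=value` directory parsing that this test expects the file stream source to skip. `PartitionSegmentSketch` and `partitionValues` are hypothetical names introduced only for illustration; they are not Spark APIs and this is not how Spark implements partition discovery.

```scala
object PartitionSegmentSketch {
  // Hypothetical helper (not a Spark API): collects `name=value` pairs from the
  // directory segments of a path, ignoring the final segment (the file name).
  def partitionValues(path: String): Map[String, String] =
    path.split('/').toSeq.dropRight(1).flatMap { segment =>
      segment.split("=", 2) match {
        case Array(name, value) if name.nonEmpty => Some(name -> value)
        case _ => None
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    // A batch reader doing partition discovery would derive partition=foo here.
    println(partitionValues("/data/partition=foo/part-0.json"))
    // A directory without `=` contributes nothing.
    println(partitionValues("/data/subdir/part-0.json"))
  }
}
```

In the test above, the `partition` column is checked against `nullStr` precisely because no such parsing is supposed to happen for files discovered through the glob.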




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218260826
  
    @yhuai @marmbrus 




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213561278
  
    Globbing? Can you explain that a bit? Any code links?




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218323829
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213550352
  
    **[Test build #56719 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56719/consoleFull)** for PR 12616 at commit [`a2cbc81`](https://github.com/apache/spark/commit/a2cbc810bc80890c77fa416921858e3cee39271f).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r62741054
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -97,21 +102,30 @@ class FileStreamSource(
         assert(startId <= endId)
         val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
    -    logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +    logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
    +    val newOptions = options.filterKeys(_ != "path")
    +    val newDataSource =
    +      DataSource(
    +        sparkSession,
    +        paths = files,
    +        userSpecifiedSchema = Some(schema),
    +        className = fileFormatClassName,
    +        options = new CaseInsensitiveMap(newOptions))
    --- End diff --
    
    No, we don't need to. Not adding a base path, and giving only a list of files (with no directories in it), ensures that partitioned directories are not parsed. That is the intention: we are not supporting partitioned directories in the file stream yet.
    
    On that note, if the user explicitly specifies a base path in the options, I don't know what will happen. Maybe we should throw an error in that case. What do you think?
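
A minimal sketch of the idea floated above: drop the `path` key before building the per-batch data source, and reject an explicitly supplied base path. `StreamOptionsSketch` and `sanitize` are hypothetical names, and the option key `basePath` is an assumption made for illustration; the PR as quoted only filters out `path`.

```scala
object StreamOptionsSketch {
  // Hypothetical helper, not part of the PR. Keys are compared case-insensitively
  // to mirror the CaseInsensitiveMap used in the diff.
  def sanitize(options: Map[String, String]): Map[String, String] = {
    // Assumed key name "basePath"; fail fast instead of silently ignoring it.
    require(
      !options.keys.exists(_.equalsIgnoreCase("basePath")),
      "basePath is not supported by the file stream source")
    // Same effect as options.filterKeys(_ != "path"), but case-insensitive.
    options.filter { case (key, _) => !key.equalsIgnoreCase("path") }
  }

  def main(args: Array[String]): Unit = {
    println(sanitize(Map("path" -> "/in", "header" -> "true")))
  }
}
```

Whether an error is preferable to silently ignoring the option is exactly the open question raised in the comment; the sketch only shows what the stricter choice could look like.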




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218323831
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58285/




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218040367
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58189/




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215576141
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57275/




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218041368
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58191/




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213567878
  
    I.e., something like `/dir/*/*`. We do it in either DataSource or HDFSFileCatalog.
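
To make the depth semantics of a pattern like `/dir/*/*` concrete, here is a small sketch using the JDK's `java.nio.file` glob matcher on a Unix-like default file system. This is an illustration only: it is not the Hadoop globbing that Spark uses, and it does not show the second step exercised by the PR's tests, where a directory matching the glob also has the files inside it read. `GlobDepthSketch` and `matchesGlob` are hypothetical names.

```scala
import java.nio.file.{FileSystems, Paths}

object GlobDepthSketch {
  // In java.nio glob syntax, `*` matches within a single path segment and
  // never crosses a directory separator.
  private val matcher = FileSystems.getDefault.getPathMatcher("glob:/dir/*/*")

  def matchesGlob(path: String): Boolean = matcher.matches(Paths.get(path))

  def main(args: Array[String]): Unit = {
    Seq(
      "/dir/file.txt",                  // one level below /dir
      "/dir/subdir/file.txt",           // two levels below /dir
      "/dir/subdir/subsubdir/file.txt"  // three levels below /dir
    ).foreach(p => println(s"$p -> ${matchesGlob(p)}"))
  }
}
```

Only paths exactly two segments below `/dir` match the pattern itself, which is why a file written directly into the source directory is not picked up by `src/*/*` in the test quoted earlier in the thread.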




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215282807
  
    **[Test build #57200 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57200/consoleFull)** for PR 12616 at commit [`5918df9`](https://github.com/apache/spark/commit/5918df91276a304122e5be267f5f71b36fa084c3).




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by yhuai <gi...@git.apache.org>.
Github user yhuai commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218324187
  
    Thanks! Merging to master and branch 2.0.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215576137
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213549288
  
    @marmbrus @zsxwing 




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/12616




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215297169
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215562885
  
    @marmbrus Please take a look once again.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218040366
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218041227
  
    **[Test build #58191 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58191/consoleFull)** for PR 12616 at commit [`f977872`](https://github.com/apache/spark/commit/f9778729cd52a100035bd28e0b2348bc184482dd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218030218
  
    **[Test build #58191 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58191/consoleFull)** for PR 12616 at commit [`f977872`](https://github.com/apache/spark/commit/f9778729cd52a100035bd28e0b2348bc184482dd).




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215553704
  
    **[Test build #57275 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57275/consoleFull)** for PR 12616 at commit [`b011b5e`](https://github.com/apache/spark/commit/b011b5e5053849b866c7ec8f6db8163fdd382645).




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215281708
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/57199/




[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218323662
  
    **[Test build #58285 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58285/consoleFull)** for PR 12616 at commit [`08fd2b4`](https://github.com/apache/spark/commit/08fd2b4c77f5cc3a12cc190d7dd686c6d78c4153).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218029616
  
    **[Test build #58189 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58189/consoleFull)** for PR 12616 at commit [`b3a0748`](https://github.com/apache/spark/commit/b3a074827d5f8fbd01b4abe4ea3d1ea7d3ebe866).


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-218040169
  
    **[Test build #58189 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58189/consoleFull)** for PR 12616 at commit [`b3a0748`](https://github.com/apache/spark/commit/b3a074827d5f8fbd01b4abe4ea3d1ea7d3ebe866).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215575885
  
    **[Test build #57275 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57275/consoleFull)** for PR 12616 at commit [`b011b5e`](https://github.com/apache/spark/commit/b011b5e5053849b866c7ec8f6db8163fdd382645).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215296860
  
    **[Test build #57200 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57200/consoleFull)** for PR 12616 at commit [`5918df9`](https://github.com/apache/spark/commit/5918df91276a304122e5be267f5f71b36fa084c3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by ScrapCodes <gi...@git.apache.org>.
Github user ScrapCodes commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-214622436
  
    Hadoop already supports this. For example, we can pass a glob, as in sc.textFile("/dir/*/*"). I was wondering whether it really needs to be implemented again here.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12616#discussion_r61498388
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
    @@ -98,20 +109,49 @@ class FileStreamSource(
         val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
         logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
         logDebug(s"Streaming ${files.mkString(", ")}")
    -    dataFrameBuilder(files)
    +
    +    val newOptions = options.filterKeys(_ != "path") + ("basePath" -> basePath.toUri.toString)
    +    val newDataSource =
    +      DataSource(
    +        sparkSession,
    +        paths = files,
    +        userSpecifiedSchema = Some(schema),
    +        className = fileFormatClassName,
    +        options = new CaseInsensitiveMap(newOptions))
    +    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
       }
     
       private def fetchAllFiles(): Seq[String] = {
    -    val startTime = System.nanoTime()
    -    val files = fs.listStatus(new Path(path))
    -      .filterNot(_.getPath.getName.startsWith("_"))
    -      .map(_.getPath.toUri.toString)
    -    val endTime = System.nanoTime()
    -    logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
    -    files
    +    logInfo("Path to glob " + pathGlobPattern)
    --- End diff --
    
    Remove this.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-215281704
  
    **[Test build #57199 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/57199/consoleFull)** for PR 12616 at commit [`2404dc3`](https://github.com/apache/spark/commit/2404dc381c720b6403e3833641a03d8b7fe19e17).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-14837][SQL][STREAMING] Added support in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/12616#issuecomment-213550024
  
    **[Test build #56719 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/56719/consoleFull)** for PR 12616 at commit [`a2cbc81`](https://github.com/apache/spark/commit/a2cbc810bc80890c77fa416921858e3cee39271f).

