Posted to reviews@spark.apache.org by wangxiaojing <gi...@git.apache.org> on 2014/10/11 10:58:01 UTC

[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

GitHub user wangxiaojing opened a pull request:

    https://github.com/apache/spark/pull/2765

    [spark-3586][streaming]Support nested directories in Spark Streaming

    For text files, use the method streamingContext.textFileStream(dataDirectory).
    This improvement adds support for subdirectories: Spark Streaming can monitor the subdirectories of dataDirectory and process any files created in them.
    e.g.:
    streamingContext.textFileStream("/test")
    Given the directory contents:
    /test/file1
    /test/file2
    /test/dr/file1
    if a new file "file2" is added to the directory "/test/dr/", Spark Streaming can process that file.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangxiaojing/spark spark-3586

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2765
    
----
commit 98ead547f90520819b421b0f4436bfe7d8a3d4f4
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-11T08:22:31Z

    Support nested directories in Spark Streaming

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323986
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    +    }
    +    pathList.filter(path=>{
    +      val  modTime = fs.getFileStatus(path).getModificationTime
    +      logDebug(s"Mod time for $path is $modTime")
    +      if (modTime > ignoreTime) {
    +        logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +        true
    +      }
    +      else false
    +    })
    +  }
    +
    +  def getSubPathList(path:List[Path],fs:FileSystem):List[Path]={
    --- End diff --
    
    Spaces after `:` and `,`, and around `=`.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19004975
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     
         def accept(path: Path): Boolean = {
           try {
    +        if (fs.getFileStatus(path).isDirectory()){
    +          return false
    +        }
             if (!filter(path)) {  // Reject file if it does not satisfy filter
               logDebug("Rejected by filter " + path)
               return false
    --- End diff --
    
    Because the logic is not strict: without the early `return false`, execution continues for a directory and eventually returns true.
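
A minimal sketch of the early exit discussed here, assuming plain `java.io.File` in place of Hadoop's `Path` and `FileSystem`. The object `AcceptSketch` and the `userFilter` parameter are hypothetical names for illustration, not the code in the PR.

```scala
import java.io.File

// Hypothetical stand-in for the PR's path filter; uses java.io.File, not Hadoop's Path.
object AcceptSketch {
  // Returns true only for non-directory entries that pass the user-supplied filter.
  def accept(file: File, userFilter: File => Boolean): Boolean = {
    if (file.isDirectory) {
      // Without this early exit, a directory that passes the remaining
      // checks would fall through and be accepted as if it were a new file.
      false
    } else if (!userFilter(file)) {
      // Reject entries that do not satisfy the filter.
      false
    } else {
      true
    }
  }
}
```

With this shape, `AcceptSketch.accept(someDirectory, _ => true)` is false even though the filter itself accepts everything.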




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-68036643
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24770/
    Test FAILed.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19324027
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -141,6 +141,108 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
       }
     
    +  test("file input stream -depth = 1") {
    --- End diff --
    
    Space after `-`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18745606
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -240,6 +260,31 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           true
         }
       }
    +
    +  private[streaming]
    +  class SubPathFilter extends PathFilter {
    +
    +    def accept(path: Path): Boolean = {
    +      try {
    +        if(fs.getFileStatus(path).isDirectory()){
    --- End diff --
    
    Nit: spaces before `(` and `{`.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502765
  
    --- Diff: project/MimaExcludes.scala ---
    @@ -83,7 +83,16 @@ object MimaExcludes {
                 // SPARK-2757
                 ProblemFilters.exclude[IncompatibleResultTypeProblem](
                   "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." +
    -                "removeAndGetProcessor")
    +                "removeAndGetProcessor"),
    +            // SPARK-3586
    --- End diff --
    
    I think we should not have any MiMa failures here. The Java errors are correct and need to be avoided. I'd have to look again at why the Scala API is changed by this.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502448
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -278,11 +294,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
       }
     
    -  def testFileStream(newFilesOnly: Boolean) {
    +  def testFileStream(newFilesOnly: Boolean, depth :Int = 1) {
    --- End diff --
    
    This should be `depth: Int = 1`. There is at least another instance above where `name : Type` is written; might as well standardize to `name: Type` everywhere.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323936
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    --- End diff --
    
    Space after `=` and `,`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r22708454
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -327,6 +327,7 @@ class StreamingContext private[streaming] (
        * Files must be written to the monitored directory by "moving" them from another
        * location within the same file system. File names starting with . are ignored.
    --- End diff --
    
    Please update JavaStreamingContext similarly





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76363323
  
      [Test build #28062 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28062/consoleFull) for   PR 2765 at commit [`6620143`](https://github.com/apache/spark/commit/6620143e72f90a34421cc3dd4cf2cf1289b16557).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69531261
  
      [Test build #25398 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25398/consoleFull) for   PR 2765 at commit [`1764c34`](https://github.com/apache/spark/commit/1764c34d99b4c361626fef90120fb444c5c730bb).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69169225
  
    @tdas Thanks a lot. I have fixed them.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-67936460
  
      [Test build #24735 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24735/consoleFull) for   PR 2765 at commit [`e488919`](https://github.com/apache/spark/commit/e488919eb3ffe3b4d6509995720f4e33c48c0762).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76676922
  
      [Test build #28161 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28161/consoleFull) for   PR 2765 at commit [`2068255`](https://github.com/apache/spark/commit/20682555dbd6728c534d5a5e634aa7ce9a37915c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066019
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -659,6 +659,7 @@ methods for creating DStreams from files and Akka actors as input sources.
          + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
     
     	For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
    +	If Spark Streaming monitor the directory in nested directories, there is an easier method `streamingContext.textFileStream(dataDirectory, depth)`.
    --- End diff --
    
    I don't think this is necessary here.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323906
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -104,7 +105,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         logDebug("Trying to get new files for time " + currentTime)
         lastNewFileFindingTime = System.currentTimeMillis
         val filter = new CustomPathFilter(currentTime)
    -    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    +    val pathList = getPathList(directoryPath,fs)
    --- End diff --
    
    Space after `,`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-59485316
  
    @jerryshao @tdas First, all directories down to the given depth are collected; then they are filtered, keeping only those whose modification time is later than the ignore time. Is this method optimal? Thanks.
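
The two steps described above can be sketched as follows. This is an illustration only, assuming `java.io.File` instead of the Hadoop `Path`/`FileSystem` types the PR works with. The names `NestedDirs`, `subDirs`, `dirsUpToDepth` and `recentlyModified` are hypothetical, and the depth convention (the root plus `depth` levels below it) follows the `getPathList` excerpt quoted earlier in this thread.

```scala
import java.io.File

// Hypothetical sketch; the PR itself works on Hadoop Path/FileSystem objects.
object NestedDirs {
  // Immediate subdirectories of every directory in `dirs`.
  def subDirs(dirs: List[File]): List[File] =
    dirs.flatMap { d =>
      // listFiles returns null when `d` is not a readable directory.
      Option(d.listFiles).map(_.toList).getOrElse(Nil).filter(_.isDirectory)
    }

  // Step 1: the root plus all directories at most `depth` levels below it.
  def dirsUpToDepth(root: File, depth: Int): List[File] =
    Iterator.iterate(List(root))(subDirs).take(depth + 1).toList.flatten

  // Step 2: keep only directories modified after `ignoreTime` (epoch millis).
  def recentlyModified(dirs: List[File], ignoreTime: Long): List[File] =
    dirs.filter(_.lastModified > ignoreTime)
}
```

A caller would chain the two, for example `NestedDirs.recentlyModified(NestedDirs.dirsUpToDepth(root, depth), ignoreTime)`. Every scan still visits every directory down to `depth`, so the cost grows with the size of the tree rather than with the number of changed directories.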




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-68998328
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066204
  
    --- Diff: streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---
    @@ -1739,7 +1739,11 @@ public Integer call(String s) throws Exception {
       // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
       // InputStream functionality is deferred to the existing Scala tests.
       @Test
    -  public void testSocketTextStream() {
    +  public void testSocketTextStream(
    +
    +
    --- End diff --
    
    Undo this change?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69073812
  
    @wangxiaojing This PR has merge conflicts. Could you fix them?




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19324008
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -230,16 +266,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
             if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
               minNewFileModTime = modTime
             }
    +        if(path.getName().startsWith("_")){
    --- End diff --
    
    Space after `if` and before `{`.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-107278995
  
    @wangxiaojing could you update this PR? It conflicts with master




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76514746
  
      [Test build #28114 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28114/consoleFull) for   PR 2765 at commit [`348657e`](https://github.com/apache/spark/commit/348657e2069c3732d2a43bbc6ddb873eec7a3a48).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-67277093
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24528/
    Test FAILed.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76939929
  
    This is getting better. I am still concerned about the MiMa excludes. There should be no API changes, so, there should be no excludes. Those are still in this PR.
    
    The additions to the Scala API are not using optional arguments anymore. I think it's most natural to create an optional depth argument on the most "complete" version of each API method. It is not necessary in Scala to overload a method to add an optional argument, and it is not necessary to add this argument to every version of the method.
    
    That is, I think I would expect to see one, maybe two versions of each Scala API method add a `depth: Int = 1` parameter, only.
    
    In Java, there are no optional params (even though we're writing in Scala) but I also believe it is not desirable to overload every single method. The one with the most args can take the overload.
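
A minimal sketch of the suggested shape for the Scala side, assuming a made-up method and return type. `FileStreamApiSketch`, `StreamSpec` and this `fileStream` signature are hypothetical and are not Spark's actual API; the only point is that a single method with a default value serves both call styles.

```scala
// Hypothetical API shape; names and signatures are illustrative, not Spark's.
object FileStreamApiSketch {
  final case class StreamSpec(directory: String, newFilesOnly: Boolean, depth: Int)

  // One method with a defaulted trailing parameter, instead of one overload per combination.
  def fileStream(
      directory: String,
      newFilesOnly: Boolean = true,
      depth: Int = 1): StreamSpec =
    StreamSpec(directory, newFilesOnly, depth)
}
```

Scala callers can then write `FileStreamApiSketch.fileStream("/test")` to get the default depth of 1, or `FileStreamApiSketch.fileStream("/test", depth = 2)` to override it. Java callers do not see Scala default values, which is why a single explicit overload on the method with the most arguments is proposed for the Java API.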




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69318962
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25316/




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76671073
  
      [Test build #28161 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28161/consoleFull) for   PR 2765 at commit [`2068255`](https://github.com/apache/spark/commit/20682555dbd6728c534d5a5e634aa7ce9a37915c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69318958
  
      [Test build #25316 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25316/consoleFull) for   PR 2765 at commit [`dc847c0`](https://github.com/apache/spark/commit/dc847c02e359bb879c49485478a04473482b4a7a).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-72158225
  
      [Test build #26383 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26383/consoleFull) for   PR 2765 at commit [`dbb6aa0`](https://github.com/apache/spark/commit/dbb6aa0a6d98be3b922298983ca9bd0c0a28781e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502732
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -279,13 +289,14 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
        */
       def fileStream[K, V, F <: NewInputFormat[K, V]](
           directory: String,
    +      depth: Int = 1,
    --- End diff --
    
    This breaks the Java API, since there is no longer a version without a depth param in Java, right?
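
A small sketch of why a Scala default argument does not give Java callers a shorter method. The `Api` class is hypothetical and only mirrors the shape of the diff (a defaulted `depth` in the middle of the parameter list). Reflection on the compiled class shows which JVM methods exist.

```scala
object DefaultArgBytecodeSketch {
  // Hypothetical class mirroring the diff: `depth` has a default and
  // sits between two parameters that have none.
  class Api {
    def fileStream(directory: String, depth: Int = 1, newFilesOnly: Boolean): String =
      s"$directory depth=$depth newFilesOnly=$newFilesOnly"
  }

  // Arities of every JVM method literally named `fileStream`.
  def fileStreamArities: Seq[Int] =
    classOf[Api].getDeclaredMethods.toSeq
      .filter(_.getName == "fileStream")
      .map(_.getParameterCount)
      .sorted

  // Names of the synthetic default-getter methods the compiler emits.
  def defaultGetters: Seq[String] =
    classOf[Api].getDeclaredMethods.toSeq
      .map(_.getName)
      .filter(_.startsWith("fileStream$default$"))
      .sorted
}
```

Scala code can still write `new Api().fileStream("/test", newFilesOnly = true)`, because the compiler fills in the default by calling the getter. A Java caller sees only the full-arity `fileStream`, so a previously existing two-argument call no longer resolves unless an explicit overload is kept.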




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65637093
  
    @liancheng 




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740452
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +119,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList( path:Path, fs:FileSystem):List[Path]={
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    fs.listStatus(path,filter).map(x=>{
    +      if(x.isDirectory()){
    --- End diff --
    
    Doesn't this only list immediate subdirectories?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69288062
  
    @tdas Rebased onto the latest master and updated.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65619652
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24137/




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65619648
  
      [Test build #24137 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24137/consoleFull) for   PR 2765 at commit [`cba8a2e`](https://github.com/apache/spark/commit/cba8a2e6cf11741867561ce1c0d7d2eda66033c6).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323941
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    --- End diff --
    
    Spaces around `=` and `:::`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502749
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -309,12 +321,13 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
           vClass: Class[V],
           fClass: Class[F],
           filter: JFunction[Path, JBoolean],
    -      newFilesOnly: Boolean): JavaPairInputDStream[K, V] = {
    +      newFilesOnly: Boolean,
    +      depth: Int = 1): JavaPairInputDStream[K, V] = {
    --- End diff --
    
    Same issue




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323893
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -351,8 +351,8 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    +  ] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, depth : Int =0): InputDStream[(K, V)] = {
    --- End diff --
    
    - Remove space before `:`
    - Add space after `=`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-70437483
  
    @tdas 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323975
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    +    }
    +    pathList.filter(path=>{
    +      val  modTime = fs.getFileStatus(path).getModificationTime
    +      logDebug(s"Mod time for $path is $modTime")
    +      if (modTime > ignoreTime) {
    +        logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +        true
    +      }
    +      else false
    --- End diff --
    
    Please reformat this `if` according to the coding convention, like this:
    
    ```scala
    if (...) {
      ...
    } else {
      ...
    }
    ```




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-58890751
  
    Hi @wangxiaojing, a small suggestion: why not make this improvement more flexible by adding a parameter that controls how deep the directory search goes? That would be more general than the current implementation, which only searches one level deep. For example:
    
    ```scala
    class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
        @transient ssc_ : StreamingContext,
        directory: String,
        filter: Path => Boolean = FileInputDStream.defaultFilter,
        depth: Int = 1,
        newFilesOnly: Boolean = true)
    ```
    People can use this parameter to control the search depth; the default of 1 keeps the same semantics as the current code.
    
    Besides, some whitespace-related code style issues should be fixed to align with the Scala style.
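
A minimal sketch of a depth-limited directory search along the lines suggested here. It is an illustration under stated assumptions, not the PR's implementation: it walks the local file system with `java.io.File` rather than a Hadoop `FileSystem`, and `DepthLimitedListing` and `directoriesUpTo` are hypothetical names.

```scala
import java.io.File

object DepthLimitedListing {
  // Directories reachable from `root` by descending at most `depth` levels.
  // depth = 0 yields only `root`; depth = 1 adds its immediate subdirectories.
  def directoriesUpTo(root: File, depth: Int): List[File] = {
    require(depth >= 0, "nested directories depth must be >= 0")
    if (depth == 0) {
      List(root)
    } else {
      // listFiles returns null when `root` is not a readable directory.
      val children = Option(root.listFiles).map(_.toList).getOrElse(Nil)
      root :: children.filter(_.isDirectory).flatMap(directoriesUpTo(_, depth - 1))
    }
  }
}
```

With this shape, `directoriesUpTo(new File("/test"), 1)` would cover `/test` and `/test/dr` from the PR description, and a larger `depth` reaches further down without changing the caller.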
    
     




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76891500
  
    @srowen Can we test this again, please?




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327246
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +122,41 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path: Path, fs: FileSystem): List[Path] = {
    +    var pathList = List[Path]()
    +    pathList = path :: pathList
    +    var tmp = List[Path]()
    +    tmp = path :: tmp
    +    for (i <- 0 until depth) {
    +      tmp = getSubPathList(tmp, fs)
    +      pathList = tmp ::: pathList
    +    }
    +    pathList.filter {
    +      path =>
    +        val modTime = fs.getFileStatus(path).getModificationTime
    --- End diff --
    
    Calling `getFileStatus` for each path here can be really slow for S3 files. A single call often takes over 100 ms, and sometimes several hundred.
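
A sketch of the cost difference this comment points at, using a hypothetical in-memory `CountingFs` that counts simulated remote calls (it is not the Hadoop API). The idea is that a directory listing already carries each entry's modification time, so filtering on the listing result avoids one extra round trip per path.

```scala
object StatusReuseSketch {
  // Hypothetical stand-ins for a file status and a remote file system.
  final case class Status(path: String, modTime: Long)

  final class CountingFs(entries: Map[String, Long]) {
    var remoteCalls = 0

    // One simulated remote call returns the status of every entry under `dir`.
    def listStatus(dir: String): List[Status] = {
      remoteCalls += 1
      entries.toList
        .collect { case (p, t) if p.startsWith(dir) => Status(p, t) }
        .sortBy(_.path)
    }

    // One simulated remote call per path.
    def getFileStatus(path: String): Status = {
      remoteCalls += 1
      Status(path, entries(path))
    }
  }

  // Per-path lookups: 1 listing call plus N status calls for N entries.
  def newerPerPath(fs: CountingFs, dir: String, ignoreTime: Long): List[String] =
    fs.listStatus(dir).map(_.path).filter(p => fs.getFileStatus(p).modTime > ignoreTime)

  // Reuse the mod times returned by the listing: 1 call in total.
  def newerFromListing(fs: CountingFs, dir: String, ignoreTime: Long): List[String] =
    fs.listStatus(dir).filter(_.modTime > ignoreTime).map(_.path)
}
```

In Hadoop, `FileSystem.listStatus` returns `FileStatus` objects that expose `getModificationTime`, so the same pattern should be available there, though the actual saving depends on the file system implementation.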




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76514804
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28115/




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740436
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -240,6 +260,31 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           true
         }
       }
    +
    +  private[streaming]
    +  class SubPathFilter extends PathFilter {
    +
    +    def accept(path: Path): Boolean = {
    +      try {
    +        if(fs.getFileStatus(path).isDirectory()){
    +          val modTime = getFileModTime(path)
    +          logDebug("Mod time for " + path + " is " + modTime)
    +          if (modTime > ignoreTime) {
    +            // Reject file if it was created before the ignore time (or, before last interval)
    +            logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
    +            return false
    +          }
    +          return true
    +        }
    +      } catch {
    +        case fnfe: java.io.FileNotFoundException =>
    --- End diff --
    
    Why not import this, and what about the more general `IOException`?
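
A sketch of the exception handling this comment asks about, assuming local files read through `java.nio.file` rather than a Hadoop `FileSystem`. `ModTimeLookupSketch` and `modTime` are hypothetical names. The exception classes are imported instead of fully qualified, the "file disappeared" case is handled quietly, and any other `IOException` is caught by a more general case placed after it.

```scala
import java.io.{FileNotFoundException, IOException}
import java.nio.file.{Files, NoSuchFileException, Paths}

object ModTimeLookupSketch {
  // Returns the modification time in milliseconds, or None if it cannot be read.
  def modTime(pathString: String): Option[Long] = {
    try {
      Some(Files.getLastModifiedTime(Paths.get(pathString)).toMillis)
    } catch {
      // The path vanished between listing and lookup: expected, so stay quiet.
      case _: FileNotFoundException | _: NoSuchFileException => None
      // Any other I/O failure: report it, but do not fail the whole scan.
      case e: IOException =>
        Console.err.println(s"Could not read mod time for $pathString: ${e.getMessage}")
        None
    }
  }
}
```

The specific cases must come before the general one, because both are subclasses of `IOException` and Scala tries `case` clauses in order.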




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r31427177
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -183,8 +189,46 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           val filter = new PathFilter {
             def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
           }
    -      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    -      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
    +      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
    +
    +      // Nested directories to find new files.
    +      def dfs(status: FileStatus): List[FileStatus] = {
    +        val path = status.getPath
    +        val depthFilter = depth + directoryDepth - path.depth()
    +        if (status.isDir) {
    +          if (depthFilter - 1 >= 0) {
    +            if (lastFoundDirs.contains(path)) {
    +              if (status.getModificationTime > modTimeIgnoreThreshold) {
    +                fs.listStatus(path).toList.flatMap(dfs(_))
    +              } else Nil
    +            } else {
    +              lastFoundDirs += path
    +              fs.listStatus(path).toList.flatMap(dfs(_))
    +            }
    +          } else Nil
    +        } else {
    +          if (filter.accept(path)) status :: Nil else Nil
    +        }
    +      }
    +
    +      val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath))
    +      else {
    +        lastFoundDirs.filter { path =>
    +          // If the mod time of directory is more than ignore time, no new files in this directory.
    +          try {
    +            val status = fs.getFileStatus(path)
    +            status != null && status.getModificationTime > modTimeIgnoreThreshold
    +          } catch {
    +            // If the directory don't find, remove the directory from `lastFoundDirs`
    +            case e: FileNotFoundException =>
    +              lastFoundDirs.remove(path)
    +              false
    +          }
    +        }
    +      }.flatMap(fs.listStatus(_)).toSeq
    +
    +      val newFiles = path.flatMap(dfs(_)).map(_.getPath.toString).toArray
    +      val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
    --- End diff --
    
    Could you change `System.currentTimeMillis` to `clock.getTimeMillis()`?
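
A sketch of why an injected clock is preferable to `System.currentTimeMillis` in code like this. The `Clock` trait and both implementations below are hypothetical and defined locally so the example is self-contained; they only assume a `getTimeMillis()` method like the one named in the comment.

```scala
object ClockSketch {
  // Hypothetical minimal clock abstraction.
  trait Clock {
    def getTimeMillis(): Long
  }

  // Production clock: delegates to the wall clock.
  final class SystemClock extends Clock {
    def getTimeMillis(): Long = System.currentTimeMillis()
  }

  // Test clock whose time only moves when told to.
  final class ManualClock(private var now: Long) extends Clock {
    def getTimeMillis(): Long = now
    def advance(millis: Long): Unit = { now += millis }
  }

  // Measures elapsed time against whichever clock is injected.
  def timeTaken(clock: Clock, startedAt: Long): Long =
    clock.getTimeMillis() - startedAt
}
```

With a manual clock, a test can advance time by an exact amount and assert on the resulting `timeTaken`, which is not possible when the code reads the wall clock directly.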




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69071318
  
    Jenkins this is OK to test




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-61441563
  
    Rebased with master.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502255
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -173,8 +180,48 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           val filter = new PathFilter {
             def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
           }
    -      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    -      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
    +      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
    +
    +      // Nested directories to find new files.
    +      def dfs(status: FileStatus): List[FileStatus] = {
    +        val path = status.getPath
    +        val depthFilter = depth + directoryDepth - path.depth()
    +        if (status.isDir) {
    +          if (depthFilter - 1 >= 0) {
    +            if (lastFoundDirs.contains(path)) {
    +              if (status.getModificationTime > modTimeIgnoreThreshold) {
    +                fs.listStatus(path).toList.flatMap(dfs(_))
    +              } else Nil
    +            } else {
    +              lastFoundDirs += path
    +              fs.listStatus(path).toList.flatMap(dfs(_))
    +            }
    +          } else Nil
    +        } else {
    +          if (filter.accept(path)) status :: Nil else Nil
    +        }
    +      }
    +
    +      val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath))
    +      else {
    +        lastFoundDirs.filter { path =>
    +          // If the mod time of directory is more than ignore time, no new files in this directory.
    +          try {
    +            val status = fs.getFileStatus(path)
    +            if (status != null && status.getModificationTime > modTimeIgnoreThreshold) true
    --- End diff --
    
    You can get rid of the entire `if` since you're saying `if (x) true else false`. Just write `status != null && status.getModificationTime > modTimeIgnoreThreshold`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327377
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -33,10 +33,13 @@ private[streaming]
     class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
         @transient ssc_ : StreamingContext,
         directory: String,
    +    depth: Int = 0,
         filter: Path => Boolean = FileInputDStream.defaultFilter,
         newFilesOnly: Boolean = true)
       extends InputDStream[(K, V)](ssc_) {
     
    +  require(depth >= 0 ,"nested directories depth must >= 0")
    --- End diff --
    
    Space after `,` rather than before...




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323992
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    +    }
    +    pathList.filter(path=>{
    +      val  modTime = fs.getFileStatus(path).getModificationTime
    +      logDebug(s"Mod time for $path is $modTime")
    +      if (modTime > ignoreTime) {
    +        logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +        true
    +      }
    +      else false
    +    })
    +  }
    +
    +  def getSubPathList(path:List[Path],fs:FileSystem):List[Path]={
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    path.map(subPath=>{
    --- End diff --
    
    Similar to [this one](https://github.com/apache/spark/pull/2765/files#r19323955).




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327572
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +122,41 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path: Path, fs: FileSystem): List[Path] = {
    +    var pathList = List[Path]()
    +    pathList = path :: pathList
    +    var tmp = List[Path]()
    +    tmp = path :: tmp
    +    for (i <- 0 until depth) {
    +      tmp = getSubPathList(tmp, fs)
    +      pathList = tmp ::: pathList
    +    }
    +    pathList.filter {
    +      path =>
    +        val modTime = fs.getFileStatus(path).getModificationTime
    +        logDebug(s"Mod time for $path is $modTime")
    +        if (modTime > ignoreTime) {
    +          logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +          true
    +        } else {
    +          false
    +        }
    +    }
    +  }
    +
    +  def getSubPathList(path: List[Path], fs: FileSystem): List[Path] = {
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    path.map {
    +      subPath =>
    +        fs.listStatus(subPath, filter).map {
    +          x =>
    +            pathList = x.getPath() :: pathList
    +        }
    +    }
    +    pathList
    +  }
    --- End diff --
    
    Oh, one more thing, please limit the visibility to `private`.
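
A rough sketch of how the two helpers in this diff could look with `private` visibility and without the mutable `var` accumulators. It is not the PR's code: the directory tree is a hypothetical in-memory `Map` rather than a Hadoop `FileSystem`, the names `NestedDirs`, `subDirs`, and `dirsUpToDepth` are invented, and the mod-time filtering step is left out.

```scala
object NestedDirs {
  // Hypothetical in-memory directory tree: parent path -> child directory paths.
  type Tree = Map[String, List[String]]

  // One level down: every subdirectory of the given directories.
  // Private, since only dirsUpToDepth needs it.
  private def subDirs(dirs: List[String], tree: Tree): List[String] =
    dirs.flatMap(dir => tree.getOrElse(dir, Nil))

  // The root plus all subdirectories up to `depth` levels below it.
  def dirsUpToDepth(root: String, depth: Int, tree: Tree): List[String] = {
    require(depth >= 0, "nested directories depth must be >= 0")
    // iterate yields List(root), then its children, then grandchildren, ...
    Iterator.iterate(List(root))(subDirs(_, tree)).take(depth + 1).toList.flatten
  }
}
```

Unlike the diff, which prepends each level, this sketch returns the root first and deeper levels after it.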




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323922
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    --- End diff --
    
    Space after `=`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327451
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -141,6 +141,108 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
       }
     
    +  test("file input stream - depth = 1") {
    +    // Disable manual clock as FileInputDStream does not work with manual clock
    +    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
    +
    +    // Set up the streaming context and input streams
    +    val testDir = Utils.createTempDir()
    +    val subDir = Utils.createTempDir(testDir.toString)
    +    val ssc = new StreamingContext(conf, batchDuration)
    +    val fileStream = ssc.textFileStream(testDir.toString,1)
    --- End diff --
    
    Space after `,`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-58743237
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-96770167
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69311769
  
    Jenkins, this is ok to test.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-67936464
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24735/




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-75228839
  
    @wangxiaojing I'd like to revive this PR and get it committed. There have been a number of requests for this functionality, and several JIRAs and PRs about it. Would you be willing to start by rebasing? It might even be easier if you squash first on your end. I will leave some more comments.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-77388969
  
    I don't think an additional switch helps; a depth parameter is the right way to specify the depth. I also don't think it's a configuration parameter as it is specific to the method call.
    
    What I'm suggesting is to simply not have so many overloads. A method like `fileStream` need only add the depth parameter to one of its overloads, not all. It can go on the overload with the most arguments.
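
One reading of this suggestion, sketched with simplified signatures. Everything here is hypothetical: `FileStreams` is an invented object, the filter takes a `String` instead of a Hadoop `Path`, and the methods return a description string instead of a DStream. The point is only that the existing overloads keep their signatures and a single overload, the one with the most arguments, carries `depth`.

```scala
object FileStreams {
  // Existing overload: signature unchanged, so current callers keep compiling.
  def fileStream(directory: String): String =
    fileStream(directory, (_: String) => true, newFilesOnly = true, depth = 0)

  // Existing overload: signature unchanged.
  def fileStream(directory: String, filter: String => Boolean, newFilesOnly: Boolean): String =
    fileStream(directory, filter, newFilesOnly, depth = 0)

  // The only overload that takes `depth`: the one with the most arguments.
  def fileStream(
      directory: String,
      filter: String => Boolean,
      newFilesOnly: Boolean,
      depth: Int): String = {
    require(depth >= 0, "nested directories depth must be >= 0")
    s"watch $directory (newFilesOnly=$newFilesOnly, depth=$depth)"
  }
}
```

Callers that do not care about nested directories never see the new parameter, and `depth = 0` preserves the old behavior for them.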




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76514803
  
      [Test build #28115 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28115/consoleFull) for   PR 2765 at commit [`beaed4c`](https://github.com/apache/spark/commit/beaed4c901bca8fe91361901e5ba0cb30b8a94b5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066040
  
    --- Diff: python/pyspark/streaming/context.py ---
    @@ -242,14 +242,14 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_
             return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
                            UTF8Deserializer())
     
    -    def textFileStream(self, directory):
    +    def textFileStream(self, directory, depth):
    --- End diff --
    
    We need to keep the existing method without the `depth` param, right? Otherwise the API changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740883
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     
         def accept(path: Path): Boolean = {
           try {
    +        if (fs.getFileStatus(path).isDirectory()){
    +          return false
    +        }
             if (!filter(path)) {  // Reject file if it does not satisfy filter
               logDebug("Rejected by filter " + path)
               return false
    --- End diff --
    
    I mean you can write `false`, not `return false`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740430
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     
         def accept(path: Path): Boolean = {
           try {
    +        if (fs.getFileStatus(path).isDirectory()){
    +          return false
    +        }
             if (!filter(path)) {  // Reject file if it does not satisfy filter
               logDebug("Rejected by filter " + path)
               return false
    --- End diff --
    
    You don't need `return` anywhere
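
A small sketch of the same kind of check written without `return`, relying on the fact that `if`/`else` is an expression in Scala. The inputs are simplified, hypothetical stand-ins: the real `accept` takes a `Path` and queries the file system, and the final mod-time comparison is assumed here for illustration.

```scala
object AcceptSketch {
  // Each branch evaluates to the method's result; no `return` keyword needed.
  def accept(
      isDirectory: Boolean,
      passesFilter: Boolean,
      modTime: Long,
      ignoreTime: Long): Boolean =
    if (isDirectory) {
      false // directories are never accepted as input files
    } else if (!passesFilter) {
      false // rejected by the user-supplied filter
    } else {
      modTime > ignoreTime // accept only files newer than the ignore time
    }
}
```

The value of the last expression evaluated in the chosen branch becomes the result of the method.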


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323995
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    +    }
    +    pathList.filter(path=>{
    +      val  modTime = fs.getFileStatus(path).getModificationTime
    +      logDebug(s"Mod time for $path is $modTime")
    +      if (modTime > ignoreTime) {
    +        logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +        true
    +      }
    +      else false
    +    })
    +  }
    +
    +  def getSubPathList(path:List[Path],fs:FileSystem):List[Path]={
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    path.map(subPath=>{
    +     fs.listStatus(subPath,filter).map(x=>{
    --- End diff --
    
    Similar to [this one](https://github.com/apache/spark/pull/2765/files#r19323955). And space after `,`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-60341312
  
    @liancheng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69563246
  
      [Test build #25411 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25411/consoleFull) for   PR 2765 at commit [`4c0e261`](https://github.com/apache/spark/commit/4c0e261485139e3c24ff6a942bb646d3766f8d9d).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65367093
  
    Please rebase first :)




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327073
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -230,16 +272,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
             if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
               minNewFileModTime = modTime
             }
    +        if (path.getName().startsWith("_")) {
    --- End diff --
    
    Should we also ignore dot files here? (I'm not sure about the exact ignore pattern list.)
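
If dot files were ignored as well, the check might look like the sketch below. It assumes the pattern list is just the two prefixes `_` and `.`, which is exactly the point left open in the comment; `HiddenNames` and `isHidden` are invented names, and the path is a plain `String` rather than a Hadoop `Path`.

```scala
object HiddenNames {
  // True when the last path segment starts with "_" or "." (assumed pattern list).
  def isHidden(path: String): Boolean = {
    // lastIndexOf returns -1 when there is no '/', so this keeps the whole string.
    val name = path.substring(path.lastIndexOf('/') + 1)
    name.startsWith("_") || name.startsWith(".")
  }
}
```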


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-72163009
  
      [Test build #26383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26383/consoleFull) for   PR 2765 at commit [`dbb6aa0`](https://github.com/apache/spark/commit/dbb6aa0a6d98be3b922298983ca9bd0c0a28781e).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25065970
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -641,17 +641,17 @@ methods for creating DStreams from files and Akka actors as input sources.
     
         <div class="codetabs">
         <div data-lang="scala" markdown="1">
    -        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    +        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory, depth)
    --- End diff --
    
    I think all of the examples should continue to show this without the `depth` parameter, since it's optional and for particular use cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
GitHub user wangxiaojing reopened a pull request:

    https://github.com/apache/spark/pull/2765

    [SPARK-3586][streaming]Support nested directories in Spark Streaming

    For text files, use the method streamingContext.textFileStream(dataDirectory).
    This change lets Spark Streaming support subdirectories: it can monitor the subdirectories of dataDirectory and process any files created in them.
    For example:
    streamingContext.textFileStream("/test")
    Given the directory contents:
    /test/file1
    /test/file2
    /test/dr/file1
    if a new file "file2" is created in the directory "/test/dr/", Spark Streaming can process it.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangxiaojing/spark spark-3586

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2765
    
----
commit 843f905cdf4ecb45aaa2edb9b34dc213e59e65c0
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-11T08:22:31Z

    Support nested directories in Spark Streaming

commit 6d30f6373fe06176043364c6bf4f7da81a37cf01
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-12T05:27:22Z

    change Nit

commit f00f2822dfbf21501458dd6f59e24eb4e7aac9c9
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-17T03:46:01Z

    support depth

commit 703754517d7077b891346b83e821c081215621db
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-17T06:22:12Z

    Change space before brace

commit 3d9bb2a7ef7ebea3f12866381af4201f4ddc7d60
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-17T07:24:38Z

    change process any files created in nested directories

commit 27dd88425b91471fcf9202364ddd9abb970e8223
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-24T07:12:17Z

    reformat code

commit 70d1b1fba5bb09636f4f3655771a98287c73b9ee
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-24T07:54:09Z

    add a require(depth >= 0)

commit 03489f28f4d8cae05564b41c98a839cf88bfba2f
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-24T08:54:03Z

    reformat code

commit 113c6d4e61e8c7e3fe4dfa4ed65cfe228575508f
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-28T02:52:01Z

    change performance

commit 2cc32fa187bc371f033da5bb2b67bbc7694964ed
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-10-28T08:48:37Z

    change filter name

commit 0ea8eda9831fcb3796720b0bfe95e51c8f1c3ab0
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-11-03T09:55:46Z

    change line exceeds 100 columns

commit 997ae5151d13a56ce0f97078ba4dacf454d43edd
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-11-03T10:09:15Z

    no braces for case clauses

commit 2bb9e9a148747c68d780ccbeaa8206e3b55cecfa
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-11-10T03:46:00Z

    Performance optimization:directory records have judgment

commit 8bc22af117ca73f10b7f642ccc566493ba6c4a1a
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-11-10T05:47:09Z

    line over 100

commit 15c389371d645a7fa6f13f3909034fb57ea7360c
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-12-04T09:27:12Z

    remove line

commit 21f0d82153f2f7d8967256251fe1ef02c84ffa71
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-12-04T10:21:19Z

    style

commit e488919eb3ffe3b4d6509995720f4e33c48c0762
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-12-17T04:49:36Z

    change get depth

commit ce86bcc5be8a790245787f75dfd2cba51ab50f55
Author: wangxiaojing <u9...@gmail.com>
Date:   2014-12-24T06:07:43Z

    Use 'isDir' to  modify the compatibility

----




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69693994
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25454/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69689987
  
      [Test build #25454 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25454/consoleFull) for   PR 2765 at commit [`dad1748`](https://github.com/apache/spark/commit/dad174886aa65d997e0410f1f6a002c386969143).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502824
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -446,7 +446,7 @@ def test_queue_stream(self):
         def test_text_file_stream(self):
             d = tempfile.mkdtemp()
             self.ssc = StreamingContext(self.sc, self.duration)
    -        dstream2 = self.ssc.textFileStream(d).map(int)
    +        dstream2 = self.ssc.textFileStream(d, 1).map(int)
    --- End diff --
    
    I'd omit ", 1" since that is the 'default'. This test actually does not change.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-59462998
  
    Hi @jerryshao, I'm changing the code to use this parameter to control the search depth. However, if the depth is greater than 1, filtering by the ignore time does not work: when a new file appears in a second-level subdirectory, the modification time of the first-level subdirectory does not change. For example:
    Streaming monitors the directory /tmp/.
    The directory structure is:
     2014-10-16 19:17 /tmp/spark1
     2014-10-16 19:17 /tmp/spark1/spark2
    
    A file is created in /tmp/spark1/spark2:
    
     2014-10-16 19:17 /tmp/spark1
     2014-10-16 19:18 /tmp/spark1/spark2
     2014-10-16 19:18 /tmp/spark1/spark2/file
    
    If the ignore time is used for filtering, the first-level subdirectory is always ignored. Can you give me some advice?
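
The behaviour described in this comment can be reproduced on a local filesystem. The sketch below is only an illustration in Python, not code from the PR: the helper name `mtimes_after_nested_write` and the one-minute `ignore_time` threshold are assumptions made for the example, and it uses the local filesystem rather than HDFS.

```python
import os
import tempfile
import time


def mtimes_after_nested_write(root):
    """Create root/spark1/spark2, backdate both directories, add a file to
    spark2, and report which directories look newer than ignore_time."""
    level1 = os.path.join(root, "spark1")
    level2 = os.path.join(level1, "spark2")
    os.makedirs(level2)

    # Backdate both directories so the later write is easy to tell apart.
    old = time.time() - 3600
    for directory in (level1, level2):
        os.utime(directory, (old, old))

    # Hypothetical threshold: anything modified within the last minute is "new".
    ignore_time = time.time() - 60

    # Creating a file updates the mtime of its immediate parent only.
    with open(os.path.join(level2, "file"), "w") as handle:
        handle.write("1\n")

    return {
        "spark1": os.path.getmtime(level1) > ignore_time,
        "spark2": os.path.getmtime(level2) > ignore_time,
    }


with tempfile.TemporaryDirectory() as tmp:
    result = mtimes_after_nested_write(tmp)
    print(result)
```

On a typical POSIX filesystem only `spark2` should be reported as newer than the threshold, which is why a filter that prunes directories by modification time would skip `spark1` and never reach the new file.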





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76871502
  
    @srowen This PR does not affect `org.apache.spark.JavaAPISuite.aggregateByKey`. Please test this again, thanks.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740443
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -240,6 +260,31 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           true
         }
       }
    +
    +  private[streaming]
    +  class SubPathFilter extends PathFilter {
    +
    +    def accept(path: Path): Boolean = {
    +      try {
    +        if(fs.getFileStatus(path).isDirectory()){
    +          val modTime = getFileModTime(path)
    +          logDebug("Mod time for " + path + " is " + modTime)
    --- End diff --
    
    Nit: you can use string interpolation to make it a little simpler




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327366
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +122,41 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path: Path, fs: FileSystem): List[Path] = {
    +    var pathList = List[Path]()
    +    pathList = path :: pathList
    +    var tmp = List[Path]()
    +    tmp = path :: tmp
    +    for (i <- 0 until depth) {
    +      tmp = getSubPathList(tmp, fs)
    +      pathList = tmp ::: pathList
    +    }
    +    pathList.filter {
    +      path =>
    +        val modTime = fs.getFileStatus(path).getModificationTime
    +        logDebug(s"Mod time for $path is $modTime")
    +        if (modTime > ignoreTime) {
    +          logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +          true
    +        } else {
    +          false
    +        }
    +    }
    +  }
    +
    +  def getSubPathList(path: List[Path], fs: FileSystem): List[Path] = {
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    path.map {
    +      subPath =>
    +        fs.listStatus(subPath, filter).map {
    +          x =>
    +            pathList = x.getPath() :: pathList
    +        }
    +    }
    +    pathList
    +  }
    --- End diff --
    
    I'd suggest refactoring `getPathList` and `getSubPathList` into the following version, which also addresses the performance issue [mentioned above](https://github.com/apache/spark/pull/2765/files#r19327246) by replacing individual `getFileStatus` calls with batch-style `listStatus` calls:
    
    ```scala
      def getPathList(path: Path, fs: FileSystem): List[Path] = {
        def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = status match {
          case _ if currentDepth < 0 => Nil
          case _ if !status.isDirectory => status :: Nil
          case _ => fs.listStatus(status.getPath).toList.flatMap(dfs(_, currentDepth - 1))
        }
    
        dfs(fs.getFileStatus(path), depth).filter { status =>
          val isNewlyModified = status.getModificationTime >= ignoreTime
          logDebug("Newly modified file detected, modification time is " + status.getModificationTime)
          isNewlyModified
        }.map(_.getPath)
      }
    ```
    
    Also, the `fs.getFileStatus(path).isDirectory()` call can be removed with this version.
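
For readers who want to try the idea without a Hadoop cluster, here is a rough local-filesystem analogue in Python. It is a sketch under stated assumptions, not the code proposed for Spark: the function name `list_new_files` is hypothetical, `os.scandir` stands in for the batch-style `listStatus` call, and the depth convention (1 means only files directly inside the monitored directory) is chosen for this example.

```python
import os
import tempfile


def list_new_files(root, depth, ignore_time=0.0):
    """Return files at most `depth` directory levels below `root` whose
    modification time is at least `ignore_time` (seconds since the epoch)."""
    if depth <= 0:
        return []
    found = []
    # One scandir call per directory yields every entry together with its
    # type, instead of issuing a separate status lookup for each path.
    with os.scandir(root) as entries:
        for entry in entries:
            if entry.is_dir(follow_symlinks=False):
                found.extend(list_new_files(entry.path, depth - 1, ignore_time))
            elif entry.stat().st_mtime >= ignore_time:
                found.append(entry.path)
    return sorted(found)


# Rebuild the layout from the PR description: file1, file2 and dr/file1.
with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "dr"))
    for rel in ("file1", "file2", os.path.join("dr", "file1")):
        with open(os.path.join(tmp, rel), "w") as handle:
            handle.write("1\n")
    shallow = [os.path.relpath(p, tmp) for p in list_new_files(tmp, depth=1)]
    nested = [os.path.relpath(p, tmp) for p in list_new_files(tmp, depth=2)]
    print(shallow)
    print(nested)
```

Note that the modification-time check is applied to files only, never to directories, so a subdirectory with an old timestamp is still descended into.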




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740429
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -230,6 +246,10 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
             if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
               minNewFileModTime = modTime
             }
    +        if(path.getName().startsWith("_")){
    +          System.out.println("startsWith:" + path.getName())
    --- End diff --
    
    Remove this System.out




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69311867
  
      [Test build #25316 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25316/consoleFull) for   PR 2765 at commit [`dc847c0`](https://github.com/apache/spark/commit/dc847c02e359bb879c49485478a04473482b4a7a).
     * This patch merges cleanly.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740849
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     
         def accept(path: Path): Boolean = {
           try {
    +        if (fs.getFileStatus(path).isDirectory()){
    +          return false
    +        }
             if (!filter(path)) {  // Reject file if it does not satisfy filter
               logDebug("Rejected by filter " + path)
               return false
    --- End diff --
    
    Why? If the path is a directory, it should not be considered.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18745609
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -240,6 +260,31 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           true
         }
       }
    +
    +  private[streaming]
    +  class SubPathFilter extends PathFilter {
    --- End diff --
    
    No need to wrap this line.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-72163013
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26383/
    Test PASSed.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76514748
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28114/
    Test FAILed.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65637021
  
    @liancheng 




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-68032263
  
      [Test build #24770 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24770/consoleFull) for   PR 2765 at commit [`ce86bcc`](https://github.com/apache/spark/commit/ce86bcc5be8a790245787f75dfd2cba51ab50f55).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65380717
  
    Hm, I just tried locally; as GitHub indicates, this PR still can't be merged cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65362730
  
    @liancheng 




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323902
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -363,8 +363,8 @@ class StreamingContext private[streaming] (
        * file system. File names starting with . are ignored.
        * @param directory HDFS directory to monitor for new file
        */
    -  def textFileStream(directory: String): DStream[String] = {
    -    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
    +  def textFileStream(directory: String,depth: Int =0): DStream[String] = {
    --- End diff --
    
    Space after `,` and `=`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323896
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -351,8 +351,8 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    +  ] (directory: String,filter: Path => Boolean, newFilesOnly: Boolean, depth : Int =0): InputDStream[(K, V)] = {
    +    new FileInputDStream[K, V, F](this, directory,depth, filter, newFilesOnly)
    --- End diff --
    
    Space before `depth`




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323915
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    --- End diff --
    
    Space after all `:`s.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323886
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -331,8 +331,8 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory)
    +  ] (directory: String,depth :Int =0): InputDStream[(K, V)] = {
    +    new FileInputDStream[K, V, F](this, directory,depth)
    --- End diff --
    
    Space after `,`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-77104507
  
    @srowen There are two possible solutions, neither of which changes the API:
    1. Add a switch that controls whether files in subdirectories are monitored. It defaults to false, which monitors only the directory `dataDirectory`; if set to true, files in subdirectories are monitored as well.
    2. Add a configuration `streaming.monitor.directory.depth` to control the directory search depth. It defaults to 1, which monitors only the directory `dataDirectory`; if set to a value greater than 1, files in subdirectories are monitored as well.
    Can you give some advice?
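
To make the relationship between the two options concrete, here is a small Python sketch. It is purely illustrative: the key name is the one proposed in this comment, while the plain `dict` standing in for a Spark configuration, the helper names `resolve_depth` and `monitors_subdirectories`, and the validation rule are assumptions made for the example.

```python
# Key name taken from the proposal; it is not an existing Spark setting.
DEPTH_KEY = "streaming.monitor.directory.depth"


def resolve_depth(conf):
    """Read the proposed depth setting from a plain dict of string values.

    A missing key falls back to 1, meaning only the monitored directory
    itself is searched.
    """
    depth = int(conf.get(DEPTH_KEY, "1"))
    if depth < 1:
        raise ValueError(f"{DEPTH_KEY} must be >= 1, got {depth}")
    return depth


def monitors_subdirectories(conf):
    """The boolean switch of option 1 expressed in terms of option 2."""
    return resolve_depth(conf) > 1


print(resolve_depth({}))                            # default depth
print(monitors_subdirectories({DEPTH_KEY: "2"}))    # subdirectories included
```

Seen this way, the depth setting subsumes the boolean switch: any depth greater than 1 behaves like the switch being on, while also bounding how far the search descends.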




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76376474
  
      [Test build #28062 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28062/consoleFull) for   PR 2765 at commit [`6620143`](https://github.com/apache/spark/commit/6620143e72f90a34421cc3dd4cf2cf1289b16557).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-67931020
  
      [Test build #24735 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24735/consoleFull) for   PR 2765 at commit [`e488919`](https://github.com/apache/spark/commit/e488919eb3ffe3b4d6509995720f4e33c48c0762).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25226058
  
    --- Diff: streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---
    @@ -1739,7 +1739,11 @@ public Integer call(String s) throws Exception {
       // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
       // InputStream functionality is deferred to the existing Scala tests.
       @Test
    -  public void testSocketTextStream() {
    +  public void testSocketTextStream(
    +
    +
    --- End diff --
    
    Yes, this is a good functionality to revive. Thanks Sean
    
    On Fri, Feb 20, 2015 at 4:17 AM, Sean Owen <no...@github.com> wrote:
    
    > In streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
    > <https://github.com/apache/spark/pull/2765#discussion_r25066204>:
    >
    > > @@ -1739,7 +1739,11 @@ public Integer call(String s) throws Exception {
    > >    // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
    > >    // InputStream functionality is deferred to the existing Scala tests.
    > >    @Test
    > > -  public void testSocketTextStream() {
    > > +  public void testSocketTextStream(
    > > +
    > > +
    >
    > Undo this change?





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066177
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -34,8 +34,10 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
      * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
      * files and creates a stream out of them. The way it works as follows.
      *
    - * At each batch interval, the file system is queried for files in the given directory and
    - * detected new files are selected for that batch. In this case "new" means files that
    + * At each batch interval, Use `depth` to find files in the directory recursively,
    --- End diff --
    
    This also needs rewording, it doesn't quite make sense. Maybe use similar text to what I suggested above.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69693990
  
      [Test build #25454 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25454/consoleFull) for   PR 2765 at commit [`dad1748`](https://github.com/apache/spark/commit/dad174886aa65d997e0410f1f6a002c386969143).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066028
  
    --- Diff: examples/src/main/python/streaming/hdfs_wordcount.py ---
    @@ -39,7 +39,7 @@
         sc = SparkContext(appName="PythonStreamingHDFSWordCount")
         ssc = StreamingContext(sc, 1)
     
    -    lines = ssc.textFileStream(sys.argv[1])
    +    lines = ssc.textFileStream(sys.argv[1], 1)
    --- End diff --
    
    Same, I'd leave this as-is.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323885
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -331,8 +331,8 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory)
    +  ] (directory: String,depth :Int =0): InputDStream[(K, V)] = {
    --- End diff --
    
    - Add space after `,`
    - Remove space before `:`
    - Add space after `:`
    - Add space after `=`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r31466854
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -223,6 +266,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
        */
       private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
         val pathStr = path.toString
    +    // Reject file if it start with _
    +    if (path.getName().startsWith("_")) {
    --- End diff --
    
    It's good to be consistent with core. However, I am worried that if we
    add another filter, it will silently break existing workloads that may be
    using files that start with "_".
    
    On Mon, Jun 1, 2015 at 7:25 AM, Shixiong Zhu <no...@github.com>
    wrote:
    
    > In
    > streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
    > <https://github.com/apache/spark/pull/2765#discussion_r31429608>:
    >
    > > @@ -223,6 +266,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    > >     */
    > >    private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
    > >      val pathStr = path.toString
    > > +    // Reject file if it start with _
    > > +    if (path.getName().startsWith("_")) {
    >
    > Could you add this filter to FileInputDStream.defaultFilter?
    >
    > @tdas <https://github.com/tdas> is it a bug that
    > FileInputDStream.defaultFilter only ignores files that start with .?
    > Hadoop uses the following path filter (FileInputFormat.hiddenFileFilter):
    >
    >     private static final PathFilter hiddenFileFilter = new PathFilter() {
    >         public boolean accept(Path p) {
    >             String name = p.getName();
    >             return !name.startsWith("_") && !name.startsWith(".");
    >         }
    >     };
    >
    > Since SparkContext.textFile uses TextInputFormat, it uses this filter and
    > ignores files that start with "_" or ".". Should Streaming be consistent
    > with Core?





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69556749
  
      [Test build #25411 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25411/consoleFull) for   PR 2765 at commit [`4c0e261`](https://github.com/apache/spark/commit/4c0e261485139e3c24ff6a942bb646d3766f8d9d).
     * This patch merges cleanly.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323955
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    +    }
    +    pathList.filter(path=>{
    --- End diff --
    
    Spaces before `=>`, and remove the redundant braces, like this:
    
    ```scala
    pathList.filter { path =>
      ...
    }
    ```




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19324022
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -91,7 +91,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       }
     
     
    -  test("file input stream") {
    +  test("file input stream -depth = 0  ") {
    --- End diff --
    
    Space after `=`, and remove the trailing spaces in the string.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740446
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -240,6 +260,31 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
           true
         }
       }
    +
    +  private[streaming]
    +  class SubPathFilter extends PathFilter {
    +
    +    def accept(path: Path): Boolean = {
    +      try {
    +        if(fs.getFileStatus(path).isDirectory()){
    +          val modTime = getFileModTime(path)
    +          logDebug("Mod time for " + path + " is " + modTime)
    +          if (modTime > ignoreTime) {
    +            // Reject file if it was created before the ignore time (or, before last interval)
    +            logDebug("Mod time " + modTime + " less than ignore time " + ignoreTime)
    --- End diff --
    
    Log message is inconsistent with conditional
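
A minimal Python sketch of the point above, assuming the intent (taken from the code comment in the diff) is to reject paths modified before the ignore time. The names `is_new_enough` and `file_filter_sketch` are hypothetical and not part of Spark; the only goal is that the branch condition and the debug message describe the same situation.

```python
import logging

log = logging.getLogger("file_filter_sketch")


def is_new_enough(mod_time: int, ignore_time: int) -> bool:
    """Accept a path only if it was modified at or after ignore_time."""
    if mod_time < ignore_time:
        # The condition and the message now say the same thing.
        log.debug(
            "Mod time %d less than ignore time %d; rejecting",
            mod_time,
            ignore_time,
        )
        return False
    return True
```

With the comparison and the message aligned, a reader of the debug log can trust that "less than ignore time" is exactly the case that was rejected.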




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69563254
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25411/
    Test FAILed.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-58743235
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69854635
  
    @tdas 




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323904
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -363,8 +363,8 @@ class StreamingContext private[streaming] (
        * file system. File names starting with . are ignored.
        * @param directory HDFS directory to monitor for new file
        */
    -  def textFileStream(directory: String): DStream[String] = {
    -    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
    +  def textFileStream(directory: String,depth: Int =0): DStream[String] = {
    +    fileStream[LongWritable, Text, TextInputFormat](directory,depth).map(_._2.toString)
    --- End diff --
    
    Space after `,`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing closed the pull request at:

    https://github.com/apache/spark/pull/2765




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740431
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     
         def accept(path: Path): Boolean = {
           try {
    +        if (fs.getFileStatus(path).isDirectory()){
    --- End diff --
    
    Nit: space before brace




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-68036640
  
      [Test build #24770 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24770/consoleFull) for   PR 2765 at commit [`ce86bcc`](https://github.com/apache/spark/commit/ce86bcc5be8a790245787f75dfd2cba51ab50f55).
     * This patch **fails MiMa tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066003
  
    --- Diff: docs/streaming-programming-guide.md ---
    @@ -641,17 +641,17 @@ methods for creating DStreams from files and Akka actors as input sources.
     
         <div class="codetabs">
         <div data-lang="scala" markdown="1">
    -        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    +        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory, depth)
         </div>
         <div data-lang="java" markdown="1">
    -		streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
    +		streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory, depth);
         </div>
         <div data-lang="python" markdown="1">
    -		streamingContext.textFileStream(dataDirectory)
    +		streamingContext.textFileStream(dataDirectory, depth)
         </div>
         </div>
     
    -	Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
    +	Spark Streaming will monitor the directory `dataDirectory`, the `depth` is default 1 and process any files created in that directory. If supported files written in nested directories, set the `depth` is greater than 1. Note that
    --- End diff --
    
    I think this needs to be rewritten, perhaps like:
    
    ```
    Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). It can also monitor files in subdirectories by setting the optional `depth` parameter to a value greater than 1.
    ```




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-59478360
  
    Can we check only the modification time of each file, rather than of the directory, to filter out unqualified files? I'm not sure about this.
    
    cc @tdas , mind taking a look at this?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r31429608
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -223,6 +266,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
        */
       private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
         val pathStr = path.toString
    +    // Reject file if it start with _
    +    if (path.getName().startsWith("_")) {
    --- End diff --
    
    Could you add this filter to `FileInputDStream.defaultFilter`? 
    
    @tdas is it a bug that `FileInputDStream.defaultFilter` only ignores files that start with `.`? Hadoop uses the following path filter (FileInputFormat.hiddenFileFilter):
    ```Java
        private static final PathFilter hiddenFileFilter = new PathFilter() {
            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        };
    ```
    Since SparkContext.textFile uses `TextInputFormat`, it uses this filter and ignores files that start with "_" or ".". Should Streaming be consistent with Core?
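
As a rough illustration of the rule quoted above, here is a minimal Python sketch. The function name `accept_path` is hypothetical; it mirrors only the `_` / `.` prefix check from the `hiddenFileFilter` snippet in this comment and is not Spark or Hadoop code.

```python
import posixpath


def accept_path(path: str) -> bool:
    """Return True unless the last path component starts with "_" or "."."""
    # Strip a trailing slash so "/test/_temporary/" is judged by "_temporary".
    name = posixpath.basename(path.rstrip("/"))
    return not name.startswith("_") and not name.startswith(".")
```

Under this rule `/test/file1` is accepted, while `/test/_temporary` and `/test/.hidden` are skipped, which is the behaviour the comment asks Streaming to share with Core.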




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19721494
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -104,7 +107,25 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         logDebug("Trying to get new files for time " + currentTime)
         lastNewFileFindingTime = System.currentTimeMillis
         val filter = new CustomPathFilter(currentTime)
    -    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    +
    +    def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] = {
    +      val modTime = status.getModificationTime
    +      status match {
    +        case _ if currentDepth < 0 => Nil
    +        case _ if (!status.isDirectory) => {
    --- End diff --
    
    In general we don't use braces for `case` clauses.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327514
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -230,16 +272,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
             if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
               minNewFileModTime = modTime
             }
    +        if (path.getName().startsWith("_")) {
    --- End diff --
    
    If you use `saveAsTextFile("tmp")`, the output is first created with a `_` prefix; the final files appear only when the write has finished.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76512835
  
      [Test build #28114 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28114/consoleFull) for   PR 2765 at commit [`348657e`](https://github.com/apache/spark/commit/348657e2069c3732d2a43bbc6ddb873eec7a3a48).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r22707922
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -335,8 +336,8 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory)
    +  ] (directory: String, depth: Int = 1): InputDStream[(K, V)] = {
    --- End diff --
    
    Let's not add two new versions of `fileStream`; keep only the other version with all the parameters.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-67277090
  
      [Test build #24528 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24528/consoleFull) for   PR 2765 at commit [`5b76b8e`](https://github.com/apache/spark/commit/5b76b8e723f56de9945176872507e1b926b51c4d).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76376480
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28062/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502811
  
    --- Diff: python/pyspark/streaming/context.py ---
    @@ -260,6 +260,12 @@ def textFileStream(self, directory):
             """
             return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
    --- End diff --
    
    Should this just call the other method with arg 1 instead of duplicating the logic?




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25684896
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -293,6 +302,34 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
        * for new files and reads them using the given key-value types and input format.
        * Files must be written to the monitored directory by "moving" them from another
        * location within the same file system. File names starting with . are ignored.
    +   * It can also monitor files in subdirectories by setting the optional `depth`
    +   * parameter to a value greater than 1.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param depth Searching depth of HDFS directory
    +   * @param kClass class of key for reading HDFS file
    +   * @param vClass class of value for reading HDFS file
    +   * @param fClass class of input format for reading HDFS file
    +   * @tparam K Key type for reading HDFS file
    +   * @tparam V Value type for reading HDFS file
    +   * @tparam F Input format for reading HDFS file
    +   */
    +  def fileStream[K, V, F <: NewInputFormat[K, V]](
    +      directory: String,
    +      depth: Int,
    --- End diff --
    
    This looks out of order, but I question the need for this overload anyway. Maybe @tdas has an opinion. I'd like to keep the scope of this change much more limited: add a depth param to one version of each method (maybe two if it really makes sense) in each language.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19327511
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -141,6 +141,108 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
       }
     
    +  test("file input stream - depth = 1") {
    +    // Disable manual clock as FileInputDStream does not work with manual clock
    +    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
    +
    +    // Set up the streaming context and input streams
    +    val testDir = Utils.createTempDir()
    +    val subDir = Utils.createTempDir(testDir.toString)
    +    val ssc = new StreamingContext(conf, batchDuration)
    +    val fileStream = ssc.textFileStream(testDir.toString,1)
    +    val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
    +    def output = outputBuffer.flatMap(x => x)
    +    val outputStream = new TestOutputStream(fileStream, outputBuffer)
    +    outputStream.register()
    +    ssc.start()
    +
    +    // Create files in the temporary directory so that Spark Streaming can read data from it
    +    val input = Seq(1, 2, 3, 4, 5)
    +    val expectedOutput = input.map(_.toString)
    +    Thread.sleep(1000)
    +    for (i <- 0 until input.size) {
    +      val file = new File(subDir, i.toString)
    +      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
    +      logInfo("Created file " + file)
    +      Thread.sleep(batchDuration.milliseconds)
    +      Thread.sleep(1000)
    +    }
    +    val startTime = System.currentTimeMillis()
    +    Thread.sleep(1000)
    +    val timeTaken = System.currentTimeMillis() - startTime
    +    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
    +    logInfo("Stopping context")
    +    ssc.stop()
    +
    +    // Verify whether data received by Spark Streaming was as expected
    +    logInfo("--------------------------------")
    +    logInfo("output, size = " + outputBuffer.size)
    +    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    +    logInfo("expected output, size = " + expectedOutput.size)
    +    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    +    logInfo("--------------------------------")
    +
    +    // Verify whether all the elements received are as expected
    +    // (whether the elements were received one in each interval is not verified)
    +    assert(output.toList === expectedOutput.toList)
    +
    +    Utils.deleteRecursively(testDir)
    +
    +    // Enable manual clock back again for other tests
    +    conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
    +  }
    +
    +  test("file input stream  - depth = 2") {
    --- End diff --
    
    One redundant space before `-` :)




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323911
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -104,7 +105,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         logDebug("Trying to get new files for time " + currentTime)
         lastNewFileFindingTime = System.currentTimeMillis
         val filter = new CustomPathFilter(currentTime)
    -    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    +    val pathList = getPathList(directoryPath,fs)
    +    val newFiles = fs.listStatus(pathList.toArray,filter).map(_.getPath.toString)
    --- End diff --
    
    Space after `,`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76676933
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/28161/




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-67277092
  
      [Test build #24528 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24528/consoleFull) for PR 2765 at commit [`5b76b8e`](https://github.com/apache/spark/commit/5b76b8e723f56de9945176872507e1b926b51c4d).
     * This patch **fails RAT tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-76513278
  
      [Test build #28115 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/28115/consoleFull) for PR 2765 at commit [`beaed4c`](https://github.com/apache/spark/commit/beaed4c901bca8fe91361901e5ba0cb30b8a94b5).
     * This patch merges cleanly.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19324425
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -33,6 +33,7 @@ private[streaming]
     class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
         @transient ssc_ : StreamingContext,
         directory: String,
    +    depth: Int = 0,
    --- End diff --
    
    It would be better to add a `require(depth >= 0)` in the constructor to reinforce the contract.
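
A minimal sketch of the suggested check, assuming a hypothetical `DirectoryWatchConfig` class that stands in for the `FileInputDStream` constructor (the real class needs a `StreamingContext`, which is left out here). The error message wording is also an assumption.

```scala
object DepthContractSketch {
  // Hypothetical stand-in for the constructor parameters under review.
  class DirectoryWatchConfig(val directory: String, val depth: Int = 0) {
    // Fail fast with IllegalArgumentException when the depth is negative.
    require(depth >= 0, s"depth must be non-negative, but got $depth")
  }
}
```

With this in place, a negative depth is rejected when the object is constructed rather than surfacing later as an empty or confusing directory listing.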




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69533541
  
      [Test build #25398 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25398/consoleFull) for PR 2765 at commit [`1764c34`](https://github.com/apache/spark/commit/1764c34d99b4c361626fef90120fb444c5c730bb).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502300
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -173,8 +180,48 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           val filter = new PathFilter {
             def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
           }
    -      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    -      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
    +      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
    +
    +      // Nested directories to find new files.
    +      def dfs(status: FileStatus): List[FileStatus] = {
    +        val path = status.getPath
    +        val depthFilter = depth + directoryDepth - path.depth()
    +        if (status.isDir) {
    +          if (depthFilter - 1 >= 0) {
    +            if (lastFoundDirs.contains(path)) {
    +              if (status.getModificationTime > modTimeIgnoreThreshold) {
    +                fs.listStatus(path).toList.flatMap(dfs(_))
    +              } else Nil
    +            } else {
    +              lastFoundDirs += path
    +              fs.listStatus(path).toList.flatMap(dfs(_))
    +            }
    +          } else Nil
    +        } else {
    +          if (filter.accept(path)) status :: Nil else Nil
    +        }
    +      }
    +
    +      val path = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath))
    +      else {
    +        lastFoundDirs.filter { path =>
    +          // If the mod time of directory is more than ignore time, no new files in this directory.
    +          try {
    +            val status = fs.getFileStatus(path)
    +            if (status != null && status.getModificationTime > modTimeIgnoreThreshold) true
    +            else false
    +          }
    --- End diff --
    
    Tiny, but I think our style is to write `} catch {` and to put an `else` block on its own new line generally.
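
A small sketch of the layout being asked for, with the closing brace and `catch` on one line. It assumes `java.nio.file` as a stand-in for the Hadoop `FileSystem` calls in the diff, and the `modifiedAfter` name is hypothetical. Returning the comparison directly instead of `if (...) true else false` is an extra simplification, not something the review comment requested.

```scala
import java.io.IOException
import java.nio.file.{Files, Path}

object ModTimeFilterSketch {
  /** True if `path` exists and was modified after `thresholdMillis`. */
  def modifiedAfter(path: Path, thresholdMillis: Long): Boolean = {
    try {
      Files.getLastModifiedTime(path).toMillis > thresholdMillis
    } catch {
      // A path that cannot be read (for example, one that was deleted)
      // is treated as having no new files.
      case _: IOException => false
    }
  }
}
```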




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323999
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    +      tmp =getSubPathList(tmp,fs)
    +      pathList=tmp:::pathList
    +    }
    +    pathList.filter(path=>{
    +      val  modTime = fs.getFileStatus(path).getModificationTime
    +      logDebug(s"Mod time for $path is $modTime")
    +      if (modTime > ignoreTime) {
    +        logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
    +        true
    +      }
    +      else false
    +    })
    +  }
    +
    +  def getSubPathList(path:List[Path],fs:FileSystem):List[Path]={
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    path.map(subPath=>{
    +     fs.listStatus(subPath,filter).map(x=>{
    +        pathList = x.getPath()::pathList
    --- End diff --
    
    Spaces around `::`.
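
For comparison, here is one way the level-by-level directory expansion in `getPathList` could be written with conventional spacing and without mutable lists. This is a sketch under assumptions, not the PR's code: it uses `java.io.File` instead of Hadoop's `FileSystem`, the names `subDirectories` and `directoriesUpTo` are hypothetical, and the modification-time filter from the diff is left out.

```scala
import java.io.File

object PathListSketch {
  /** Immediate subdirectories of every directory in `dirs`. */
  def subDirectories(dirs: List[File]): List[File] =
    dirs.flatMap { dir =>
      // listFiles() returns null when `dir` cannot be read.
      Option(dir.listFiles()).map(_.toList).getOrElse(Nil).filter(_.isDirectory)
    }

  /** `root` followed by every directory at most `depth` levels below it. */
  def directoriesUpTo(root: File, depth: Int): List[File] = {
    require(depth >= 0, s"depth must be non-negative, but got $depth")
    // Level 0 is the root itself; each later level holds the previous level's subdirectories.
    Iterator.iterate(List(root))(subDirectories).take(depth + 1).toList.flatten
  }
}
```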




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19721473
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -351,8 +351,8 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    +  ](directory: String, filter: Path => Boolean, newFilesOnly: Boolean, depth: Int = 1): InputDStream[(K, V)] = {
    --- End diff --
    
    This line exceeds 100 columns.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-69533545
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25398/




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r31428558
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -183,8 +189,46 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           val filter = new PathFilter {
             def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
           }
    -      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    -      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
    +      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
    +
    +      // Nested directories to find new files.
    +      def dfs(status: FileStatus): List[FileStatus] = {
    +        val path = status.getPath
    +        val depthFilter = depth + directoryDepth - path.depth()
    +        if (status.isDir) {
    --- End diff --
    
    Could you move `val depthFilter = depth + directoryDepth - path.depth()` into the `if` block? That avoids calling `path.depth()` for every file.
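
A rough illustration of keeping the depth bookkeeping on the directory branch only. Note that it swaps in a different technique from the diff: a remaining-depth counter is passed down the recursion, so no `path.depth()` call is needed at all. It also assumes `java.io.File` rather than Hadoop's `FileStatus`, the name `newFiles` is hypothetical, and the `lastFoundDirs` caching and modification-time checks are omitted.

```scala
import java.io.File

object DepthLimitedListing {
  /** Accepted files under `entry`, descending at most `remainingDepth` directory levels. */
  def newFiles(entry: File, remainingDepth: Int, accept: File => Boolean): List[File] = {
    if (entry.isDirectory) {
      // Depth is only consulted here, never for regular files.
      if (remainingDepth >= 1) {
        val children = Option(entry.listFiles()).map(_.toList).getOrElse(Nil)
        children.flatMap(newFiles(_, remainingDepth - 1, accept))
      } else {
        Nil
      }
    } else if (accept(entry)) {
      List(entry)
    } else {
      Nil
    }
  }
}
```

Called on the monitored directory, a depth of 1 returns only the files directly inside it, and a depth of 2 also includes files in its immediate subdirectories, which matches how `depth` behaves in the quoted `dfs` function.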




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r18740834
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +119,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList( path:Path, fs:FileSystem):List[Path]={
    +    val filter = new SubPathFilter()
    +    var pathList = List[Path]()
    +    fs.listStatus(path,filter).map(x=>{
    +      if(x.isDirectory()){
    --- End diff --
    
    Yes, because this only supports direct subdirectories. Recursing through all nested directories would make the processing time too long.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323932
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    +    for(i <- 0 until depth){
    --- End diff --
    
    Space before `{`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-96776750
  
    Do you mind closing this PR? It seems like it is not being worked on.




[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

Posted by liancheng <gi...@git.apache.org>.
Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r19323929
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -118,6 +120,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         (newFiles, filter.minNewFileModTime)
       }
     
    +  def getPathList(path:Path, fs:FileSystem):List[Path]={
    +    var pathList = List[Path]()
    +    pathList = path:: pathList
    +    var tmp =List[Path]()
    +    tmp=path::tmp
    --- End diff --
    
    Spaces around `=` and `::`




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by wangxiaojing <gi...@git.apache.org>.
Github user wangxiaojing commented on the pull request:

    https://github.com/apache/spark/pull/2765#issuecomment-65376390
  
    @liancheng Rebased.




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25066151
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -204,9 +204,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
        * monitored directory by "moving" them from another location within the same
        * file system. File names starting with . are ignored.
        * @param directory HDFS directory to monitor for new file
    +   * @param depth Searching depth of directory
        */
    -  def textFileStream(directory: String): JavaDStream[String] = {
    -    ssc.textFileStream(directory)
    +  def textFileStream(directory: String, depth: Int = 1): JavaDStream[String] = {
    --- End diff --
    
    I don't think this will work for Java API compatibility. We need two methods, one of which takes the new depth param as a required argument.
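
Some background on the concern: Scala compiles a default argument into a synthetic helper method, so a Java caller cannot simply omit `depth` and would have to pass both arguments. A sketch of the two-method shape follows, assuming a hypothetical `FileStreams` class in place of `JavaStreamingContext` and a plain `String` return value in place of `JavaDStream[String]`, so that it runs without Spark.

```scala
object JavaFriendlyOverloads {
  // Hypothetical stand-in for JavaStreamingContext.
  class FileStreams {
    /** Existing one-argument signature, kept for current Java callers. */
    def textFileStream(directory: String): String = textFileStream(directory, 1)

    /** New overload with an explicit, non-optional depth. */
    def textFileStream(directory: String, depth: Int): String = {
      // Placeholder result; the real method would build the stream here.
      s"watching $directory to depth $depth"
    }
  }
}
```

Having the one-argument method delegate with a depth of 1 keeps the logic in a single place while leaving the existing signature untouched.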




[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r22707861
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -355,8 +357,13 @@ class StreamingContext private[streaming] (
         K: ClassTag,
         V: ClassTag,
         F <: NewInputFormat[K, V]: ClassTag
    -  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
    -    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    +  ] (
    +     directory: String,
    --- End diff --
    
    I think the indentation is wrong. See Scala code style guide.
    http://docs.scala-lang.org/style/declarations.html
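
As a point of reference, a wrapped declaration might be laid out as below: one parameter per line with a four-space continuation indent (the same layout used in the `JavaStreamingContext` diff quoted elsewhere in this thread) and a two-space body, which also keeps each line under 100 columns. The signature is only loosely modeled on the one under review, with `String => Boolean` substituted for `Path => Boolean` and a placeholder `String` result, so treat it as an assumption-laden sketch.

```scala
object DeclarationStyleSketch {
  // Hypothetical signature; the return value is a placeholder so the
  // sketch compiles without Spark or Hadoop on the classpath.
  def fileStream(
      directory: String,
      filter: String => Boolean,
      newFilesOnly: Boolean,
      depth: Int = 1): String = {
    val accepted = if (filter(directory)) "accepted" else "rejected"
    s"$directory ($accepted), newFilesOnly=$newFilesOnly, depth=$depth"
  }
}
```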


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org