Posted to reviews@spark.apache.org by guoxiaolongzte <gi...@git.apache.org> on 2018/01/30 11:33:39 UTC

[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

GitHub user guoxiaolongzte opened a pull request:

    https://github.com/apache/spark/pull/20437

    [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streaming UI 's records should not be set to the default value of 0, it should be the total number of rows of new files.

    ## What changes were proposed in this pull request?
    
    In the Streaming UI, the 'Records' value for FileInputDStream should not be hard-coded to the default value of 0; it should be the total number of rows in the new files.
    ---- in FileInputDStream.scala: start ----
    val inputInfo = StreamInputInfo(id, 0, metadata) // numRecords is set to the default value of 0
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    case class StreamInputInfo(
        inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
    ---- in FileInputDStream.scala: end ----

    ---- in DirectKafkaInputDStream.scala: start ----
    val inputInfo = StreamInputInfo(id, rdd.count, metadata) // numRecords is set to the RDD count
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    case class StreamInputInfo(
        inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
    ---- in DirectKafkaInputDStream.scala: end ----
     
    How to test:
    ./bin/spark-submit --class org.apache.spark.examples.streaming.HdfsWordCount examples/jars/spark-examples_2.11-2.4.0-SNAPSHOT.jar /spark/tmp/
    
    After the fix:
    ![1](https://user-images.githubusercontent.com/26266482/35564207-5c19c946-05f4-11e8-9367-a36dd321a111.png)
    
    
    ## How was this patch tested?
    
    manual tests
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guoxiaolongzte/spark SPARK-23270

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20437.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20437
    
----
commit 41148c605ddf48c155fc03611bca03af9d4e25a3
Author: guoxiaolong <gu...@...>
Date:   2018-01-30T11:30:49Z

    [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streaming UI 's records should not be set to the default value of 0, it should be the total number of rows of new files.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20437
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164973156
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    So 'Records' is not reported just because of this small overhead? This is an obvious bug.


---



[GitHub] spark issue #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streami...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on the issue:

    https://github.com/apache/spark/pull/20437
  
    Thank you for your review.


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165252724
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    I don't think it's a good idea. Actually, I'm inclined to leave it as it is.


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165247567
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    With asynchronous processing, the count would not affect the main path of the streaming job, and we could still get the number of records.


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164996836
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    I would like to try counting the number of documents with the HDFS API from another thread, so that it does not affect the Spark jobs on the main thread. I will try it, OK? @jerryshao


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165249764
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    I'm not in favor of such changes. Whether the process is sync or async, `reportInfo` is invoked here, so you have to wait for the process to end.
    
    Anyway, I think reading the data twice is unacceptable for a streaming scenario (and even for a batch scenario). I guess the previous code set it to "0" intentionally.
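
The point about waiting can be illustrated with a minimal sketch in plain Scala. It does not use Spark; `InputInfoSketch` and `slowCount` are hypothetical stand-ins for `StreamInputInfo` and an expensive count over the new files. The assumption is only that the value must be known before the info object is built and reported.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncCountSketch {
  // Hypothetical stand-in for StreamInputInfo; only the count matters here.
  final case class InputInfoSketch(inputStreamId: Int, numRecords: Long)

  // Hypothetical stand-in for an expensive count over the new files.
  def slowCount(lines: Seq[String]): Long = lines.size.toLong

  // Even if the count is started asynchronously, building the info object
  // needs the finished value, so the caller blocks until the future completes.
  def buildInfo(id: Int, lines: Seq[String]): InputInfoSketch = {
    val pending: Future[Long] = Future(slowCount(lines))
    val numRecords = Await.result(pending, 10.seconds)
    InputInfoSketch(id, numRecords)
  }
}
```

Moving the work to another thread changes where it runs, but in this sketch the reporting path still waits for the result.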


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164975752
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    I see what you mean. I'll try to make it read the files only once. Can you give me some ideas?


---



[GitHub] spark issue #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20437
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20437
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165251810
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    What about adding a switch parameter whose default value is false?
    
    If it is true, the records are counted (the files are read again) so that the number of records is reported correctly. Of course, it should be made clear that setting the parameter to true will affect streaming performance.
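
A rough sketch of such a switch, in plain Scala without Spark, might look like the following. The configuration key `spark.streaming.fileStream.countRecords` is hypothetical and is not a confirmed Spark setting; a plain `Map` stands in for the real configuration object.

```scala
object CountSwitchSketch {
  // Hypothetical configuration key; no such setting is confirmed to exist in Spark.
  val CountRecordsKey = "spark.streaming.fileStream.countRecords"

  // Reads the switch from a plain settings map, defaulting to false when absent.
  def countEnabled(conf: Map[String, String]): Boolean =
    conf.get(CountRecordsKey).exists(_.toBoolean)

  // `count` is passed by name, so the expensive scan only runs when the switch is on.
  def numRecordsToReport(conf: Map[String, String])(count: => Long): Long =
    if (countEnabled(conf)) count else 0L
}
```

With the switch off, the by-name argument is never evaluated, so the default behaviour (reporting 0 without an extra read) is unchanged in this sketch.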


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20437


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164976661
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    I'm not sure if there's a solution to fix it here. 


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165253828
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    I am very sad. I'm looking into whether there's a better way.


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164973745
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    This is not a small overhead. The change will read/scan all the new files, which is a big overhead for a streaming application (the data is unnecessarily read twice).
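
The double read can be made visible with a small model in plain Scala. `CountingSource` is a hypothetical in-memory stand-in for file-backed input, not a Spark class; it only records how many full passes are made over each file.

```scala
object ReadTwiceSketch {
  // Hypothetical file-backed source that records how often each file is scanned.
  final class CountingSource(files: Map[String, Seq[String]]) {
    private val scans =
      scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

    // Every full pass over a file counts as one scan.
    def readAll(path: String): Seq[String] = {
      scans(path) += 1
      files(path)
    }

    def scanCount(path: String): Int = scans(path)
  }

  // Counting records up front forces one scan; the batch itself then scans again.
  def countThenProcess(source: CountingSource, path: String): (Long, Int) = {
    val numRecords = source.readAll(path).size.toLong               // scan 1: only for the UI metric
    val wordCount = source.readAll(path).flatMap(_.split(" ")).size // scan 2: the actual job
    (numRecords, wordCount)
  }
}
```

After `countThenProcess` runs, `scanCount` for the file is 2 in this model, which is the overhead being objected to.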


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164997663
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    What do you actually want: the number of files or the number of records?


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165240426
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    What's the difference between using the HDFS API to count in another thread and the current solution? You still cannot avoid reading the files twice.
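
As an illustration of why a file-system API does not avoid the second read, here is a minimal sketch that counts rows of a plain text file. A local file read through `java.nio` stands in for HDFS (an assumption made to keep the example self-contained); the shape of the work is the same, since finding the line breaks means streaming through the whole file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object LineCountSketch {
  // Counts rows by streaming through the file from start to end.
  // For plain text there is no shortcut: the content must be read
  // to locate the line breaks.
  def countRows(path: Path): Long = {
    val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
    try {
      var rows = 0L
      while (reader.readLine() != null) rows += 1
      rows
    } finally reader.close()
  }
}
```

Running this on another thread would move the I/O off the main thread, but the file content is still read once for the count and once more by the job.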


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164769366
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    var numRecords = 0L
    --- End diff --
    
    I'm not sure if this change is correct, but you should write it as `rdds.map(_.count).sum`.
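
For comparison, the two styles can be sketched in plain Scala. `CountableBatch` is a hypothetical stand-in for an RDD that only exposes `count`, and the loop in `totalWithVar` is an assumption about what followed `var numRecords = 0L`, since the diff shows only that first line.

```scala
object SumCountsSketch {
  // Hypothetical stand-in for an RDD: it only exposes a record count.
  final case class CountableBatch(count: Long)

  // Imperative accumulation, assumed to resemble the `var numRecords = 0L` version.
  def totalWithVar(batches: Seq[CountableBatch]): Long = {
    var numRecords = 0L
    batches.foreach(b => numRecords += b.count)
    numRecords
  }

  // The expression form suggested in the review.
  def totalWithSum(batches: Seq[CountableBatch]): Long =
    batches.map(_.count).sum
}
```

Both functions return the same total; the second avoids the mutable variable and reads as a single expression.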


---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by guoxiaolongzte <gi...@git.apache.org>.
Github user guoxiaolongzte commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r165239860
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    The number of rows in a file. Is this solution possible?



---



[GitHub] spark pull request #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20437#discussion_r164968292
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -157,7 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         val metadata = Map(
           "files" -> newFiles.toList,
           StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    -    val inputInfo = StreamInputInfo(id, 0, metadata)
    +    val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata)
    --- End diff --
    
    This will kick off a new Spark job to read the files and count the records, which brings obvious overhead, whereas `count` in `DirectKafkaInputDStream` only calculates offsets.
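
The contrast with the Kafka case can be sketched as follows. `OffsetRangeSketch` is a simplified stand-in written for this example, not the real Spark Kafka class; it assumes only that each range carries a start offset and an end offset, so the count is plain arithmetic on metadata.

```scala
// Simplified stand-in for a Kafka offset range; not the real Spark class.
final case class OffsetRangeSketch(
    topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Number of messages in the range, derived purely from the offsets.
  def count: Long = untilOffset - fromOffset
}

object OffsetCountSketch {
  // Total records in a batch: arithmetic on metadata, no message data is read.
  def totalRecords(ranges: Seq[OffsetRangeSketch]): Long =
    ranges.map(_.count).sum

  def main(args: Array[String]): Unit = {
    val ranges = Seq(
      OffsetRangeSketch("events", 0, fromOffset = 100L, untilOffset = 250L),
      OffsetRangeSketch("events", 1, fromOffset = 40L, untilOffset = 90L))
    println(totalRecords(ranges)) // 150 + 50 = 200
  }
}
```

A file-based stream has no comparable metadata for the number of rows, which is why counting there requires reading the data.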


---



[GitHub] spark issue #20437: [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streami...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20437
  
    Can one of the admins verify this patch?


---
