Posted to dev@gearpump.apache.org by manuzhang <gi...@git.apache.org> on 2016/09/21 07:04:14 UTC

[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl

GitHub user manuzhang opened a pull request:

    https://github.com/apache/incubator-gearpump/pull/85

    [GEARPUMP-23] add window dsl

    This PR is opened for early review. The work is still in progress, with the following todos:
    
    - [ ] basic window dsl support with `WindowedWordCount` example
    - [ ] improve `ReduceFunction` to not emit immediate results
    - [ ] add unit tests
    - [ ] add comments and update documentation
    - [ ] support different types of computation (e.g. monoid which doesn't require input elements to be held in the window)
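
The last todo contrasts two ways of computing a window result. The following is only a minimal sketch of that idea, not code from the pull request: the `Monoid` trait, `BufferingWindow`, and `MonoidWindow` are hypothetical names invented for illustration. A buffering window has to hold every element until it fires, while a monoid-based window folds each element into a single running aggregate. Neither variant emits a result until `fire()` is called, which is also the behavior the `ReduceFunction` todo asks for.

```scala
object MonoidWindowSketch {
  // Hypothetical monoid abstraction: an identity element plus an associative combine.
  trait Monoid[A] {
    def zero: A
    def combine(x: A, y: A): A
  }

  // Integer addition forms a monoid with 0 as the identity.
  val intSum: Monoid[Int] = new Monoid[Int] {
    def zero: Int = 0
    def combine(x: Int, y: Int): Int = x + y
  }

  // Buffering window: keeps all inputs and reduces them only when fired.
  final class BufferingWindow[A](reduce: (A, A) => A) {
    private val elements = scala.collection.mutable.ArrayBuffer.empty[A]
    def add(a: A): Unit = { elements += a }
    def heldElements: Int = elements.size
    // Returns None when the window received no input.
    def fire(): Option[A] = elements.reduceOption(reduce)
  }

  // Monoid window: folds each input into one aggregate, so no input is retained.
  final class MonoidWindow[A](m: Monoid[A]) {
    private var acc: A = m.zero
    def add(a: A): Unit = { acc = m.combine(acc, a) }
    def fire(): A = acc
  }
}
```

The trade-off is that the monoid variant needs constant state per window but only works for computations expressible as an associative combine, whereas the buffering variant supports arbitrary functions over the whole window at the cost of holding its elements.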

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manuzhang/incubator-gearpump window_dsl

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-gearpump/pull/85.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #85
    
----
commit a69554bcdb2620a16107b23d6cee8eb5308a6043
Author: manuzhang <ow...@gmail.com>
Date:   2016-09-08T03:22:18Z

    [GEARPUMP-23] add window dsl

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl

Posted by kkasravi <gi...@git.apache.org>.
Github user kkasravi commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/85#discussion_r80744615
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
    @@ -115,20 +121,17 @@ class Stream[T](
        *
        * For example,
        * {{{
    -   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
    +   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
        * }}}
        *
    -   * @param fun  Group by function
    +   * @param fn  Group by function
        * @param parallelism  Parallelism level
        * @param description  The description
        * @return  the grouped stream
        */
    -  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
    -    : Stream[T] = {
    -    val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
    -    graph.addVertex(groupOp)
    -    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
    -    new Stream[T](graph, groupOp)
    +  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy")
    +    : GroupByStream[T, GROUP] = {
    +    groupByWindow(DefaultGroupBy(fn), parallelism, description)
    --- End diff --
    
    Seems odd: groupBy -> groupByWindow -> GroupByStream. Suggests that all groupBy's have windowing semantics.



[GitHub] incubator-gearpump issue #85: [GEARPUMP-23] add window dsl

Posted by kkasravi <gi...@git.apache.org>.
Github user kkasravi commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/85
  
    +1, let's get this merged and I'll rebase.



[GitHub] incubator-gearpump issue #85: [GEARPUMP-23] add window dsl

Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/85
  
    ## [Current coverage](https://codecov.io/gh/apache/incubator-gearpump/pull/85?src=pr) is 69.65% (diff: 39.77%)
    > Merging [#85](https://codecov.io/gh/apache/incubator-gearpump/pull/85?src=pr) into [master](https://codecov.io/gh/apache/incubator-gearpump/branch/master?src=pr) will decrease coverage by **1.15%**
    
    
    ```diff
    @@             master        #85   diff @@
    ==========================================
      Files           178        186     +8   
      Lines          5903       5965    +62   
      Methods        5584       5445   -139   
      Messages          0          0          
      Branches        319        520   +201   
    ==========================================
    - Hits           4180       4155    -25   
    - Misses         1723       1810    +87   
      Partials          0          0          
    ```
    
    ![Sunburst](https://codecov.io/gh/apache/incubator-gearpump/pull/85/graphs/sunburst.svg?src=pr&size=150)
    
    > Powered by [Codecov](https://codecov.io?src=pr). Last update [5cd5b93...26ecc56](https://codecov.io/gh/apache/incubator-gearpump/compare/5cd5b9304f0f703c6f89d2865e5bb7adbb631c74...26ecc56e416d9ff4a2b9f0db51ef548c96d3db2f?src=pr)



[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-gearpump/pull/85



[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl

Posted by kkasravi <gi...@git.apache.org>.
Github user kkasravi commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/85#discussion_r80745374
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
    @@ -147,6 +150,33 @@ class Stream[T](
         graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
         new Stream[R](graph, processorOp, Some(Shuffle))
       }
    +
    +  private def groupByWindow[GROUP](fn: GroupByFn[T, GROUP],
    +      parallelism: Int, description: String): GroupByStream[T, GROUP] = {
    +    val groupOp = GroupByOp[T, GROUP](fn,
    +      parallelism, description)
    +    graph.addVertex(groupOp)
    +    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
    +    new GroupByStream[T, GROUP](graph, groupOp)
    +  }
    +
    +}
    +
    +class GroupByStream[T, GROUP](
    +    private val graph: Graph[Op, OpEdge],
    +    private val groupByOp: GroupByOp[T, GROUP],
    +    private val edge: Option[OpEdge] = None) extends Stream[T](graph, groupByOp, edge) {
    --- End diff --
    
    Is `private val graph` equivalent to a plain `graph` parameter? (`val` adds public accessors, so does `private` take them away?)



[GitHub] incubator-gearpump issue #85: [GEARPUMP-23] add window dsl

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/85
  
    @kkasravi 
    
    `stream.groupBy` is now a shortcut for `stream.window(CountWindow.apply(1).triggering(CountTrigger).accumulating).groupBy`.
    
    So `groupBy` has windowing semantics, and all window operations should follow `groupBy`.
    
    Here are the window semantics:
    
    ```
    - Window
        - WindowFn
            -> SlidingWindowFn  // FixedWindow and SlidingWindow
            -> CountWindowFn  // CountWindow
        - Trigger
            -> EventTimeTrigger  // event time aggregation
            -> ProcessingTimeTrigger  // processing time aggregation
            -> CountTrigger  // count aggregation, used with CountWindowFn
        - AccumulationMode
            -> Accumulating  // states are accumulated across windows
            -> Discarding  // states are not accumulated
    ```
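
The outline above can be read as a small algebraic data type. The sketch below is one possible modelling, not the classes from this pull request: the type names are borrowed from the outline, but every field, the builder methods, and the default trigger and accumulation mode chosen in `CountWindow` and `FixedWindow` are assumptions made for illustration. It also assumes that a fixed window can be expressed as a sliding window whose step equals its size, which would explain why the outline lists both under `SlidingWindowFn`.

```scala
object WindowSemanticsSketch {
  // All names mirror the outline but are hypothetical stand-ins,
  // not the actual Gearpump classes.
  sealed trait WindowFn
  final case class SlidingWindowFn(sizeMs: Long, stepMs: Long) extends WindowFn
  final case class CountWindowFn(size: Int) extends WindowFn

  sealed trait Trigger
  case object EventTimeTrigger extends Trigger
  case object ProcessingTimeTrigger extends Trigger
  case object CountTrigger extends Trigger

  sealed trait AccumulationMode
  case object Accumulating extends AccumulationMode
  case object Discarding extends AccumulationMode

  // A window bundles how elements are assigned, when results fire,
  // and whether state carries over between firings.
  final case class Window(windowFn: WindowFn, trigger: Trigger, mode: AccumulationMode) {
    def triggering(t: Trigger): Window = copy(trigger = t)
    def accumulating: Window = copy(mode = Accumulating)
    def discarding: Window = copy(mode = Discarding)
  }

  object CountWindow {
    // Assumed defaults: count trigger, discarding mode.
    def apply(size: Int): Window = Window(CountWindowFn(size), CountTrigger, Discarding)
  }

  object FixedWindow {
    // Assumption: a fixed window is a sliding window whose step equals its size.
    def apply(sizeMs: Long): Window =
      Window(SlidingWindowFn(sizeMs, sizeMs), EventTimeTrigger, Discarding)
  }

  // The window that, per the comment above, a bare groupBy is a shortcut for.
  val defaultGroupByWindow: Window =
    CountWindow.apply(1).triggering(CountTrigger).accumulating
}
```

Under this modelling, `defaultGroupByWindow` is a one-element count window that fires on every element and keeps accumulating state, which matches the pre-window behavior of `groupBy` followed by `reduce`.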



[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/85#discussion_r80809137
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
    @@ -147,6 +150,33 @@ class Stream[T](
         graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
         new Stream[R](graph, processorOp, Some(Shuffle))
       }
    +
    +  private def groupByWindow[GROUP](fn: GroupByFn[T, GROUP],
    +      parallelism: Int, description: String): GroupByStream[T, GROUP] = {
    +    val groupOp = GroupByOp[T, GROUP](fn,
    +      parallelism, description)
    +    graph.addVertex(groupOp)
    +    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
    +    new GroupByStream[T, GROUP](graph, groupOp)
    +  }
    +
    +}
    +
    +class GroupByStream[T, GROUP](
    +    private val graph: Graph[Op, OpEdge],
    +    private val groupByOp: GroupByOp[T, GROUP],
    +    private val edge: Option[OpEdge] = None) extends Stream[T](graph, groupByOp, edge) {
    --- End diff --
    
    `private val` is visible to the companion object.
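
A small standalone illustration of the difference discussed here, using a hypothetical `Node` class rather than `GroupByStream`: neither form exposes a public accessor, but a `private val` is a class member that the companion object can read, while a plain constructor parameter cannot be selected on an instance at all.

```scala
object PrivateValSketch {
  // `label` is a plain constructor parameter; `secret` is a private val.
  class Node(label: String, private val secret: Int) {
    // Both are usable inside the class body.
    def describe: String = s"$label:$secret"
  }

  object Node {
    // Compiles: a companion object can read private members of its class.
    def reveal(n: Node): Int = n.secret

    // Does not compile if uncommented: a plain parameter is not a member
    // that other code, the companion included, can select on an instance.
    // def labelOf(n: Node): String = n.label
  }
}
```

Code outside `Node` and its companion can access neither `secret` nor `label`.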



[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/85#discussion_r80808708
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
    @@ -115,20 +121,17 @@ class Stream[T](
        *
        * For example,
        * {{{
    -   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
    +   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
        * }}}
        *
    -   * @param fun  Group by function
    +   * @param fn  Group by function
        * @param parallelism  Parallelism level
        * @param description  The description
        * @return  the grouped stream
        */
    -  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
    -    : Stream[T] = {
    -    val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
    -    graph.addVertex(groupOp)
    -    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
    -    new Stream[T](graph, groupOp)
    +  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy")
    +    : GroupByStream[T, GROUP] = {
    +    groupByWindow(DefaultGroupBy(fn), parallelism, description)
    --- End diff --
    
    Yes, I have a new version that does so. Will update today

