Posted to dev@gearpump.apache.org by manuzhang <gi...@git.apache.org> on 2016/09/21 07:04:14 UTC
[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl
GitHub user manuzhang opened a pull request:
https://github.com/apache/incubator-gearpump/pull/85
[GEARPUMP-23] add window dsl
This PR is opened for early review; the work is still in progress, with the following todos:
- [ ] basic window dsl support with `WindowedWordCount` example
- [ ] improve `ReduceFunction` to not emit immediate results
- [ ] add unit tests
- [ ] add comments and update documentation
- [ ] support different types of computation (e.g. a monoid, which doesn't require input elements to be held in the window)
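To illustrate the last todo item, here is a minimal sketch of why a monoid does not need the window to hold its input elements. The `Monoid` trait, `IntSum`, `BufferingWindow` and `MonoidWindow` are hypothetical names introduced for this example; they are not part of this PR or of Gearpump's API.

```scala
// Hypothetical types for illustration only; not part of Gearpump.
trait Monoid[A] {
  def empty: A
  def combine(x: A, y: A): A
}

object IntSum extends Monoid[Int] {
  val empty: Int = 0
  def combine(x: Int, y: Int): Int = x + y
}

// Buffers every element until the window fires: memory grows with window size.
final class BufferingWindow[A] {
  private var elements: Vector[A] = Vector.empty
  def add(a: A): Unit = { elements = elements :+ a }
  def fire(m: Monoid[A]): A = elements.foldLeft(m.empty)(m.combine)
}

// Folds each element into a single accumulator on arrival: constant memory.
final class MonoidWindow[A](m: Monoid[A]) {
  private var acc: A = m.empty
  def add(a: A): Unit = { acc = m.combine(acc, a) }
  def fire(): A = acc
}
```

Both windows produce the same result for an associative `combine`; the monoid variant simply never stores the individual inputs.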
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/manuzhang/incubator-gearpump window_dsl
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-gearpump/pull/85.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #85
----
commit a69554bcdb2620a16107b23d6cee8eb5308a6043
Author: manuzhang <ow...@gmail.com>
Date: 2016-09-08T03:22:18Z
[GEARPUMP-23] add window dsl
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl
Posted by kkasravi <gi...@git.apache.org>.
Github user kkasravi commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/85#discussion_r80744615
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
@@ -115,20 +121,17 @@ class Stream[T](
*
* For example,
* {{{
- * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
* }}}
*
- * @param fun Group by function
+ * @param fn Group by function
* @param parallelism Parallelism level
* @param description The description
* @return the grouped stream
*/
- def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
- : Stream[T] = {
- val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
- graph.addVertex(groupOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
- new Stream[T](graph, groupOp)
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy")
+ : GroupByStream[T, GROUP] = {
+ groupByWindow(DefaultGroupBy(fn), parallelism, description)
--- End diff --
This seems odd: groupBy -> groupByWindow -> GroupByStream. It suggests that every groupBy has windowing semantics.
---
[GitHub] incubator-gearpump issue #85: [GEARPUMP-23] add window dsl
Posted by kkasravi <gi...@git.apache.org>.
Github user kkasravi commented on the issue:
https://github.com/apache/incubator-gearpump/pull/85
+1. Let's get this merged and I'll rebase.
---
[GitHub] incubator-gearpump issue #85: [GEARPUMP-23] add window dsl
Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:
https://github.com/apache/incubator-gearpump/pull/85
## [Current coverage](https://codecov.io/gh/apache/incubator-gearpump/pull/85?src=pr) is 69.65% (diff: 39.77%)
> Merging [#85](https://codecov.io/gh/apache/incubator-gearpump/pull/85?src=pr) into [master](https://codecov.io/gh/apache/incubator-gearpump/branch/master?src=pr) will decrease coverage by **1.15%**
```diff
@@ master #85 diff @@
==========================================
Files 178 186 +8
Lines 5903 5965 +62
Methods 5584 5445 -139
Messages 0 0
Branches 319 520 +201
==========================================
- Hits 4180 4155 -25
- Misses 1723 1810 +87
Partials 0 0
```
![Sunburst](https://codecov.io/gh/apache/incubator-gearpump/pull/85/graphs/sunburst.svg?src=pr&size=150)
> Powered by [Codecov](https://codecov.io?src=pr). Last update [5cd5b93...26ecc56](https://codecov.io/gh/apache/incubator-gearpump/compare/5cd5b9304f0f703c6f89d2865e5bb7adbb631c74...26ecc56e416d9ff4a2b9f0db51ef548c96d3db2f?src=pr)
---
[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/incubator-gearpump/pull/85
---
[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl
Posted by kkasravi <gi...@git.apache.org>.
Github user kkasravi commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/85#discussion_r80745374
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
@@ -147,6 +150,33 @@ class Stream[T](
graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
new Stream[R](graph, processorOp, Some(Shuffle))
}
+
+ private def groupByWindow[GROUP](fn: GroupByFn[T, GROUP],
+ parallelism: Int, description: String): GroupByStream[T, GROUP] = {
+ val groupOp = GroupByOp[T, GROUP](fn,
+ parallelism, description)
+ graph.addVertex(groupOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+ new GroupByStream[T, GROUP](graph, groupOp)
+ }
+
+}
+
+class GroupByStream[T, GROUP](
+ private val graph: Graph[Op, OpEdge],
+ private val groupByOp: GroupByOp[T, GROUP],
+ private val edge: Option[OpEdge] = None) extends Stream[T](graph, groupByOp, edge) {
--- End diff --
Is 'private val graph' equivalent to plain 'graph'? (A 'val' adds a public accessor, so does 'private' take it away again?)
---
[GitHub] incubator-gearpump issue #85: [GEARPUMP-23] add window dsl
Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:
https://github.com/apache/incubator-gearpump/pull/85
@kkasravi
`stream.groupBy` is now a shortcut for `stream.window(CountWindow.apply(1).triggering(CountTrigger).accumulating).groupBy`,
so `groupBy` has windowing semantics and all window operations should follow `groupBy`.
Here are the window semantics:
```
- Window
- WindowFn
-> SlidingWindowFn // FixedWindow and SlidingWindow
-> CountWindowFn // CountWindow
- Trigger
-> EventTimeTrigger // event time aggregation
-> ProcessingTimeTrigger // processing time aggregation
-> CountTrigger // count aggregation, used with CountWindowFn
- AccumulationMode
-> Accumulating // states are accumulated across windows
-> Discarding // states are not accumulated
```
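The hierarchy above can be sketched as a small algebraic data type. This is only a hypothetical model for discussion: the field names, the default trigger and accumulation mode, and the treatment of a fixed window as a sliding window whose step equals its size are all assumptions, and the actual classes in the PR may differ.

```scala
// Hypothetical model of the hierarchy above; the real Gearpump classes may differ.
sealed trait WindowFn
// Assumption: a fixed window is a sliding window whose step equals its size.
final case class SlidingWindowFn(sizeMs: Long, stepMs: Long) extends WindowFn
final case class CountWindowFn(size: Int) extends WindowFn

sealed trait Trigger
case object EventTimeTrigger extends Trigger
case object ProcessingTimeTrigger extends Trigger
case object CountTrigger extends Trigger

sealed trait AccumulationMode
case object Accumulating extends AccumulationMode
case object Discarding extends AccumulationMode

// The defaults (EventTimeTrigger, Discarding) are assumptions made for this sketch.
final case class Window(
    windowFn: WindowFn,
    trigger: Trigger = EventTimeTrigger,
    mode: AccumulationMode = Discarding) {
  def triggering(t: Trigger): Window = copy(trigger = t)
  def accumulating: Window = copy(mode = Accumulating)
  def discarding: Window = copy(mode = Discarding)
}

object FixedWindow {
  def apply(sizeMs: Long): Window = Window(SlidingWindowFn(sizeMs, sizeMs))
}

object SlidingWindow {
  def apply(sizeMs: Long, stepMs: Long): Window = Window(SlidingWindowFn(sizeMs, stepMs))
}

object CountWindow {
  def apply(size: Int): Window = Window(CountWindowFn(size), CountTrigger)
}

object GroupByDefault {
  // The window that a bare `groupBy` is described as using in the comment above.
  val window: Window = CountWindow.apply(1).triggering(CountTrigger).accumulating
}
```

Under this model, a bare `groupBy` corresponds to a count window of size 1 with a count trigger in accumulating mode, which is one way to read the "shortcut" described in the comment.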
---
[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl
Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/85#discussion_r80809137
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
@@ -147,6 +150,33 @@ class Stream[T](
graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
new Stream[R](graph, processorOp, Some(Shuffle))
}
+
+ private def groupByWindow[GROUP](fn: GroupByFn[T, GROUP],
+ parallelism: Int, description: String): GroupByStream[T, GROUP] = {
+ val groupOp = GroupByOp[T, GROUP](fn,
+ parallelism, description)
+ graph.addVertex(groupOp)
+ graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+ new GroupByStream[T, GROUP](graph, groupOp)
+ }
+
+}
+
+class GroupByStream[T, GROUP](
+ private val graph: Graph[Op, OpEdge],
+ private val groupByOp: GroupByOp[T, GROUP],
+ private val edge: Option[OpEdge] = None) extends Stream[T](graph, groupByOp, edge) {
--- End diff --
`private val` is visible to the companion object, while a plain constructor parameter is not.
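A minimal sketch of the difference, using hypothetical classes (`PlainParam`, `PrivateVal`) rather than the PR's `GroupByStream`:

```scala
// Hypothetical classes for illustration; not the PR's GroupByStream.

// Plain constructor parameter: only the instance itself can see it.
class PlainParam(graph: String) {
  def describe: String = s"graph=$graph"
  // def same(other: PlainParam) = graph == other.graph   // does not compile
}

// `private val`: a field readable by any instance of the class and by its companion.
class PrivateVal(private val graph: String) {
  def same(other: PrivateVal): Boolean = graph == other.graph
}

object PrivateVal {
  // Allowed because the companion object shares private access with the class.
  def graphOf(s: PrivateVal): String = s.graph
}
```

So the two forms are not equivalent: `private` removes the public accessor that `val` would add, but the field is still reachable from other instances of the class and from the companion.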
---
[GitHub] incubator-gearpump pull request #85: [GEARPUMP-23] add window dsl
Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/85#discussion_r80808708
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---
@@ -115,20 +121,17 @@ class Stream[T](
*
* For example,
* {{{
- * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+ * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
* }}}
*
- * @param fun Group by function
+ * @param fn Group by function
* @param parallelism Parallelism level
* @param description The description
* @return the grouped stream
*/
- def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
- : Stream[T] = {
- val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
- graph.addVertex(groupOp)
- graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
- new Stream[T](graph, groupOp)
+ def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, description: String = "groupBy")
+ : GroupByStream[T, GROUP] = {
+ groupByWindow(DefaultGroupBy(fn), parallelism, description)
--- End diff --
Yes, I have a new version that does so. I will update the PR today.
---