Posted to dev@gearpump.apache.org by huafengw <gi...@git.apache.org> on 2017/09/29 08:03:35 UTC
[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...
GitHub user huafengw opened a pull request:
https://github.com/apache/incubator-gearpump/pull/227
[GEARPUMP-350] Fix the not started app clock
Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- [ ] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [ ] Make sure tests pass via `sbt clean test`.
- [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/huafengw/incubator-gearpump clock
Alternatively, you can review and apply these changes as a patch at:
https://github.com/apache/incubator-gearpump/pull/227.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #227
----
commit 31c124455d362f853aa122970fd2e8ad925a8976
Author: huafengw <fv...@gmail.com>
Date: 2017-09-29T08:02:59Z
[GEARPUMP-350] Fix the not started app clock
----
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024559
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala ---
@@ -53,6 +53,10 @@ object TaskUtil {
else t1
}
+ def min(t1: Instant, t2: Instant, t3: Instant): Instant = {
--- End diff --
If this is a utility, how about making it more general by accepting *varargs*?
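A minimal sketch of the variadic helper suggested above, assuming `java.time.Instant` arguments as in the diff. The object name `InstantUtil` is hypothetical, and this is not the code that was merged into `TaskUtil`.

```scala
import java.time.Instant

// Hypothetical variadic helpers illustrating the review suggestion;
// not the actual TaskUtil implementation.
object InstantUtil {
  /** Returns the earliest of one or more instants. */
  def min(first: Instant, rest: Instant*): Instant =
    rest.foldLeft(first)((acc, t) => if (t.isBefore(acc)) t else acc)

  /** Returns the latest of one or more instants. */
  def max(first: Instant, rest: Instant*): Instant =
    rest.foldLeft(first)((acc, t) => if (t.isAfter(acc)) t else acc)
}
```

Taking the first argument separately makes a zero-argument call a compile error, so there is no empty-input case to handle. The three-argument `min(t1, t2, t3)` from the diff then becomes an ordinary call such as `InstantUtil.min(t1, t2, t3)`, and the same method also covers the two-argument case.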
---
[GitHub] incubator-gearpump issue #227: [GEARPUMP-350] Fix the not started app clock
Posted by huafengw <gi...@git.apache.org>.
GitHub user huafengw commented on the issue:
https://github.com/apache/incubator-gearpump/pull/227
Verified the streaming examples, including complex DAG, SOL, WordCount DSL and WordCount Java.
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024731
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
@@ -361,10 +358,11 @@ class TaskActor(
}
private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
+ val wmkInMilli = wmk.toEpochMilli
--- End diff --
What is this change for?
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r147025062
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
@@ -27,15 +27,15 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.source.Watermark
-import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil, WatermarkProducer}
/**
* Processes messages in groups as defined by groupBy function.
*/
class GroupByTask[IN, GROUP, OUT](
groupBy: IN => GROUP,
taskContext: TaskContext,
- userConfig: UserConfig) extends Task(taskContext, userConfig) {
+ userConfig: UserConfig) extends Task(taskContext, userConfig) with WatermarkProducer {
--- End diff --
And only `GroupByTask`?
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r141988691
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
@@ -254,6 +254,7 @@ class TaskActor(
case m: Message =>
count += 1
onNext(m)
+ processingWatermark = TaskUtil.max(processingWatermark, m.timestamp)
--- End diff --
This doesn't look right for the processing watermark of windowed tasks, which **cannot** advance on every new message.
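To illustrate the point, here is a hypothetical sketch (not Gearpump's `WindowRunner`) of a task that buffers messages into tumbling windows. Message timestamps only decide which window a value belongs to; the watermark moves only when an upstream watermark arrives, and the output watermark is held back by the start of the earliest window that has not fired yet. Watermarks are modeled as epoch milliseconds (`Long`) for brevity, and the class and method names are assumptions.

```scala
import scala.collection.mutable

// Hypothetical illustration of windowed watermark handling; not Gearpump code.
final class TumblingWindowBuffer(windowSizeMillis: Long) {
  // window start (ms) -> values buffered for that window, ordered by start
  private val pending = mutable.TreeMap.empty[Long, Vector[String]]
  private var inputWatermark: Long = Long.MinValue

  /** Buffers a message into its window; this does NOT advance any watermark. */
  def onMessage(value: String, timestampMillis: Long): Unit = {
    val start = timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis)
    pending(start) = pending.getOrElse(start, Vector.empty) :+ value
  }

  /** Advances the input watermark and fires every window whose end it has reached. */
  def onWatermark(wmkMillis: Long): List[(Long, Vector[String])] = {
    inputWatermark = math.max(inputWatermark, wmkMillis)
    val closed = pending.takeWhile { case (start, _) =>
      start + windowSizeMillis <= inputWatermark
    }.toList
    closed.foreach { case (start, _) => pending.remove(start) }
    closed
  }

  /** Output watermark: never ahead of the earliest window that is still open. */
  def outputWatermark: Long =
    pending.headOption.fold(inputWatermark) { case (start, _) =>
      math.min(start, inputWatermark)
    }
}
```

With a window size of 10, a message stamped 15 does not move the output watermark to 15: after an input watermark of 12 fires the window starting at 0, the output watermark stays at 10 until the window starting at 10 closes. Updating it with `max(processingWatermark, m.timestamp)` on every message could instead signal downstream tasks that still-open windows are complete.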
---
Posted by codecov-io <gi...@git.apache.org>.
GitHub user codecov-io commented on the issue:
https://github.com/apache/incubator-gearpump/pull/227
# [Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/227?src=pr&el=h1) Report
> Merging [#227](https://codecov.io/gh/apache/incubator-gearpump/pull/227?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gearpump/commit/175b08e64c3363a2da9c61e05dac5f01d1f8db2d?src=pr&el=desc) will **increase** coverage by `0.36%`.
> The diff coverage is `100%`.
```diff
@@ Coverage Diff @@
## master #227 +/- ##
==========================================
+ Coverage 69.01% 69.37% +0.36%
==========================================
Files 191 191
Lines 6119 6120 +1
Branches 358 353 -5
==========================================
+ Hits 4223 4246 +23
+ Misses 1896 1874 -22
```
---
Posted by huafengw <gi...@git.apache.org>.
GitHub user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r147065810
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
@@ -361,10 +358,11 @@ class TaskActor(
}
private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
+ val wmkInMilli = wmk.toEpochMilli
--- End diff --
To avoid calling `toEpochMilli` multiple times.
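A small sketch of the pattern being defended: convert the incoming watermark to milliseconds once, before iterating over the subscriptions, rather than calling `toEpochMilli` for every comparison. `SubscriptionStub` and its `watermarkMillis` field are hypothetical stand-ins for Gearpump's `Subscription`, and combining the values with a minimum is an assumption about what `getSubscriptionWatermark` computes.

```scala
import java.time.Instant

object SubscriptionWatermarkSketch {
  // Hypothetical stand-in for a subscription tracking its watermark in epoch millis.
  final case class SubscriptionStub(watermarkMillis: Long)

  // Assumed semantics: the result is the minimum of the task watermark
  // and every subscription's watermark.
  def subscriptionWatermark(subs: List[(Int, SubscriptionStub)], wmk: Instant): Instant = {
    val wmkInMilli = wmk.toEpochMilli // converted once, outside the fold
    val minMillis = subs.foldLeft(wmkInMilli) { case (acc, (_, sub)) =>
      math.min(acc, sub.watermarkMillis)
    }
    Instant.ofEpochMilli(minMillis)
  }
}
```

The conversion is cheap, so the gain is mostly readability: every comparison inside the loop works on plain `Long` values.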
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024843
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
@@ -27,15 +27,15 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.source.Watermark
-import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil, WatermarkProducer}
/**
* Processes messages in groups as defined by groupBy function.
*/
class GroupByTask[IN, GROUP, OUT](
groupBy: IN => GROUP,
taskContext: TaskContext,
- userConfig: UserConfig) extends Task(taskContext, userConfig) {
+ userConfig: UserConfig) extends Task(taskContext, userConfig) with WatermarkProducer {
--- End diff --
Why is `GroupByTask` a `WatermarkProducer` ?
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024506
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/WatermarkProducer.scala ---
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+trait WatermarkProducer
--- End diff --
Please add comments explaining the purpose of this interface.
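One way the requested comment could read, as a hypothetical sketch. The stated purpose (a task that computes its own watermark, so the runtime should not advance it from individual message timestamps) is an inference from the surrounding review, not something the pull request states. The helper `derivesWatermarkFromMessages` and both task classes are illustrative only.

```scala
/**
 * Marker trait (hypothetical documentation).
 *
 * A task that mixes in WatermarkProducer computes and reports its own
 * watermark, e.g. a windowed group-by task. The runtime should therefore
 * not advance that task's watermark from individual message timestamps.
 */
trait WatermarkProducer

// Illustrative task types, not Gearpump classes.
class StatelessTask
class WindowedGroupByTask extends WatermarkProducer

object WatermarkPolicy {
  /** True if the runtime may derive the task's watermark from message timestamps. */
  def derivesWatermarkFromMessages(task: AnyRef): Boolean = task match {
    case _: WatermarkProducer => false
    case _                    => true
  }
}
```

Because the trait has no members, a type test is the only way to consult it, so the Scaladoc is what tells implementers what opting in promises.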
---
Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on the issue:
https://github.com/apache/incubator-gearpump/pull/227
@huafengw This can be closed now.
---
Posted by huafengw <gi...@git.apache.org>.
GitHub user huafengw closed the pull request at:
https://github.com/apache/incubator-gearpump/pull/227
---