Posted to dev@gearpump.apache.org by huafengw <gi...@git.apache.org> on 2017/09/29 08:03:35 UTC

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

GitHub user huafengw opened a pull request:

    https://github.com/apache/incubator-gearpump/pull/227

    [GEARPUMP-350] Fix the not started app clock

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:
    
     - [ ] Make sure the commit message is formatted like:
       `[GEARPUMP-<Jira issue #>] Meaningful description of pull request` 
     - [ ] Make sure tests pass via `sbt clean test`.
     - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huafengw/incubator-gearpump clock

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-gearpump/pull/227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #227
    
----
commit 31c124455d362f853aa122970fd2e8ad925a8976
Author: huafengw <fv...@gmail.com>
Date:   2017-09-29T08:02:59Z

    [GEARPUMP-350] Fix the not started app clock

----


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024559
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala ---
    @@ -53,6 +53,10 @@ object TaskUtil {
         else t1
       }
     
    +  def min(t1: Instant, t2: Instant, t3: Instant): Instant = {
    --- End diff --
    
    If this is a util, how about making it more general by accepting *varargs*?
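
A minimal sketch of what a varargs variant could look like. `InstantMin` is a hypothetical name used only for illustration; the PR's actual `TaskUtil` code is not shown in this thread beyond the three-argument signature above.

```scala
import java.time.Instant

// Hypothetical helper for illustration; not the PR's actual TaskUtil code.
object InstantMin {
  // Requiring `first` guarantees at least one argument at compile time,
  // so no empty-input case has to be handled at runtime.
  def min(first: Instant, rest: Instant*): Instant =
    rest.foldLeft(first)((acc, t) => if (t.isBefore(acc)) t else acc)
}
```

With this shape, `InstantMin.min(t1, t2)` and `InstantMin.min(t1, t2, t3)` both go through the same method, so separate two- and three-argument overloads would not be needed.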


---

[GitHub] incubator-gearpump issue #227: [GEARPUMP-350] Fix the not started app clock

Posted by huafengw <gi...@git.apache.org>.
GitHub user huafengw commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/227
  
    Verified with the streaming examples, including complex DAG, SOL, word count DSL, and word count Java.


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024731
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
    @@ -361,10 +358,11 @@ class TaskActor(
       }
     
       private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
    +    val wmkInMilli = wmk.toEpochMilli
    --- End diff --
    
    What is this change for?


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r147025062
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
    @@ -27,15 +27,15 @@ import org.apache.gearpump.cluster.UserConfig
     import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
     import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
     import org.apache.gearpump.streaming.source.Watermark
    -import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
    +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil, WatermarkProducer}
     
     /**
      * Processes messages in groups as defined by groupBy function.
      */
     class GroupByTask[IN, GROUP, OUT](
         groupBy: IN => GROUP,
         taskContext: TaskContext,
    -    userConfig: UserConfig) extends Task(taskContext, userConfig) {
    +    userConfig: UserConfig) extends Task(taskContext, userConfig) with WatermarkProducer {
    --- End diff --
    
    And only `GroupByTask`?


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r141988691
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
    @@ -254,6 +254,7 @@ class TaskActor(
               case m: Message =>
                 count += 1
                 onNext(m)
    +            processingWatermark = TaskUtil.max(processingWatermark, m.timestamp)
    --- End diff --
    
    This doesn't look right for the window processing watermark, which **cannot** advance on every new message.
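
One possible reading of this concern, sketched under assumptions: a windowed task buffers incoming messages until a window fires, so moving the processing watermark forward on each message would claim progress for results that have not been emitted yet. `WindowedClock` and its methods are hypothetical and are not part of Gearpump's `TaskActor`.

```scala
import java.time.Instant

// Hypothetical stand-in for a windowed task's clock; not Gearpump's TaskActor.
final class WindowedClock {
  private var watermark: Instant = Instant.EPOCH
  private var buffered: Vector[Instant] = Vector.empty

  // A message is only buffered into its window; nothing has been emitted
  // downstream yet, so the processing watermark stays where it is.
  def onMessage(timestamp: Instant): Unit =
    buffered = buffered :+ timestamp

  // Once windows up to `upTo` have fired, buffered entries at or before it
  // are flushed, and only then does the watermark move forward.
  def onWindowsFired(upTo: Instant): Unit = {
    buffered = buffered.filter(_.isAfter(upTo))
    if (upTo.isAfter(watermark)) watermark = upTo
  }

  def processingWatermark: Instant = watermark
  def pending: Int = buffered.size
}
```

In this sketch the watermark is driven by window firing rather than by message arrival, which is the distinction the comment appears to draw against `TaskUtil.max(processingWatermark, m.timestamp)`.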


---

[GitHub] incubator-gearpump issue #227: [GEARPUMP-350] Fix the not started app clock

Posted by codecov-io <gi...@git.apache.org>.
GitHub user codecov-io commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/227
  
    # [Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/227?src=pr&el=h1) Report
    > Merging [#227](https://codecov.io/gh/apache/incubator-gearpump/pull/227?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gearpump/commit/175b08e64c3363a2da9c61e05dac5f01d1f8db2d?src=pr&el=desc) will **increase** coverage by `0.36%`.
    > The diff coverage is `100%`.
    
    
    
    ```diff
    @@            Coverage Diff             @@
    ##           master     #227      +/-   ##
    ==========================================
    + Coverage   69.01%   69.37%   +0.36%     
    ==========================================
      Files         191      191              
      Lines        6119     6120       +1     
      Branches      358      353       -5     
    ==========================================
    + Hits         4223     4246      +23     
    + Misses       1896     1874      -22
    ```
    



---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by huafengw <gi...@git.apache.org>.
GitHub user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r147065810
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---
    @@ -361,10 +358,11 @@ class TaskActor(
       }
     
       private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
    +    val wmkInMilli = wmk.toEpochMilli
    --- End diff --
    
    To avoid multiple `toEpochMilli` calls.
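
A minimal sketch of the hoisting described here, assuming the method reduces a set of per-subscription clocks (as epoch milliseconds) to a minimum. The body of `getSubscriptionWatermark` is not shown in this thread, so `WatermarkSketch`, `minWatermark`, and the `List[Long]` parameter are hypothetical stand-ins.

```scala
import java.time.Instant

// Hypothetical illustration; the real method takes List[(Int, Subscription)].
object WatermarkSketch {
  def minWatermark(subscriberClocks: List[Long], wmk: Instant): Instant = {
    // Convert once up front instead of calling wmk.toEpochMilli
    // inside the per-element function below.
    val wmkInMilli = wmk.toEpochMilli
    val minMilli = subscriberClocks.foldLeft(wmkInMilli)((acc, clock) => math.min(acc, clock))
    Instant.ofEpochMilli(minMilli)
  }
}
```

The result is the same as converting inside the fold; the only difference is that the conversion runs once per call rather than once per subscription.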


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024843
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
    @@ -27,15 +27,15 @@ import org.apache.gearpump.cluster.UserConfig
     import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
     import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
     import org.apache.gearpump.streaming.source.Watermark
    -import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
    +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil, WatermarkProducer}
     
     /**
      * Processes messages in groups as defined by groupBy function.
      */
     class GroupByTask[IN, GROUP, OUT](
         groupBy: IN => GROUP,
         taskContext: TaskContext,
    -    userConfig: UserConfig) extends Task(taskContext, userConfig) {
    +    userConfig: UserConfig) extends Task(taskContext, userConfig) with WatermarkProducer {
    --- End diff --
    
    Why is `GroupByTask` a `WatermarkProducer`?


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/227#discussion_r147024506
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/task/WatermarkProducer.scala ---
    @@ -0,0 +1,21 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.gearpump.streaming.task
    +
    +trait WatermarkProducer
    --- End diff --
    
    Please add comments on the purpose of this interface.


---

[GitHub] incubator-gearpump issue #227: [GEARPUMP-350] Fix the not started app clock

Posted by manuzhang <gi...@git.apache.org>.
GitHub user manuzhang commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/227
  
    @huafengw This can be closed now.


---

[GitHub] incubator-gearpump pull request #227: [GEARPUMP-350] Fix the not started app...

Posted by huafengw <gi...@git.apache.org>.
GitHub user huafengw closed the pull request at:

    https://github.com/apache/incubator-gearpump/pull/227


---