Posted to dev@gearpump.apache.org by manuzhang <gi...@git.apache.org> on 2017/06/06 15:33:13 UTC

[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

GitHub user manuzhang opened a pull request:

    https://github.com/apache/incubator-gearpump/pull/186

    [GEARPUMP-316] Decouple groupBy from window



You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manuzhang/incubator-gearpump window

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-gearpump/pull/186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #186
    
----
commit 950f765c3488790afac04159b093de398a87fdfd
Author: manuzhang <ow...@gmail.com>
Date:   2017-06-05T04:54:12Z

    [GEARPUMP-316] Decouple groupBy from window

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window

Posted by codecov-io <gi...@git.apache.org>.
Github user codecov-io commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/186
  
    # [Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/186?src=pr&el=h1) Report
    > Merging [#186](https://codecov.io/gh/apache/incubator-gearpump/pull/186?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gearpump/commit/c1370d9bf21b62c964d107a1f24765770116a316?src=pr&el=desc) will **decrease** coverage by `0.32%`.
    > The diff coverage is `69.53%`.
    
    
    
    ```diff
    @@            Coverage Diff            @@
    ##           master    #186      +/-   ##
    =========================================
    - Coverage   71.92%   71.6%   -0.33%     
    =========================================
      Files         191     189       -2     
      Lines        6087    6081       -6     
      Branches      532     536       +4     
    =========================================
    - Hits         4378    4354      -24     
    - Misses       1709    1727      +18
    ```
    




[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r121310835
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala ---
    @@ -30,7 +30,7 @@ case class Watermark(instant: Instant) {
     
     object Watermark {
     
    -  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
    +  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
    --- End diff --
    
    A watermark means that no more messages with timestamps **less than** the watermark time should be seen. Hence, the maximum watermark time should be 1 ms larger than the maximum timestamp.
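
A minimal sketch of the boundary case behind this change, assuming the "less than" semantics described in the comment. `MaxTimeMillis` is a hypothetical stand-in for `MAX_TIME_MILLIS` (its real value is not shown in this thread), and `isLate` is an illustrative helper, not a Gearpump API.

```scala
import java.time.Instant

object WatermarkBoundSketch {
  // Hypothetical stand-in for MAX_TIME_MILLIS; the real value is not shown in this thread.
  val MaxTimeMillis: Long = 1000L

  // Under "less than" semantics, a timestamp is covered by a watermark
  // only if it is strictly before the watermark time.
  def isLate(timestamp: Instant, watermark: Instant): Boolean =
    timestamp.isBefore(watermark)

  // The latest timestamp a message may carry.
  val latest: Instant = Instant.ofEpochMilli(MaxTimeMillis)

  // Maximum watermark before and after the change in the diff.
  val oldMax: Instant = Instant.ofEpochMilli(MaxTimeMillis)
  val newMax: Instant = Instant.ofEpochMilli(MaxTimeMillis + 1)
}
```

With the old bound, a message stamped at the maximum timestamp is not covered by the maximum watermark; adding 1 ms closes that gap.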



[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by huafengw <gi...@git.apache.org>.
Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r121446246
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
    @@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
     
     import java.time.Instant
     
    -import akka.actor.ActorSystem
     import com.gs.collections.api.block.predicate.Predicate
    -import org.apache.gearpump.Message
    -import org.apache.gearpump.cluster.UserConfig
    -import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
    +import com.gs.collections.api.block.procedure.Procedure
     import com.gs.collections.impl.list.mutable.FastList
    -import com.gs.collections.impl.map.mutable.UnifiedMap
     import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
    -import org.apache.gearpump.streaming.Constants._
     import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
    -import org.apache.gearpump.streaming.dsl.window.api.Discarding
    -import org.apache.gearpump.streaming.task.TaskContext
    -import org.apache.gearpump.util.LogUtil
    -import org.slf4j.Logger
    +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
    +import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
     
    +import scala.collection.mutable.ArrayBuffer
     
    -trait WindowRunner {
    +trait WindowRunner[IN, OUT] extends java.io.Serializable {
     
    -  def process(message: Message): Unit
    +  def process(in: IN, time: Instant): Unit
     
    -  def trigger(time: Instant): Unit
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)]
     }
     
    -object DefaultWindowRunner {
    +case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
    +    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
     
    -  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
    +  def process(in: IN, time: Instant): Unit = {
    +    left.process(in, time)
    +  }
    +
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
    +    left.trigger(time).foreach(result => right.process(result._1, result._2))
    +    right.trigger(time)
    +  }
     }
     
    -class DefaultWindowRunner[IN, GROUP, OUT](
    -    taskContext: TaskContext, userConfig: UserConfig,
    -    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
    -  extends WindowRunner {
    -
    -  private val windowFn = groupBy.window.windowFn
    -  private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
    -  private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
    -  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
    -
    -  override def process(message: Message): Unit = {
    -    val input = message.value.asInstanceOf[IN]
    -    val (group, windows) = groupBy.groupBy(message)
    -    if (!groupedWindowInputs.containsKey(group)) {
    -      groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
    -    }
    -    val windowInputs = groupedWindowInputs.get(group)
    -    windows.foreach { win =>
    +class DefaultWindowRunner[IN, OUT](
    +    windows: Windows,
    +    fnRunner: FunctionRunner[IN, OUT])
    +  extends WindowRunner[IN, OUT] {
    +
    +  private val windowFn = windows.windowFn
    +  private val windowInputs = new TreeSortedMap[Window, FastList[(IN, Instant)]]
    +  private var setup = false
    +
    +  override def process(in: IN, time: Instant): Unit = {
    +    val wins = windowFn(new Context[IN] {
    +      override def element: IN = in
    +
    +      override def timestamp: Instant = time
    +    })
    +    wins.foreach { win =>
           if (windowFn.isNonMerging) {
             if (!windowInputs.containsKey(win)) {
    -          val inputs = new FastList[IN](1)
    +          val inputs = new FastList[(IN, Instant)]
               windowInputs.put(win, inputs)
             }
    -        windowInputs.get(win).add(input)
    +        windowInputs.get(win).add(in -> time)
           } else {
    -        merge(windowInputs, win, input)
    +        merge(windowInputs, win, in, time)
           }
         }
     
    -    if (!groupedFnRunners.containsKey(group)) {
    -      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
    -      groupedFnRunners.put(group, runner)
    -      groupedRunnerSetups.put(group, false)
    -    }
    -
    -    def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
    -      val intersected = windowInputs.keySet.select(new Predicate[Window] {
    +    def merge(
    +        winIns: TreeSortedMap[Window, FastList[(IN, Instant)]],
    +        win: Window, in: IN, time: Instant): Unit = {
    +      val intersected = winIns.keySet.select(new Predicate[Window] {
             override def accept(each: Window): Boolean = {
               win.intersects(each)
             }
           })
           var mergedWin = win
    -      val mergedInputs = FastList.newListWith(input)
    +      val mergedInputs = FastList.newListWith(in -> time)
           intersected.forEach(new Procedure[Window] {
             override def value(each: Window): Unit = {
               mergedWin = mergedWin.span(each)
    -          mergedInputs.addAll(windowInputs.remove(each))
    +          mergedInputs.addAll(winIns.remove(each))
             }
           })
    -      windowInputs.put(mergedWin, mergedInputs)
    +      winIns.put(mergedWin, mergedInputs)
         }
    -
       }
     
    -  override def trigger(time: Instant): Unit = {
    -    groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] {
    -      override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
    -        onTrigger(group, windowInputs)
    -      }
    -    })
    -
    +  override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
         @annotation.tailrec
    -    def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
    +    def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]): TraversableOnce[(OUT, Instant)] = {
           if (windowInputs.notEmpty()) {
             val firstWin = windowInputs.firstKey
             if (!time.isBefore(firstWin.endTime)) {
               val inputs = windowInputs.remove(firstWin)
    -          if (groupedFnRunners.containsKey(group)) {
    -            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
    -              (output: OUT) => {
    -                taskContext.output(Message(output, time))
    -              })
    -            val setup = groupedRunnerSetups.get(group)
    -            if (!setup) {
    -              runner.setup()
    -              groupedRunnerSetups.put(group, true)
    -            }
    -            inputs.forEach(new Procedure[IN] {
    -              override def value(t: IN): Unit = {
    -                // .toList forces eager evaluation
    -                runner.process(t).toList
    +          if (!setup) {
    +            fnRunner.setup()
    +            setup = true
    +          }
    +          inputs.forEach(new Procedure[(IN, Instant)] {
    +            override def value(v: (IN, Instant)): Unit = {
    +              fnRunner.process(v._1).foreach {
    +                out: OUT => outputs += (out -> v._2)
                   }
    -            })
    -            // .toList forces eager evaluation
    -            runner.finish().toList
    -            if (groupBy.window.accumulationMode == Discarding) {
    -              runner.teardown()
    -              groupedRunnerSetups.put(group, false)
    -              // dicarding, setup need to be called for each window
    -              onTrigger(group, windowInputs)
    -            } else {
    -              // accumulating, setup is only called for the first window
    -              onTrigger(group, windowInputs)
                 }
    +          })
    +          fnRunner.finish().foreach {
    +            out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
    --- End diff --
    
    I'm wondering, does the message time order in `outputs` matter? If so, is it guaranteed? 
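
The `AndThen` combinator in the diff above can be exercised in isolation. The following is a simplified sketch, not the PR's code: `Runner`, `MapRunner`, and `Chained` are hypothetical stand-ins for `WindowRunner`, a concrete runner, and `AndThen`, and `Seq` replaces `TraversableOnce` to keep the example short.

```scala
import java.time.Instant
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified counterpart of WindowRunner.
trait Runner[IN, OUT] {
  def process(in: IN, time: Instant): Unit
  def trigger(time: Instant): Seq[(OUT, Instant)]
}

// Buffers inputs and, on trigger, emits f(input) with the input's own timestamp.
class MapRunner[IN, OUT](f: IN => OUT) extends Runner[IN, OUT] {
  private val buffer = ArrayBuffer.empty[(IN, Instant)]

  def process(in: IN, time: Instant): Unit = buffer += (in -> time)

  def trigger(time: Instant): Seq[(OUT, Instant)] = {
    // Copy to an immutable list before clearing the buffer.
    val out = buffer.map { case (in, t) => f(in) -> t }.toList
    buffer.clear()
    out
  }
}

// Mirrors the shape of AndThen: inputs go to the left runner, and on trigger
// the left runner's results are fed to the right runner before it is triggered.
case class Chained[IN, MIDDLE, OUT](
    left: Runner[IN, MIDDLE],
    right: Runner[MIDDLE, OUT]) extends Runner[IN, OUT] {

  def process(in: IN, time: Instant): Unit = left.process(in, time)

  def trigger(time: Instant): Seq[(OUT, Instant)] = {
    left.trigger(time).foreach { case (mid, t) => right.process(mid, t) }
    right.trigger(time)
  }
}
```

In this sketch the right runner sees whatever timestamps the left runner emitted, so any ordering property of the left runner's outputs carries through the chain.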



[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window

Posted by huafengw <gi...@git.apache.org>.
Github user huafengw commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/186
  
    First, an unrelated comment: there is an `org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner`, which looks like it is in the wrong place.



[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by huafengw <gi...@git.apache.org>.
Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526614
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.gearpump.streaming.dsl.task
    +
    +import java.time.Instant
    +import java.util.function.Consumer
    +
    +import com.gs.collections.impl.map.mutable.UnifiedMap
    +import org.apache.gearpump.Message
    +import org.apache.gearpump.cluster.UserConfig
    +import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
    +import org.apache.gearpump.streaming.dsl.window.impl.WindowRunner
    +import org.apache.gearpump.streaming.task.{Task, TaskContext}
    +
    +/**
    + * Processes messages in groups as defined by groupBy function.
    + */
    +class GroupByTask[IN, GROUP, OUT](
    +    groupBy: IN => GROUP,
    +    taskContext: TaskContext,
    +    userConfig: UserConfig) extends Task(taskContext, userConfig) {
    +
    +  def this(context: TaskContext, conf: UserConfig) = {
    +    this(
    +      conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
    +      context, conf
    +    )
    +  }
    +
    +  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
    +    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
    +
    +  override def onNext(message: Message): Unit = {
    +    val input = message.value.asInstanceOf[IN]
    +    val group = groupBy(input)
    +
    +    if (!groups.containsKey(group)) {
    +      groups.put(group,
    +        userConfig.getValue[WindowRunner[IN, OUT]](
    +          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
    +    }
    +
    +    groups.get(group).process(input, message.timestamp)
    +  }
    +
    +  override def onWatermarkProgress(watermark: Instant): Unit = {
    +    groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
    --- End diff --
    
    It looks like you just create a consumer here, but will it be called?
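
For reference, `java.lang.Iterable.forEach(Consumer)` invokes the consumer eagerly for every element, so passing an anonymous `Consumer` is enough for it to run. The sketch below uses a plain `java.util.HashMap` with string values as a stand-in for the `UnifiedMap` of window runners; those stand-ins are assumptions for illustration only.

```scala
import java.util.function.Consumer
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.ListBuffer

object ConsumerForEachSketch {
  // Returns the values the consumer was called with, sorted for a stable result.
  def triggered(): List[String] = {
    // Stand-in for the group -> runner map; the values are plain strings here.
    val groups = new JHashMap[String, String]()
    groups.put("a", "runner-a")
    groups.put("b", "runner-b")

    val seen = ListBuffer.empty[String]
    // Iterable.forEach calls accept once per value, immediately.
    groups.values.forEach(new Consumer[String] {
      override def accept(runner: String): Unit = seen += runner
    })
    seen.toList.sorted
  }
}
```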



[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-gearpump/pull/186



[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r121539311
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
    @@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
     
    +          fnRunner.finish().foreach {
    +            out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
    --- End diff --
    
    Yes, it could make the output minClock non-increasing. I created https://issues.apache.org/jira/browse/GEARPUMP-317 to track it.
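
A simplified model of how the timestamps collected in `outputs` can go backwards. This is an illustration under assumptions, not Gearpump code: it assumes per-element outputs keep the element's timestamp in arrival order and the final output of a window is stamped `endTime - 1 ms`, as in the quoted diff. `outputTimes` and `isNonDecreasing` are hypothetical helpers.

```scala
import java.time.Instant

object OutputOrderSketch {
  // One triggered window: element timestamps in arrival order,
  // followed by the final output stamped at windowEnd - 1 ms.
  def outputTimes(arrivals: Seq[Instant], windowEnd: Instant): Seq[Instant] =
    arrivals :+ windowEnd.minusMillis(1)

  // True when no timestamp is earlier than the one before it.
  def isNonDecreasing(times: Seq[Instant]): Boolean =
    times.zip(times.drop(1)).forall { case (a, b) => !b.isBefore(a) }

  private def ms(t: Long): Instant = Instant.ofEpochMilli(t)

  // Elements arriving in timestamp order keep the output ordered.
  val inOrder: Seq[Instant] = outputTimes(Seq(ms(3), ms(7)), ms(10))

  // Elements arriving out of order make the output timestamps go backwards.
  val outOfOrder: Seq[Instant] = outputTimes(Seq(ms(7), ms(3)), ms(10))
}
```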



[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by huafengw <gi...@git.apache.org>.
Github user huafengw commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r120526897
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala ---
    @@ -32,35 +32,34 @@ object WindowFunction {
       }
     }
     
    -trait WindowFunction[T] {
    +trait WindowFunction {
     
    -  def apply(context: WindowFunction.Context[T]): Array[Window]
    +  def apply[T](context: WindowFunction.Context[T]): Array[Window]
     
       def isNonMerging: Boolean
     }
     
    -abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
    +abstract class NonMergingWindowFunction extends WindowFunction {
     
       override def isNonMerging: Boolean = true
     }
     
    -case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
    +case class GlobalWindowFunction() extends NonMergingWindowFunction {
     
    -  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
    +  override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
         Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
    --- End diff --
    
    We can create just one static object; there is no need to create a new one each time, since it is immutable.
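
One way this suggestion could look, as a sketch under assumptions rather than the change that was actually made: `SketchWindow` is a simplified stand-in for Gearpump's `Window`, the bounds are placeholders for `MIN_TIME_MILLIS` and `MAX_TIME_MILLIS`, and `windowsFor` is a hypothetical counterpart of `apply`.

```scala
import java.time.Instant

// Simplified stand-in for Window, assumed to be an immutable pair of instants.
final case class SketchWindow(startTime: Instant, endTime: Instant)

object GlobalWindowSketch {
  // Placeholder bounds; the real MIN_TIME_MILLIS / MAX_TIME_MILLIS are not shown in this thread.
  private val MinTimeMillis = 0L
  private val MaxTimeMillis = 1000L

  // Built once and shared, which is safe because the window itself is immutable.
  private val Global = SketchWindow(
    Instant.ofEpochMilli(MinTimeMillis),
    Instant.ofEpochMilli(MaxTimeMillis))

  // A fresh array per call, because arrays are mutable; only the window is shared.
  def windowsFor[T](element: T): Array[SketchWindow] = Array(Global)
}
```

Sharing the window but not the array keeps callers from affecting each other if one of them mutates the returned array.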


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-gearpump pull request #186: [GEARPUMP-316] Decouple groupBy from w...

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/incubator-gearpump/pull/186#discussion_r121310608
  
    --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---
    @@ -19,133 +19,121 @@ package org.apache.gearpump.streaming.dsl.window.impl
     
     import java.time.Instant
     
    -import akka.actor.ActorSystem
     import com.gs.collections.api.block.predicate.Predicate
    -import org.apache.gearpump.Message
    -import org.apache.gearpump.cluster.UserConfig
    -import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
    +import com.gs.collections.api.block.procedure.Procedure
     import com.gs.collections.impl.list.mutable.FastList
    -import com.gs.collections.impl.map.mutable.UnifiedMap
     import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
    -import org.apache.gearpump.streaming.Constants._
     import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
    -import org.apache.gearpump.streaming.dsl.window.api.Discarding
    -import org.apache.gearpump.streaming.task.TaskContext
    -import org.apache.gearpump.util.LogUtil
    -import org.slf4j.Logger
    +import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
    +import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
     
    +import scala.collection.mutable.ArrayBuffer
     
    -trait WindowRunner {
    +trait WindowRunner[IN, OUT] extends java.io.Serializable {
     
    -  def process(message: Message): Unit
    +  def process(in: IN, time: Instant): Unit
     
    -  def trigger(time: Instant): Unit
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)]
     }
     
    -object DefaultWindowRunner {
    +case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
    +    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
     
    -  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
    +  def process(in: IN, time: Instant): Unit = {
    +    left.process(in, time)
    +  }
    +
    +  def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
    +    left.trigger(time).foreach(result => right.process(result._1, result._2))
    +    right.trigger(time)
    +  }
     }
     
    -class DefaultWindowRunner[IN, GROUP, OUT](
    -    taskContext: TaskContext, userConfig: UserConfig,
    -    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
    -  extends WindowRunner {
    -
    -  private val windowFn = groupBy.window.windowFn
    -  private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
    -  private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
    -  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
    -
    -  override def process(message: Message): Unit = {
    -    val input = message.value.asInstanceOf[IN]
    -    val (group, windows) = groupBy.groupBy(message)
    -    if (!groupedWindowInputs.containsKey(group)) {
    -      groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
    -    }
    -    val windowInputs = groupedWindowInputs.get(group)
    -    windows.foreach { win =>
    +class DefaultWindowRunner[IN, OUT](
    +    windows: Windows,
    +    fnRunner: FunctionRunner[IN, OUT])
    +  extends WindowRunner[IN, OUT] {
    +
    +  private val windowFn = windows.windowFn
    +  private val windowInputs = new TreeSortedMap[Window, FastList[(IN, Instant)]]
    +  private var setup = false
    +
    +  override def process(in: IN, time: Instant): Unit = {
    +    val wins = windowFn(new Context[IN] {
    +      override def element: IN = in
    +
    +      override def timestamp: Instant = time
    +    })
    +    wins.foreach { win =>
           if (windowFn.isNonMerging) {
             if (!windowInputs.containsKey(win)) {
    -          val inputs = new FastList[IN](1)
    +          val inputs = new FastList[(IN, Instant)]
               windowInputs.put(win, inputs)
             }
    -        windowInputs.get(win).add(input)
    +        windowInputs.get(win).add(in -> time)
           } else {
    -        merge(windowInputs, win, input)
    +        merge(windowInputs, win, in, time)
           }
         }
     
    -    if (!groupedFnRunners.containsKey(group)) {
    -      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
    -      groupedFnRunners.put(group, runner)
    -      groupedRunnerSetups.put(group, false)
    -    }
    -
    -    def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
    -      val intersected = windowInputs.keySet.select(new Predicate[Window] {
    +    def merge(
    +        winIns: TreeSortedMap[Window, FastList[(IN, Instant)]],
    +        win: Window, in: IN, time: Instant): Unit = {
    +      val intersected = winIns.keySet.select(new Predicate[Window] {
             override def accept(each: Window): Boolean = {
               win.intersects(each)
             }
           })
           var mergedWin = win
    -      val mergedInputs = FastList.newListWith(input)
    +      val mergedInputs = FastList.newListWith(in -> time)
           intersected.forEach(new Procedure[Window] {
             override def value(each: Window): Unit = {
               mergedWin = mergedWin.span(each)
    -          mergedInputs.addAll(windowInputs.remove(each))
    +          mergedInputs.addAll(winIns.remove(each))
             }
           })
    -      windowInputs.put(mergedWin, mergedInputs)
    +      winIns.put(mergedWin, mergedInputs)
         }
    -
       }
     
    -  override def trigger(time: Instant): Unit = {
    -    groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] {
    -      override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
    -        onTrigger(group, windowInputs)
    -      }
    -    })
    -
    +  override def trigger(time: Instant): TraversableOnce[(OUT, Instant)] = {
         @annotation.tailrec
    -    def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
    +    def onTrigger(outputs: ArrayBuffer[(OUT, Instant)]): TraversableOnce[(OUT, Instant)] = {
           if (windowInputs.notEmpty()) {
             val firstWin = windowInputs.firstKey
             if (!time.isBefore(firstWin.endTime)) {
               val inputs = windowInputs.remove(firstWin)
    -          if (groupedFnRunners.containsKey(group)) {
    -            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
    -              (output: OUT) => {
    -                taskContext.output(Message(output, time))
    -              })
    -            val setup = groupedRunnerSetups.get(group)
    -            if (!setup) {
    -              runner.setup()
    -              groupedRunnerSetups.put(group, true)
    -            }
    -            inputs.forEach(new Procedure[IN] {
    -              override def value(t: IN): Unit = {
    -                // .toList forces eager evaluation
    -                runner.process(t).toList
    +          if (!setup) {
    +            fnRunner.setup()
    +            setup = true
    +          }
    +          inputs.forEach(new Procedure[(IN, Instant)] {
    +            override def value(v: (IN, Instant)): Unit = {
    +              fnRunner.process(v._1).foreach {
    +                out: OUT => outputs += (out -> v._2)
                   }
    -            })
    -            // .toList forces eager evaluation
    -            runner.finish().toList
    -            if (groupBy.window.accumulationMode == Discarding) {
    -              runner.teardown()
    -              groupedRunnerSetups.put(group, false)
    -              // dicarding, setup need to be called for each window
    -              onTrigger(group, windowInputs)
    -            } else {
    -              // accumulating, setup is only called for the first window
    -              onTrigger(group, windowInputs)
                 }
    +          })
    +          fnRunner.finish().foreach {
    +            out: OUT => outputs += (out -> firstWin.endTime.minusMillis(1))
    --- End diff --
    
    @huafengw The output timestamp for the aggregated value is `END_OF_WINDOW - 1`.
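
A minimal sketch of that timestamp rule, assuming windows are half-open intervals `[startTime, endTime)` with millisecond resolution. `Win` and `aggregateTimestamp` are hypothetical names used only for illustration; they are not part of the Gearpump API.

```scala
import java.time.Instant

object WindowTimestampSketch {
  // Hypothetical stand-in for a window; endTime is assumed to be exclusive.
  final case class Win(startTime: Instant, endTime: Instant)

  // The aggregated value is stamped with the last millisecond that still
  // falls inside the window, i.e. END_OF_WINDOW - 1.
  def aggregateTimestamp(win: Win): Instant = win.endTime.minusMillis(1)

  def main(args: Array[String]): Unit = {
    val win = Win(Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))
    println(aggregateTimestamp(win))
  }
}
```

Under that assumption, stamping the result with `endTime` itself would place it in the next window when a downstream operator applies the same window function, which is presumably why one millisecond is subtracted.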


[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window

Posted by huafengw <gi...@git.apache.org>.
Github user huafengw commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/186
  
    +1


[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/186
  
    @huafengw Updated, and `GroupByPartitioner` has been moved to the `partitioner` package.


[GitHub] incubator-gearpump issue #186: [GEARPUMP-316] Decouple groupBy from window

Posted by manuzhang <gi...@git.apache.org>.
Github user manuzhang commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/186
  
    @huafengw Added more descriptions. 

