Posted to reviews@spark.apache.org by xiliu82 <gi...@git.apache.org> on 2014/09/04 05:34:10 UTC

[GitHub] spark pull request: Augmented updateStateByKey API

GitHub user xiliu82 opened a pull request:

    https://github.com/apache/spark/pull/2267

    Augmented updateStateByKey API

    Augment the updateStateByKey API by exposing both the key and the current batch timestamp to applications.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xiliu82/spark patch-3

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2267.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2267
    
----
commit 8f940571a49438b647bc8909b36afffe4efc19d9
Author: xiliu82 <xi...@gmail.com>
Date:   2014-09-04T02:48:33Z

    Create spark2

commit 9a2c9295373e8a59a40ed5e655bec5a32b151076
Author: Xi Liu <xi...@conviva.com>
Date:   2014-09-04T03:18:22Z

    augment the API for updateStateByKey

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: Augmented updateStateByKey API

Posted by xiliu82 <gi...@git.apache.org>.
Github user xiliu82 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2267#discussion_r17147160
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala ---
    @@ -396,6 +396,26 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
     
       /**
        * Return a new "state" DStream where the state for each key is updated by applying
    +   * the given function on the previous state of the key and the new values of the key.
    +   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    +   * @param updateFunc State update function. If `this` function returns None, then
    +   *                   corresponding state key-value pair will be eliminated.
    +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
    +   *                    DStream.
    +   * @tparam S State type
    +   */
    +  def updateStateByKey[S: ClassTag](
    +      updateFunc: (Time, K, Seq[V], Option[S]) => Option[S],
    +      partitioner: Partitioner
    +    ): DStream[(K, S)] = {
    +    val newUpdateFunc = (time: Time, iterator: Iterator[(K, Seq[V], Option[S])]) => {
    --- End diff --
    
    Duplication is removed. 
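According to the Scaladoc in the diff, the update function is applied to each key's previous state and new values, and returning None removes that key's state. Below is a minimal pure-Scala model of one batch under the proposed four-argument signature. It assumes a local stand-in `Time` case class in place of `org.apache.spark.streaming.Time` and uses plain `Map`s in place of RDDs. `StateUpdateModel` and `updateBatch` are hypothetical names, not Spark API.

```scala
// Stand-in for org.apache.spark.streaming.Time so the sketch runs without Spark.
final case class Time(milliseconds: Long)

object StateUpdateModel {
  // Applies one batch of a (Time, K, Seq[V], Option[S]) => Option[S] update to an
  // in-memory state map. Every key that has previous state or new values is
  // visited; keys whose update returns None are dropped from the result.
  def updateBatch[K, V, S](
      time: Time,
      previous: Map[K, S],
      newValues: Map[K, Seq[V]],
      updateFunc: (Time, K, Seq[V], Option[S]) => Option[S]): Map[K, S] = {
    val keys = previous.keySet ++ newValues.keySet
    keys.iterator.flatMap { key =>
      // Keys with existing state but no new data in this batch see an empty Seq.
      val values = newValues.getOrElse(key, Seq.empty[V])
      updateFunc(time, key, values, previous.get(key)).map(state => key -> state)
    }.toMap
  }
}
```

In this model, a key that already has state but receives no values in a batch is still passed to the function with an empty Seq, as updateStateByKey does. That is where a time-aware function gets its chance to expire state.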



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by xiliu82 <gi...@git.apache.org>.
Github user xiliu82 closed the pull request at:

    https://github.com/apache/spark/pull/2267



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-55368757
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20210/consoleFull) for PR 2267 at commit [`db54919`](https://github.com/apache/spark/commit/db54919e7b0d1d7ce864cdff34317c61bcf3c0c4).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-55365085
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20210/consoleFull) for PR 2267 at commit [`db54919`](https://github.com/apache/spark/commit/db54919e7b0d1d7ce864cdff34317c61bcf3c0c4).
     * This patch merges cleanly.



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-54400007
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-68081435
  
    There have been some updates to the updateStateByKey API. Could you update this patch?



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-72719145
  
    This is not closed yet :(



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2267#discussion_r17101578
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala ---
    @@ -396,6 +396,26 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
     
       /**
        * Return a new "state" DStream where the state for each key is updated by applying
    +   * the given function on the previous state of the key and the new values of the key.
    +   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    +   * @param updateFunc State update function. If `this` function returns None, then
    +   *                   corresponding state key-value pair will be eliminated.
    +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
    +   *                    DStream.
    +   * @tparam S State type
    +   */
    +  def updateStateByKey[S: ClassTag](
    +      updateFunc: (Time, K, Seq[V], Option[S]) => Option[S],
    +      partitioner: Partitioner
    +    ): DStream[(K, S)] = {
    +    val newUpdateFunc = (time: Time, iterator: Iterator[(K, Seq[V], Option[S])]) => {
    --- End diff --
    
    The existing method could call the new method rather than duplicate the logic. If the user supplies a function that takes Seq[V] and Option[S] as arguments, it can be wrapped in a function that also accepts a Time and ignores it.
    
    What's the use case for this, though?
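srowen's suggestion can be sketched as an adapter. It wraps an existing-style `(Seq[V], Option[S]) => Option[S]` function into the proposed `(Time, K, Seq[V], Option[S]) => Option[S]` shape, so the old overload could delegate to the new one. The proposed signature also adds the key, so this sketch ignores both extra arguments. `Time` is a local stand-in for `org.apache.spark.streaming.Time`, and `UpdateFuncAdapters.ignoringTimeAndKey` is a hypothetical name.

```scala
// Stand-in for org.apache.spark.streaming.Time so the sketch runs without Spark.
final case class Time(milliseconds: Long)

object UpdateFuncAdapters {
  // Wraps an existing-style update function so it fits the proposed
  // (Time, K, Seq[V], Option[S]) => Option[S] shape. The batch time and the
  // key are accepted and ignored; only the values and previous state are used.
  def ignoringTimeAndKey[K, V, S](
      f: (Seq[V], Option[S]) => Option[S]): (Time, K, Seq[V], Option[S]) => Option[S] =
    (_: Time, _: K, values: Seq[V], state: Option[S]) => f(values, state)
}
```

With an adapter like this, the old overload's body reduces to passing `ignoringTimeAndKey(updateFunc)` to the new overload, so the state-update logic lives in one place.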



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-55364912
  
    @conviva @xiliu82 Could you create a JIRA and add the JIRA number to the PR title, similar to the other PRs? 



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-72559622
  
    Hey @xiliu82, if you are not able to get to this, would you mind closing it? You can always reopen it when you are ready to submit it for review again.



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-68765456
  
    Ping, for updating this PR. 



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by xiliu82 <gi...@git.apache.org>.
Github user xiliu82 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2267#discussion_r17133551
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala ---
    @@ -396,6 +396,26 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
     
       /**
        * Return a new "state" DStream where the state for each key is updated by applying
    +   * the given function on the previous state of the key and the new values of the key.
    +   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    +   * @param updateFunc State update function. If `this` function returns None, then
    +   *                   corresponding state key-value pair will be eliminated.
    +   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
    +   *                    DStream.
    +   * @tparam S State type
    +   */
    +  def updateStateByKey[S: ClassTag](
    +      updateFunc: (Time, K, Seq[V], Option[S]) => Option[S],
    +      partitioner: Partitioner
    +    ): DStream[(K, S)] = {
    +    val newUpdateFunc = (time: Time, iterator: Iterator[(K, Seq[V], Option[S])]) => {
    --- End diff --
    
    I will check how I can clean it up a bit.
    
    At a high level, our application processes values differently depending on the key and the timestamp:
    1) The key contains an ID, and different IDs are treated differently.
    2) We need the timestamp to decide whether to keep the state or drop it.
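The two requirements above can be illustrated with a hypothetical update function in the proposed shape. The key is an ID string. IDs with a `long-` prefix get a longer retention window, an invented rule standing in for "treated differently". The batch timestamp decides whether idle state is dropped. `SessionState`, `ttlMs`, `ExpiringUpdate`, and the TTL values are assumptions for illustration, and `Time` is again a local stand-in for `org.apache.spark.streaming.Time`.

```scala
// Stand-in for org.apache.spark.streaming.Time so the sketch runs without Spark.
final case class Time(milliseconds: Long)

// Hypothetical per-key state: a running total plus the batch time it was last updated.
final case class SessionState(total: Long, lastSeenMs: Long)

object ExpiringUpdate {
  // Hypothetical per-ID policy: IDs prefixed with "long-" are retained for
  // 10 minutes of inactivity, all others for 1 minute.
  def ttlMs(id: String): Long = if (id.startsWith("long-")) 600000L else 60000L

  // Update function in the proposed (Time, K, Seq[V], Option[S]) => Option[S] shape.
  def update(
      time: Time,
      id: String,
      events: Seq[Long],
      state: Option[SessionState]): Option[SessionState] =
    if (events.nonEmpty) {
      // New data: add it to the running total and record this batch's time.
      val previousTotal = state.map(_.total).getOrElse(0L)
      Some(SessionState(previousTotal + events.sum, time.milliseconds))
    } else {
      // No new data: keep the state only while it is within this ID's TTL;
      // returning None removes the key's state.
      state.filter(s => time.milliseconds - s.lastSeenMs <= ttlMs(id))
    }
}
```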



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by xiliu82 <gi...@git.apache.org>.
Github user xiliu82 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2267#discussion_r17131871
  
    --- Diff: spark2 ---
    @@ -0,0 +1 @@
    +Some changes
    --- End diff --
    
    Yep, this came from the fork and should be removed.



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2267#discussion_r17101501
  
    --- Diff: spark2 ---
    @@ -0,0 +1 @@
    +Some changes
    --- End diff --
    
    What is this file? Was it added accidentally?



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by xiliu82 <gi...@git.apache.org>.
Github user xiliu82 commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-68785881
  
    I will try to do that this week.
    
    > On Jan 5, 2015, at 11:50 AM, Tathagata Das <no...@github.com> wrote:
    > 
    > Ping, for updating this PR.



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-54694323
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [STREAMING] SPARK-3505: Augmenting SparkStream...

Posted by xiliu82 <gi...@git.apache.org>.
Github user xiliu82 commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-72559829
  
    Sorry, this completely slipped my mind. I will close it for now and submit it again later.
    
    > On Feb 2, 2015, at 3:15 PM, Tathagata Das <no...@github.com> wrote:
    > 
    > Hey @xiliu82 <https://github.com/xiliu82> if you are not able to get a chance on this, mind closing this? You can always open it when you are ready to submit it for review again.



[GitHub] spark pull request: Augmented updateStateByKey API

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/2267#issuecomment-55364870
  
    Jenkins, this is ok to test.

