You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2017/02/08 05:35:14 UTC

[GitHub] spark pull request #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/16850

    [SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations

    ## What changes were proposed in this pull request?
    
    `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState` 
    
    *Requirements*
    - Users should be able to specify a function that can do the following
    - Access the input row corresponding to a key
    - Access the previous state corresponding to a key
    - Optionally, update or remove the state
    - Output any number of new rows (or none at all)
    
    *Proposed API*
    ```
    // ------------ New methods on KeyValueGroupedDataset ------------
    class KeyValueGroupedDataset[K, V] {	
    	// Scala friendly
    	def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
            def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
    	// Java friendly
           def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
           def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
    }
    
    // ------------------- New Java-friendly function classes ------------------- 
    public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
    }
    public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
    }
    
    // ---------------------- Wrapper class for state data ---------------------- 
    trait KeyedState[S] {
    	def exists(): Boolean  	
      	def get(): S 			// throws Exception is state does not exist
    	def getOption(): Option[S]       
    	def update(newState: S): Unit
    	def remove(): Unit		// exists() will be false after this
    }
    ```
    
    Key Semantics of the State class
    - The state can be null.
    - If the state.remove() is called, then state.exists() will return false, and getOption will returm None.
    - After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...). 
    - None of the operations are thread-safe. This is to avoid memory barriers.
    
    *Usage*
    ```
    val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => {
        val newCount = words.size + runningCount.getOption.getOrElse(0L)
        runningCount.update(newCount)
       (word, newCount)
    }
    
    dataset					                        // type is Dataset[String]
      .groupByKey[String](w => w)        	                // generates KeyValueGroupedDataset[String, String]
      .mapGroupsWithState[Long, (String, Long)](stateFunc)	// returns Dataset[(String, Long)]
    ```
    
    
    ## How was this patch tested?
    New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark mapWithState-branch-2.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16850.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16850
    
----
commit 5025cb7511a43e24cb3a181eb7b06c69b024479f
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-02-08T04:21:00Z

    [SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations
    
    `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`
    
    *Requirements*
    - Users should be able to specify a function that can do the following
    - Access the input row corresponding to a key
    - Access the previous state corresponding to a key
    - Optionally, update or remove the state
    - Output any number of new rows (or none at all)
    
    *Proposed API*
    ```
    // ------------ New methods on KeyValueGroupedDataset ------------
    class KeyValueGroupedDataset[K, V] {
    	// Scala friendly
    	def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
            def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
    	// Java friendly
           def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
           def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
    }
    
    // ------------------- New Java-friendly function classes -------------------
    public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
    }
    public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception;
    }
    
    // ---------------------- Wrapper class for state data ----------------------
    trait State[S] {
    	def exists(): Boolean
      	def get(): S 			// throws Exception is state does not exist
    	def getOption(): Option[S]
    	def update(newState: S): Unit
    	def remove(): Unit		// exists() will be false after this
    }
    ```
    
    Key Semantics of the State class
    - The state can be null.
    - If the state.remove() is called, then state.exists() will return false, and getOption will returm None.
    - After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...).
    - None of the operations are thread-safe. This is to avoid memory barriers.
    
    *Usage*
    ```
    val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => {
        val newCount = words.size + runningCount.getOption.getOrElse(0L)
        runningCount.update(newCount)
       (word, newCount)
    }
    
    dataset					                        // type is Dataset[String]
      .groupByKey[String](w => w)        	                // generates KeyValueGroupedDataset[String, String]
      .mapGroupsWithState[Long, (String, Long)](stateFunc)	// returns Dataset[(String, Long)]
    ```
    
    New unit tests.
    
    Author: Tathagata Das <ta...@gmail.com>
    
    Closes #16758 from tdas/mapWithState.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    Merged. Could you close this PR, please?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72586/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    **[Test build #3563 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3563/testReport)** for PR 16850 at commit [`5025cb7`](https://github.com/apache/spark/commit/5025cb7511a43e24cb3a181eb7b06c69b024479f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by wobuxiangtong <gi...@git.apache.org>.
Github user wobuxiangtong commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    Excuse me:
        I am using mapwithstate to storage data in sparkstreaming。What confused me is that rememberDuration must more than checkpointDuration。I read the code of MapWithStateRDD and changed is as the follow question and have done some tests  which indicated that it does not matter even though hold only one RDD in storage and no data will lose。it will cut down a lot of memory if doing so like this.
        Maybe i am wrong. Could you help me? the question is here:
    https://stackoverflow.com/questions/47784439/what-will-happen-if-i-change-the-rememberduration-in-dstream-less-than-checkpoin


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    LGTM. Merging to 2.1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    **[Test build #72560 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72560/testReport)** for PR 16850 at commit [`5025cb7`](https://github.com/apache/spark/commit/5025cb7511a43e24cb3a181eb7b06c69b024479f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    **[Test build #72586 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72586/testReport)** for PR 16850 at commit [`5025cb7`](https://github.com/apache/spark/commit/5025cb7511a43e24cb3a181eb7b06c69b024479f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72560/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrar...

Posted by tdas <gi...@git.apache.org>.
Github user tdas closed the pull request at:

    https://github.com/apache/spark/pull/16850


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    **[Test build #3563 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3563/testReport)** for PR 16850 at commit [`5025cb7`](https://github.com/apache/spark/commit/5025cb7511a43e24cb3a181eb7b06c69b024479f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    jenkins test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16850: [SPARK-19413][SS] MapGroupsWithState for arbitrary state...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16850
  
    **[Test build #72586 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72586/testReport)** for PR 16850 at commit [`5025cb7`](https://github.com/apache/spark/commit/5025cb7511a43e24cb3a181eb7b06c69b024479f).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org