Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2017/05/25 11:11:07 UTC

[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/18107

    [SPARK-20883][SPARK-20376][SS] Refactored StateStore APIs and added conf to choose implementation

    
    ## What changes were proposed in this pull request?
    
    This PR makes several changes to the StateStore APIs and implementation.
    The current StateStore API has several problems that create too many transient objects, which causes memory pressure.
    - `StateStore.get(): Option` forces creation of Some/None objects for every get. Changed this to return the row or null.
    - `StateStore.iterator(): (UnsafeRow, UnsafeRow)` forces creation of a new tuple for each record returned. Changed this to return an UnsafeRowTuple, which can be reused across records.
    - `StateStore.updates()` requires the implementation to keep track of updates, even though this is used minimally (only by Append mode in streaming aggregations). Removed this.
    
    Additionally,
    - Added a configuration that allows the user to specify which implementation to use. 
    - Added new metrics to understand the time taken to update keys, remove keys and commit all changes to the state store. These metrics will be visible on the plan diagram in the SQL tab of the UI.
    - Refactored unit tests such that they can be reused to test any implementation of StateStore.
    
    ## How was this patch tested?
    Old and new unit tests
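
The two allocation-related changes in the description above can be illustrated with a small sketch. It is not Spark's code: `Row`, `RowPair` and `SketchStore` are hypothetical stand-ins (for `UnsafeRow`, `UnsafeRowTuple` and a `StateStore` implementation) so that the example runs without Spark on the classpath.

```scala
// Stand-in for UnsafeRow (assumption, so the sketch needs no Spark dependency).
final case class Row(value: String)

// Mutable, reusable key/value holder, modelled on the UnsafeRowTuple
// named in the PR description.
final class RowPair(var key: Row = null, var value: Row = null) {
  def withRows(k: Row, v: Row): this.type = { key = k; value = v; this }
}

// Hypothetical in-memory store; not Spark's implementation.
final class SketchStore {
  private val data = new java.util.LinkedHashMap[Row, Row]()

  def put(key: Row, value: Row): Unit = data.put(key, value)

  // Returns the stored row or null, so no Some/None is allocated per lookup.
  def get(key: Row): Row = data.get(key)

  // Every call to next() returns the same RowPair instance, refilled in place.
  def iterator(): Iterator[RowPair] = {
    val pair = new RowPair()
    val entries = data.entrySet().iterator()
    new Iterator[RowPair] {
      def hasNext: Boolean = entries.hasNext
      def next(): RowPair = {
        val e = entries.next()
        pair.withRows(e.getKey, e.getValue)
      }
    }
  }
}

val store = new SketchStore
store.put(Row("a"), Row("1"))
store.put(Row("b"), Row("2"))
```

In this sketch a caller that wants to keep a record past the next call to `next()` has to copy the key and value out of the pair, because the pair is overwritten on every step.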


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-20376

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18107.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18107
    
----
commit 03f5bf3f1fc4e6d60b43d7c05a3cdc6dddcbd1af
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-05-25T10:55:09Z

    Refactored StateStore APIs and added conf to choose StateStore
    implementation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77546/
    Test PASSed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77402 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77402/testReport)** for PR 18107 at commit [`3e49621`](https://github.com/apache/spark/commit/3e496217811755f756a9acaabb686200d0be15f1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class StateStoreId(`
      * `case class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null) `




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77402/
    Test PASSed.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014948
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -165,54 +189,88 @@ case class StateStoreSaveExec(
         child.execute().mapPartitionsWithStateStore(
           getStateId.checkpointLocation,
           getStateId.operatorId,
    +      storeName = "default",
           getStateId.batchId,
           keyExpressions.toStructType,
           child.output.toStructType,
    +      indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
             val numOutputRows = longMetric("numOutputRows")
             val numTotalStateRows = longMetric("numTotalStateRows")
             val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +        val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +        val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
    +        val commitTimeMs = longMetric("commitTimeMs")
     
             outputMode match {
               // Update and output all rows in the StateStore.
               case Some(Complete) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              while (iter.hasNext) {
    +                val row = iter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
    +            }
    +            allRemovalsTimeMs += 0
    +            commitTimeMs += timeTakenMs {
    +              store.commit()
                 }
    -            store.commit()
                 numTotalStateRows += store.numKeys()
    -            store.iterator().map { case (k, v) =>
    +            store.iterator().map { case UnsafeRowPair(_, v) =>
                   numOutputRows += 1
                   v.asInstanceOf[InternalRow]
                 }
     
               // Update and output only rows being evicted from the StateStore
    +          // Assumption: watermark predicates must be non-empty if append mode is allowed
               case Some(Append) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
    +              while (filteredIter.hasNext) {
    +                val row = filteredIter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
                 }
     
    -            // Assumption: Append mode can be done only when watermark has been specified
    -            store.remove(watermarkPredicateForKeys.get.eval _)
    -            store.commit()
    +            val removalStartTime = System.currentTimeMillis
    +            val rangeIter = store.getRange(None, None)
    +
    +            new NextIterator[InternalRow] {
    +              override protected def getNext(): InternalRow = {
    +                var removedValueRow: InternalRow = null
    +                while(rangeIter.hasNext && removedValueRow == null) {
    +                  val UnsafeRowPair(keyRow, valueRow) = rangeIter.next()
    +                  if (watermarkPredicateForKeys.get.eval(keyRow)) {
    +                    store.remove(keyRow)
    +                    removedValueRow = valueRow
    +                  }
    +                }
    +                if (removedValueRow == null) {
    +                  finished = true
    +                  null
    +                } else {
    +                  removedValueRow
    +                }
    +              }
     
    -            numTotalStateRows += store.numKeys()
    -            store.updates().filter(_.isInstanceOf[ValueRemoved]).map { removed =>
    -              numOutputRows += 1
    -              removed.value.asInstanceOf[InternalRow]
    +              override protected def close(): Unit = {
    +                allRemovalsTimeMs += System.currentTimeMillis - removalStartTime
    +                commitTimeMs += timeTakenMs { store.commit() }
    +                numTotalStateRows += store.numKeys()
    +              }
                 }
     
               // Update and output modified rows from the StateStore.
               case Some(Update) =>
     
    +            val updatesStartTimeMs = System.currentTimeMillis
    --- End diff --
    
    nit: please use `nanoTime`
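
The nit does not give a reason. A common one is that `System.currentTimeMillis` reports wall-clock time, which can jump when the system clock is adjusted, while `System.nanoTime` is intended for measuring elapsed time. A minimal sketch of a millisecond timer built on `nanoTime` follows; the helper is modelled on the `timeTakenMs { ... }` calls in the diff, but its body here is an assumption, since the PR's actual implementation is not shown in this thread.

```scala
import java.util.concurrent.TimeUnit.NANOSECONDS

// Hypothetical helper: runs `body` and returns the elapsed time in milliseconds.
def timeTakenMs(body: => Unit): Long = {
  val startNs = System.nanoTime()
  body
  val elapsedNs = System.nanoTime() - startNs
  // Clamp at zero, then convert nanoseconds to milliseconds.
  NANOSECONDS.toMillis(math.max(elapsedNs, 0L))
}

// Usage: the result can be added to a metric, as the diff does with `+=`.
var ran = false
val elapsedMs = timeTakenMs {
  ran = true
  Thread.sleep(5)
}
```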





[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77546 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77546/testReport)** for PR 18107 at commit [`baba63d`](https://github.com/apache/spark/commit/baba63d68adff67b99d651225083a89353ecb154).




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77402 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77402/testReport)** for PR 18107 at commit [`3e49621`](https://github.com/apache/spark/commit/3e496217811755f756a9acaabb686200d0be15f1).




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77547 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77547/testReport)** for PR 18107 at commit [`5c0961c`](https://github.com/apache/spark/commit/5c0961cf302ae7a02bfa603a8ddd0ccfa54a062c).




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77362 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77362/testReport)** for PR 18107 at commit [`03f5bf3`](https://github.com/apache/spark/commit/03f5bf3f1fc4e6d60b43d7c05a3cdc6dddcbd1af).
     * This patch **fails build dependency tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class StateStoreId(`
      * `case class StateStoreStats()`
      * `case class UnsafeRowTuple(var key: UnsafeRow = null, var value: UnsafeRow = null) `
      * `trait StateStoreWriter extends StatefulOperator `




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77363 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77363/testReport)** for PR 18107 at commit [`d645b41`](https://github.com/apache/spark/commit/d645b416ddd79b56c00bb443569de4c7af5de4fb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class StateStoreId(`
      * `case class StateStoreStats()`
      * `case class UnsafeRowTuple(var key: UnsafeRow = null, var value: UnsafeRow = null) `
      * `trait StateStoreWriter extends StatefulOperator `




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014924
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -165,54 +189,88 @@ case class StateStoreSaveExec(
         child.execute().mapPartitionsWithStateStore(
           getStateId.checkpointLocation,
           getStateId.operatorId,
    +      storeName = "default",
           getStateId.batchId,
           keyExpressions.toStructType,
           child.output.toStructType,
    +      indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
             val numOutputRows = longMetric("numOutputRows")
             val numTotalStateRows = longMetric("numTotalStateRows")
             val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +        val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +        val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
    +        val commitTimeMs = longMetric("commitTimeMs")
     
             outputMode match {
               // Update and output all rows in the StateStore.
               case Some(Complete) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              while (iter.hasNext) {
    +                val row = iter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
    +            }
    +            allRemovalsTimeMs += 0
    +            commitTimeMs += timeTakenMs {
    +              store.commit()
                 }
    -            store.commit()
                 numTotalStateRows += store.numKeys()
    -            store.iterator().map { case (k, v) =>
    +            store.iterator().map { case UnsafeRowPair(_, v) =>
                   numOutputRows += 1
                   v.asInstanceOf[InternalRow]
                 }
     
               // Update and output only rows being evicted from the StateStore
    +          // Assumption: watermark predicates must be non-empty if append mode is allowed
               case Some(Append) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
    +              while (filteredIter.hasNext) {
    +                val row = filteredIter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
                 }
     
    -            // Assumption: Append mode can be done only when watermark has been specified
    -            store.remove(watermarkPredicateForKeys.get.eval _)
    -            store.commit()
    +            val removalStartTime = System.currentTimeMillis
    +            val rangeIter = store.getRange(None, None)
    +
    +            new NextIterator[InternalRow] {
    +              override protected def getNext(): InternalRow = {
    +                var removedValueRow: InternalRow = null
    +                while(rangeIter.hasNext && removedValueRow == null) {
    +                  val UnsafeRowPair(keyRow, valueRow) = rangeIter.next()
    +                  if (watermarkPredicateForKeys.get.eval(keyRow)) {
    +                    store.remove(keyRow)
    +                    removedValueRow = valueRow
    +                  }
    +                }
    +                if (removedValueRow == null) {
    +                  finished = true
    +                  null
    +                } else {
    +                  removedValueRow
    +                }
    +              }
     
    -            numTotalStateRows += store.numKeys()
    -            store.updates().filter(_.isInstanceOf[ValueRemoved]).map { removed =>
    -              numOutputRows += 1
    -              removed.value.asInstanceOf[InternalRow]
    +              override protected def close(): Unit = {
    +                allRemovalsTimeMs += System.currentTimeMillis - removalStartTime
    --- End diff --
    
    nit: please use `nanoTime`
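
The Append branch in the diff above removes expired keys lazily, as the output iterator is consumed, and records the removal time and commits in `close()`. The sketch below illustrates that pattern with `nanoTime`. Everything in it is an assumption made for illustration: `SketchNextIterator` is a simplified stand-in for Spark's `NextIterator`, the map stands in for the state store, and a `Long` key compared against `watermark` stands in for `watermarkPredicateForKeys`.

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's NextIterator (not the real class).
abstract class SketchNextIterator[A] extends Iterator[A] {
  private var gotNext = false
  private var closed = false
  private var nextValue: A = null.asInstanceOf[A]
  protected var finished = false

  protected def getNext(): A
  protected def close(): Unit

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      // Run close() exactly once, when the source is exhausted.
      if (finished && !closed) { closed = true; close() }
      gotNext = true
    }
    !finished
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

// Hypothetical state: the key doubles as the event time.
val state = mutable.LinkedHashMap(1L -> "a", 5L -> "b", 9L -> "c")
val watermark = 6L
var removalTimeNs = 0L
var committed = false

val startNs = System.nanoTime()
val snapshot = state.toList.iterator // iterate a copy so removal is safe

val evicted = new SketchNextIterator[String] {
  override protected def getNext(): String = {
    var removed: String = null
    // Advance until one expired key has been removed or the input runs out.
    while (snapshot.hasNext && removed == null) {
      val (k, v) = snapshot.next()
      if (k < watermark) { state.remove(k); removed = v }
    }
    if (removed == null) finished = true
    removed
  }

  override protected def close(): Unit = {
    removalTimeNs += System.nanoTime() - startNs
    committed = true // stands in for store.commit()
  }
}
```

In this sketch nothing is removed and nothing is committed until the iterator is consumed, and `close()` only runs once the iterator has been read to the end, so the recorded time also includes whatever the consumer does between elements.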




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118465966
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -47,31 +60,25 @@ trait StateStore {
       /** Version of the data in this store before committing updates. */
       def version: Long
     
    -  /** Get the current value of a key. */
    -  def get(key: UnsafeRow): Option[UnsafeRow]
    -
       /**
    -   * Return an iterator of key-value pairs that satisfy a certain condition.
    -   * Note that the iterator must be fail-safe towards modification to the store, that is,
    -   * it must be based on the snapshot of store the time of this call, and any change made to the
    -   * store while iterating through iterator should not cause the iterator to fail or have
    -   * any affect on the values in the iterator.
    +   * Get the current value of a key.
        */
    -  def filter(condition: (UnsafeRow, UnsafeRow) => Boolean): Iterator[(UnsafeRow, UnsafeRow)]
    -
    -  /** Put a new value for a key. */
    -  def put(key: UnsafeRow, value: UnsafeRow): Unit
    +  def get(key: UnsafeRow): UnsafeRow
     
       /**
    -   * Remove keys that match the following condition.
    +   * Put a new value for a key.
    +   * @return Previous value of the key or null.
        */
    -  def remove(condition: UnsafeRow => Boolean): Unit
    +  def put(key: UnsafeRow, value: UnsafeRow): Unit
     
       /**
        * Remove a single key.
    +   * @return Previous value of the removed key or null.
        */
       def remove(key: UnsafeRow): Unit
     
    +  def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowTuple]
    --- End diff --
    
    add docs




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #3755 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3755/testReport)** for PR 18107 at commit [`d645b41`](https://github.com/apache/spark/commit/d645b416ddd79b56c00bb443569de4c7af5de4fb).




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77363/
    Test FAILed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77363 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77363/testReport)** for PR 18107 at commit [`d645b41`](https://github.com/apache/spark/commit/d645b416ddd79b56c00bb443569de4c7af5de4fb).




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014976
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -273,27 +333,34 @@ case class StreamingDeduplicateExec(
         child.execute().mapPartitionsWithStateStore(
           getStateId.checkpointLocation,
           getStateId.operatorId,
    +      storeName = "default",
           getStateId.batchId,
           keyExpressions.toStructType,
           child.output.toStructType,
    +      indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
           val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
           val numOutputRows = longMetric("numOutputRows")
           val numTotalStateRows = longMetric("numTotalStateRows")
           val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
     
           val baseIterator = watermarkPredicateForData match {
             case Some(predicate) => iter.filter(row => !predicate.eval(row))
             case None => iter
           }
     
    +      val updatesStartTimeMs = System.currentTimeMillis
    --- End diff --
    
    nit: please use `nanoTime`





[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77549/




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118465998
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -105,25 +102,42 @@ trait StateStore {
     /** Trait representing a provider of a specific version of a [[StateStore]]. */
     trait StateStoreProvider {
     
    -  /** Get the store with the existing version. */
    +  def init(
    +      stateStoreId: StateStoreId,
    +      keySchema: StructType,
    +      valueSchema: StructType,
    +      indexOrdinal: Option[Int], // for sorting the data
    +      storeConfs: StateStoreConf,
    +      hadoopConf: Configuration): Unit
    +
    +  def id: StateStoreId
    +
    +  def close(): Unit
    +
       def getStore(version: Long): StateStore
     
       /** Optional method for providers to allow for background maintenance */
       def doMaintenance(): Unit = { }
     }
     
    -
    -/** Trait representing updates made to a [[StateStore]]. */
    -sealed trait StoreUpdate {
    -  def key: UnsafeRow
    -  def value: UnsafeRow
    +object StateStoreProvider {
    +  def instantiate(
    --- End diff --
    
    add docs.
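For readers following along, here is a minimal sketch of what an `instantiate` helper of this kind typically does: read a class name from configuration, fall back to a default, construct the class reflectively, and call `init`. The `Provider` trait, `InMemoryProvider`, and the `Map`-based conf are hypothetical stand-ins, not the PR's code; only the conf key comes from this thread.

```scala
object ProviderLoadingSketch {
  /** Hypothetical stand-in for a pluggable provider trait. */
  trait Provider {
    def init(storeName: String): Unit
    def name: String
  }

  /** Hypothetical default implementation, used when no class is configured. */
  class InMemoryProvider extends Provider {
    private var storeName: String = ""
    override def init(storeName: String): Unit = { this.storeName = storeName }
    override def name: String = storeName
  }

  // Conf key as it appears in the SQLConf diff in this thread.
  val ProviderClassKey = "spark.sql.streaming.stateStore.providerClass"

  /** Creates the configured provider (or the default) and initializes it. */
  def instantiate(conf: Map[String, String], storeName: String): Provider = {
    val className = conf.getOrElse(ProviderClassKey, classOf[InMemoryProvider].getName)
    // The provider class must expose a public zero-argument constructor.
    val provider = Class.forName(className)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[Provider]
    provider.init(storeName)
    provider
  }
}
```

Docs for such a method would usually state the zero-argument constructor requirement and that `init` is called before the provider is returned.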




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77386 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77386/testReport)** for PR 18107 at commit [`324fc24`](https://github.com/apache/spark/commit/324fc24f4b9e6c6aa31df45955afa7ca5471ab63).




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77362 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77362/testReport)** for PR 18107 at commit [`03f5bf3`](https://github.com/apache/spark/commit/03f5bf3f1fc4e6d60b43d7c05a3cdc6dddcbd1af).




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Thanks! Merging to master.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119172218
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -828,6 +837,8 @@ class SQLConf extends Serializable with Logging {
     
       def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
     
    +  def stateStoreProviderClass: Option[String] = getConf(STATE_STORE_PROVIDER_CLASS)
    --- End diff --
    
    Good idea.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118465084
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -552,6 +552,15 @@ object SQLConf {
         .booleanConf
         .createWithDefault(true)
     
    +  val STATE_STORE_PROVIDER_CLASS =
    +    buildConf("spark.sql.streaming.stateStore.providerClass")
    +      .internal()
    +      .doc(
    +        "Minimum number of state store delta files that needs to be generated before they " +
    --- End diff --
    
    Update this description; the current text describes delta files, not the provider class.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014982
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -304,8 +371,9 @@ case class StreamingDeduplicateExec(
           }
     
           CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
    -        watermarkPredicateForKeys.foreach(f => store.remove(f.eval _))
    -        store.commit()
    +        allUpdatesTimeMs += System.currentTimeMillis - updatesStartTimeMs
    --- End diff --
    
    nit: please use `nanoTime`





[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118615627
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -61,11 +60,24 @@ trait StateStoreReader extends StatefulOperator {
     }
     
     /** An operator that writes to a StateStore. */
    -trait StateStoreWriter extends StatefulOperator {
    +trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    +
       override lazy val metrics = Map(
         "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
         "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
    -    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"))
    +    "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
    +    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to update rows"),
    +    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to remove rows"),
    +    "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes")
    +  )
    +
    +  /** Records the duration of running `body` for the next query progress update. */
    +  protected def timeTakenMs(body: => Unit): Long = {
    +    val startTime = System.currentTimeMillis
    --- End diff --
    
    nit: Use `nanoTime` instead
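The reason behind this nit: `System.currentTimeMillis` reads the wall clock, which can jump when the system time is adjusted, while `System.nanoTime` is meant for measuring elapsed time. A minimal sketch of the helper rewritten that way follows; the enclosing object name is hypothetical and this is not necessarily the code that was merged.

```scala
import java.util.concurrent.TimeUnit

object TimingSketch {
  /** Runs `body` and returns the elapsed time in milliseconds, measured with System.nanoTime. */
  def timeTakenMs(body: => Unit): Long = {
    val startNs = System.nanoTime()
    body
    val elapsedNs = System.nanoTime() - startNs
    // Convert to milliseconds and clamp at zero as a defensive measure.
    math.max(TimeUnit.NANOSECONDS.toMillis(elapsedNs), 0L)
  }
}
```

Usage mirrors the diff above, for example `commitTimeMs += timeTakenMs { store.commit() }`.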




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118595928
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -508,22 +508,6 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         expectedState = Some(5),                                  // state should change
         expectedTimeoutTimestamp = 5000)                          // timestamp should change
     
    -  test("StateStoreUpdater - rows are cloned before writing to StateStore") {
    --- End diff --
    
    This test is no longer needed because the operator is no longer responsible for cloning rows before writing them to the store.
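To illustrate why the cloning has to happen somewhere: when a caller reuses one mutable row buffer for every record, a store that kept the reference would see all entries change together. The sketch below assumes the store copies on `put`; `Row` and `CopyingStore` are hypothetical toy types, not Spark classes.

```scala
import scala.collection.mutable

object CopyOnPutSketch {
  /** Hypothetical mutable row, standing in for a reusable row buffer. */
  final class Row(var value: Int) {
    def copy(): Row = new Row(value)
  }

  /** Toy store that copies on put, so callers may keep reusing their buffer. */
  final class CopyingStore {
    private val data = mutable.Map.empty[String, Row]
    def put(key: String, row: Row): Unit = data.update(key, row.copy())
    def get(key: String): Option[Int] = data.get(key).map(_.value)
  }

  /** Writes two records through a single reused buffer and reads both back. */
  def demo(): (Option[Int], Option[Int]) = {
    val store = new CopyingStore
    val buffer = new Row(0) // one buffer reused for every input record
    buffer.value = 1
    store.put("a", buffer)
    buffer.value = 2
    store.put("b", buffer)
    (store.get("a"), store.get("b"))
  }
}
```

Because the copy is made inside `put`, the first entry keeps its value even after the buffer is overwritten for the second record.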




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77547/




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18107




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014965
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -253,6 +311,8 @@ case class StateStoreSaveExec(
       override def output: Seq[Attribute] = child.output
     
       override def outputPartitioning: Partitioning = child.outputPartitioning
    +
    --- End diff --
    
    nit: extra empty lines




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77362/




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118466139
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -165,54 +189,87 @@ case class StateStoreSaveExec(
         child.execute().mapPartitionsWithStateStore(
           getStateId.checkpointLocation,
           getStateId.operatorId,
    +      storeName = "default",
           getStateId.batchId,
           keyExpressions.toStructType,
           child.output.toStructType,
    +      indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
             val numOutputRows = longMetric("numOutputRows")
             val numTotalStateRows = longMetric("numTotalStateRows")
             val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +        val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +        val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
    +        val commitTimeMs = longMetric("commitTimeMs")
     
             outputMode match {
               // Update and output all rows in the StateStore.
               case Some(Complete) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              while (iter.hasNext) {
    +                val row = iter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
    +            }
    +            allRemovalsTimeMs += 0
    +            commitTimeMs += timeTakenMs {
    +              store.commit()
                 }
    -            store.commit()
                 numTotalStateRows += store.numKeys()
    -            store.iterator().map { case (k, v) =>
    +            store.iterator().map { case UnsafeRowTuple(_, v) =>
                   numOutputRows += 1
                   v.asInstanceOf[InternalRow]
                 }
     
               // Update and output only rows being evicted from the StateStore
    +          // Assumption: watermark predicates must be non-empty if append mode is allowed
               case Some(Append) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    --- End diff --
    
    This change accommodates the removal of `StateStore.updates()`.
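A rough sketch of how append-mode output can be produced without a store-tracked list of updates: filter late rows before writing, then scan the store for rows the watermark has passed, emit them, and remove them. Everything here is an illustrative assumption (the `Event` type, a `mutable.Map` as the store, the `<=` watermark comparison); it is not the operator code from the PR.

```scala
import scala.collection.mutable

object AppendModeSketch {
  /** Hypothetical event: `key` is the group, `eventTime` is compared with the watermark. */
  final case class Event(key: String, eventTime: Long)

  /** Applies one batch to `store` and returns the rows evicted by the watermark. */
  def processBatch(
      store: mutable.Map[String, Event],
      batch: Iterator[Event],
      watermark: Long): Seq[Event] = {
    // 1. Drop input rows that are already at or behind the watermark, then upsert the rest.
    batch.filter(_.eventTime > watermark).foreach(e => store.update(e.key, e))
    // 2. Collect stored rows that the watermark has passed; these are the rows to output.
    val evicted = store.values.filter(_.eventTime <= watermark).toList
    // 3. Remove them so each row is emitted at most once.
    evicted.foreach(e => store.remove(e.key))
    evicted.sortBy(_.key)
  }
}
```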




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #3755 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3755/testReport)** for PR 18107 at commit [`d645b41`](https://github.com/apache/spark/commit/d645b416ddd79b56c00bb443569de4c7af5de4fb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class StateStoreId(`
      * `case class StateStoreStats()`
      * `case class UnsafeRowTuple(var key: UnsafeRow = null, var value: UnsafeRow = null) `
      * `trait StateStoreWriter extends StatefulOperator `




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    LGTM pending tests.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77386 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77386/testReport)** for PR 18107 at commit [`324fc24`](https://github.com/apache/spark/commit/324fc24f4b9e6c6aa31df45955afa7ca5471ab63).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77546 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77546/testReport)** for PR 18107 at commit [`baba63d`](https://github.com/apache/spark/commit/baba63d68adff67b99d651225083a89353ecb154).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class UnsafeRowPair(var key: UnsafeRow = null, var value: UnsafeRow = null) `
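The mutable `key`/`value` fields in that signature suggest a pair object that an iterator can refill instead of allocating a tuple per record. A minimal sketch of the idea follows, using a generic `Pair` and a `java.util.Map` as hypothetical stand-ins rather than the PR's types.

```scala
object ReusablePairSketch {
  /** Hypothetical mutable pair, modeled loosely on the signature listed above. */
  final class Pair[K, V](var key: K, var value: V) {
    /** Overwrites both fields and returns this same instance. */
    def withRows(k: K, v: V): Pair[K, V] = { key = k; value = v; this }
  }

  /** Iterates over `map`, returning the same Pair instance from every call to next(). */
  def reusingIterator[K <: AnyRef, V <: AnyRef](map: java.util.Map[K, V]): Iterator[Pair[K, V]] = {
    val pair = new Pair[K, V](null.asInstanceOf[K], null.asInstanceOf[V])
    val underlying = map.entrySet().iterator()
    new Iterator[Pair[K, V]] {
      override def hasNext: Boolean = underlying.hasNext
      override def next(): Pair[K, V] = {
        val entry = underlying.next()
        pair.withRows(entry.getKey, entry.getValue)
      }
    }
  }
}
```

The trade-off is that consumers must not hold on to the returned pair across calls to `next()`; anything they need to keep has to be copied out first.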




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118465937
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -29,12 +29,25 @@ import org.apache.spark.SparkEnv
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.catalyst.expressions.UnsafeRow
     import org.apache.spark.sql.types.StructType
    -import org.apache.spark.util.ThreadUtils
    +import org.apache.spark.util.{ThreadUtils, Utils}
     
     
     /** Unique identifier for a [[StateStore]] */
    -case class StateStoreId(checkpointLocation: String, operatorId: Long, partitionId: Int)
    -
    +case class StateStoreId(
    +    checkpointLocation: String,
    +    operatorId: Long,
    +    partitionId: Int,
    +    name: String = "")
    +
    +case class StateStoreStats()
    --- End diff --
    
    remove this




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77549 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77549/testReport)** for PR 18107 at commit [`fdfdcab`](https://github.com/apache/spark/commit/fdfdcabacbec7cd1c1040dcd0c86d74e417c2f5e).




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119172029
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -165,54 +189,88 @@ case class StateStoreSaveExec(
         child.execute().mapPartitionsWithStateStore(
           getStateId.checkpointLocation,
           getStateId.operatorId,
    +      storeName = "default",
           getStateId.batchId,
           keyExpressions.toStructType,
           child.output.toStructType,
    +      indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
             val numOutputRows = longMetric("numOutputRows")
             val numTotalStateRows = longMetric("numTotalStateRows")
             val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +        val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +        val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
    +        val commitTimeMs = longMetric("commitTimeMs")
     
             outputMode match {
               // Update and output all rows in the StateStore.
               case Some(Complete) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              while (iter.hasNext) {
    +                val row = iter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
    +            }
    +            allRemovalsTimeMs += 0
    +            commitTimeMs += timeTakenMs {
    +              store.commit()
                 }
    -            store.commit()
                 numTotalStateRows += store.numKeys()
    -            store.iterator().map { case (k, v) =>
    +            store.iterator().map { case UnsafeRowPair(_, v) =>
                   numOutputRows += 1
                   v.asInstanceOf[InternalRow]
                 }
     
               // Update and output only rows being evicted from the StateStore
    +          // Assumption: watermark predicates must be non-empty if append mode is allowed
               case Some(Append) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
    +              while (filteredIter.hasNext) {
    +                val row = filteredIter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
                 }
     
    -            // Assumption: Append mode can be done only when watermark has been specified
    -            store.remove(watermarkPredicateForKeys.get.eval _)
    -            store.commit()
    +            val removalStartTime = System.currentTimeMillis
    +            val rangeIter = store.getRange(None, None)
    +
    +            new NextIterator[InternalRow] {
    +              override protected def getNext(): InternalRow = {
    +                var removedValueRow: InternalRow = null
    +                while(rangeIter.hasNext && removedValueRow == null) {
    +                  val UnsafeRowPair(keyRow, valueRow) = rangeIter.next()
    --- End diff --
    
    That is true! I had assumed `unapply` would be desugared into something simple, but it's probably best not to rely on the Scala compiler so much.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119016141
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -165,54 +189,88 @@ case class StateStoreSaveExec(
         child.execute().mapPartitionsWithStateStore(
           getStateId.checkpointLocation,
           getStateId.operatorId,
    +      storeName = "default",
           getStateId.batchId,
           keyExpressions.toStructType,
           child.output.toStructType,
    +      indexOrdinal = None,
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
             val numOutputRows = longMetric("numOutputRows")
             val numTotalStateRows = longMetric("numTotalStateRows")
             val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +        val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +        val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
    +        val commitTimeMs = longMetric("commitTimeMs")
     
             outputMode match {
               // Update and output all rows in the StateStore.
               case Some(Complete) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              while (iter.hasNext) {
    +                val row = iter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
    +            }
    +            allRemovalsTimeMs += 0
    +            commitTimeMs += timeTakenMs {
    +              store.commit()
                 }
    -            store.commit()
                 numTotalStateRows += store.numKeys()
    -            store.iterator().map { case (k, v) =>
    +            store.iterator().map { case UnsafeRowPair(_, v) =>
                   numOutputRows += 1
                   v.asInstanceOf[InternalRow]
                 }
     
               // Update and output only rows being evicted from the StateStore
    +          // Assumption: watermark predicates must be non-empty if append mode is allowed
               case Some(Append) =>
    -            while (iter.hasNext) {
    -              val row = iter.next().asInstanceOf[UnsafeRow]
    -              val key = getKey(row)
    -              store.put(key.copy(), row.copy())
    -              numUpdatedStateRows += 1
    +            allUpdatesTimeMs += timeTakenMs {
    +              val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
    +              while (filteredIter.hasNext) {
    +                val row = filteredIter.next().asInstanceOf[UnsafeRow]
    +                val key = getKey(row)
    +                store.put(key, row)
    +                numUpdatedStateRows += 1
    +              }
                 }
     
    -            // Assumption: Append mode can be done only when watermark has been specified
    -            store.remove(watermarkPredicateForKeys.get.eval _)
    -            store.commit()
    +            val removalStartTime = System.currentTimeMillis
    +            val rangeIter = store.getRange(None, None)
    +
    +            new NextIterator[InternalRow] {
    +              override protected def getNext(): InternalRow = {
    +                var removedValueRow: InternalRow = null
    +                while(rangeIter.hasNext && removedValueRow == null) {
    +                  val UnsafeRowPair(keyRow, valueRow) = rangeIter.next()
    --- End diff --
    
    Case class's `unapply` will create a `Tuple`. You should not use this Scala syntactic sugar :)
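
As an illustration of the alternative to destructuring, here is a minimal sketch that reads fields directly from a reused pair object. `RowPair`, `FieldAccessDemo`, and the `String` fields are hypothetical stand-ins and are not Spark's `UnsafeRowPair` or `UnsafeRow`. Whether a pattern definition such as `val RowPair(k, v) = it.next()` really allocates depends on the compiler, so this only shows the form that does not rely on it.

```scala
// Hypothetical stand-in for a reusable key/value holder; not Spark's UnsafeRowPair.
final case class RowPair(var key: String, var value: String)

object FieldAccessDemo {
  // Returns an iterator that hands out the same RowPair instance for every record.
  def reusingIterator(data: Seq[(String, String)]): Iterator[RowPair] = {
    val pair = RowPair(null, null)
    data.iterator.map { kv =>
      pair.key = kv._1
      pair.value = kv._2
      pair
    }
  }

  // Reads the fields directly instead of writing `val RowPair(k, v) = it.next()`.
  def collectKeys(it: Iterator[RowPair]): List[String] = {
    val keys = List.newBuilder[String]
    while (it.hasNext) {
      val pair = it.next()
      keys += pair.key
    }
    keys.result()
  }
}
```

Because the pair is reused, callers must read (or copy) its fields before advancing the iterator.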




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014486
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -47,50 +44,54 @@ trait StateStore {
       /** Version of the data in this store before committing updates. */
       def version: Long
     
    -  /** Get the current value of a key. */
    -  def get(key: UnsafeRow): Option[UnsafeRow]
    -
       /**
    -   * Return an iterator of key-value pairs that satisfy a certain condition.
    -   * Note that the iterator must be fail-safe towards modification to the store, that is,
    -   * it must be based on the snapshot of store the time of this call, and any change made to the
    -   * store while iterating through iterator should not cause the iterator to fail or have
    -   * any affect on the values in the iterator.
    +   * Get the current value of a non-null key.
    --- End diff --
    
    nit: please mention that `null` means key doesn't exist.
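
A minimal sketch of the contract being requested, assuming a hypothetical `NullReturningStore` backed by a `java.util.HashMap` with `String` in place of `UnsafeRow`. It is not the actual `StateStore` trait; it only shows `get` returning `null` for a missing key and a caller checking for it.

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for a state store; String replaces UnsafeRow for brevity.
class NullReturningStore {
  private val data = new JHashMap[String, String]()

  def put(key: String, value: String): Unit = {
    require(key != null, "key must be non-null")
    data.put(key, value)
  }

  /** Returns the current value of a non-null key, or null if the key does not exist. */
  def get(key: String): String = {
    require(key != null, "key must be non-null")
    data.get(key) // java.util.HashMap.get returns null for a missing key
  }
}

object NullReturningStoreDemo {
  // Callers branch on null instead of matching on Some/None.
  def lookup(store: NullReturningStore, key: String): String = {
    val value = store.get(key)
    if (value == null) s"$key: absent" else s"$key: $value"
  }
}
```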




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118608662
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -828,6 +837,8 @@ class SQLConf extends Serializable with Logging {
     
       def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
     
    +  def stateStoreProviderClass: Option[String] = getConf(STATE_STORE_PROVIDER_CLASS)
    --- End diff --
    
    Also add this to `StateStoreConf` for consistency?




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119014511
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -47,50 +44,54 @@ trait StateStore {
       /** Version of the data in this store before committing updates. */
       def version: Long
     
    -  /** Get the current value of a key. */
    -  def get(key: UnsafeRow): Option[UnsafeRow]
    -
       /**
    -   * Return an iterator of key-value pairs that satisfy a certain condition.
    -   * Note that the iterator must be fail-safe towards modification to the store, that is,
    -   * it must be based on the snapshot of store the time of this call, and any change made to the
    -   * store while iterating through iterator should not cause the iterator to fail or have
    -   * any affect on the values in the iterator.
    +   * Get the current value of a non-null key.
        */
    -  def filter(condition: (UnsafeRow, UnsafeRow) => Boolean): Iterator[(UnsafeRow, UnsafeRow)]
    +  def get(key: UnsafeRow): UnsafeRow
     
    -  /** Put a new value for a key. */
    +  /**
    +   * Put a new value for a non-null key. Implementations must be aware that the UnsafeRows in
    +   * the params can be reused, and must make copies of the data as needed for persistence.
    +   * @note put cannot be done once
    --- End diff --
    
    Could you clarify it?
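
The first sentence of that Scaladoc (rows passed to `put` may be reused, so the store must copy them) can be sketched as follows. `MutableRow`, `CopyingStore`, and `CopyOnPutDemo` are hypothetical names, and `String` keys are used because they are immutable; a real implementation would also need to copy a reused key row.

```scala
import scala.collection.mutable

// Hypothetical mutable record standing in for a reused UnsafeRow.
final class MutableRow(var payload: String) {
  def copy(): MutableRow = new MutableRow(payload)
}

// Hypothetical store that copies the value on put, so later mutation by the caller is harmless.
class CopyingStore {
  private val data = mutable.HashMap.empty[String, MutableRow]
  def put(key: String, value: MutableRow): Unit = data.put(key, value.copy())
  def get(key: String): MutableRow = data.getOrElse(key, null)
}

object CopyOnPutDemo {
  def run(): (String, String) = {
    val store = new CopyingStore
    val reused = new MutableRow("first") // the caller reuses this one object
    store.put("k1", reused)
    reused.payload = "second"            // overwritten in place for the next record
    store.put("k2", reused)
    (store.get("k1").payload, store.get("k2").payload)
  }
}
```

Without the `copy()` in `put`, both keys would point at the same object and `k1` would read back the second payload.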




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118802674
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -552,6 +552,15 @@ object SQLConf {
         .booleanConf
         .createWithDefault(true)
     
    +  val STATE_STORE_PROVIDER_CLASS =
    +    buildConf("spark.sql.streaming.stateStore.providerClass")
    +      .internal()
    +      .doc(
    +        "The class used to manage state data in stateful streaming queries. This class must" +
    +          "be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
    --- End diff --
    
    nit: missing space before `be`.
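
For reference, a small sketch of what the concatenation produces with and without the trailing space. The object name `DocStringDemo` is hypothetical; the two literals are taken from the diff above.

```scala
object DocStringDemo {
  // As written in the diff: no space at the boundary between the two literals.
  val asWritten: String =
    "The class used to manage state data in stateful streaming queries. This class must" +
      "be a subclass of StateStoreProvider, and must have a zero-arg constructor."

  // With a trailing space on the first literal.
  val fixed: String =
    "The class used to manage state data in stateful streaming queries. This class must " +
      "be a subclass of StateStoreProvider, and must have a zero-arg constructor."
}
```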




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r118804254
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -719,3 +745,23 @@ object ThrowingInterruptedIOException {
        */
       @volatile var createSourceLatch: CountDownLatch = null
     }
    +
    +class TestStateStoreProvider extends StateStoreProvider {
    +
    +  override def init(
    +      stateStoreId: StateStoreId,
    +      keySchema: StructType,
    +      valueSchema: StructType,
    +      indexOrdinal: Option[Int],
    +      storeConfs: StateStoreConf,
    +      hadoopConf: Configuration): Unit = {
    +    throw new Exception("Successfully instantiated")
    +
    --- End diff --
    
    nit: extra empty line.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/77386/




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77547 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77547/testReport)** for PR 18107 at commit [`5c0961c`](https://github.com/apache/spark/commit/5c0961cf302ae7a02bfa603a8ddd0ccfa54a062c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    **[Test build #77549 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/77549/testReport)** for PR 18107 at commit [`fdfdcab`](https://github.com/apache/spark/commit/fdfdcabacbec7cd1c1040dcd0c86d74e417c2f5e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119172385
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -47,50 +44,54 @@ trait StateStore {
       /** Version of the data in this store before committing updates. */
       def version: Long
     
    -  /** Get the current value of a key. */
    -  def get(key: UnsafeRow): Option[UnsafeRow]
    -
       /**
    -   * Return an iterator of key-value pairs that satisfy a certain condition.
    -   * Note that the iterator must be fail-safe towards modification to the store, that is,
    -   * it must be based on the snapshot of store the time of this call, and any change made to the
    -   * store while iterating through iterator should not cause the iterator to fail or have
    -   * any affect on the values in the iterator.
    +   * Get the current value of a non-null key.
    --- End diff --
    
    Good point.




[GitHub] spark pull request #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateSt...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119013976
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -102,28 +103,100 @@ trait StateStore {
     }
     
     
    -/** Trait representing a provider of a specific version of a [[StateStore]]. */
    +/**
    + * Trait representing a provider that provides [[StateStore]] instances representing
    + * versions of state data.
    + *
    + * The life cycle of a provider and its provided stores is as follows.
    + *
    + * - A StateStoreProvider is created in an executor for each unique [[StateStoreId]] when
    + *   the first batch of a streaming query is executed on the executor. All subsequent batches reuse
    + *   this provider instance until the query is stopped.
    + *
    + * - Every batch of streaming data requests a specific version of the state data by invoking
    + *   `getStore(version)` which returns an instance of [[StateStore]] through which the required
    + *   version of the data can be accessed. It is the responsibility of the provider to populate
    + *   this store with context information like the schema of keys and values, etc.
    + *
    + * - After the streaming query is stopped, the created provider instances are lazily disposed of.
    + */
     trait StateStoreProvider {
     
    -  /** Get the store with the existing version. */
    +  /**
    +   * Initialize the provider with more contextual information from the SQL operator.
    +   * This method will be called first after creating an instance of the StateStoreProvider by
    +   * reflection.
    +   *
    +   * @param stateStoreId Id of the versioned StateStores that this provider will generate
    +   * @param keySchema Schema of keys to be stored
    +   * @param valueSchema Schema of value to be stored
    +   * @param keyIndexOrdinal Optional column (represented as the ordinal of the field in keySchema) by
    +   *                        which the StateStore implementation could index the data.
    +   * @param storeConfs Configurations used by the StateStores
    +   * @param hadoopConf Hadoop configuration that could be used by StateStore to save state data
    +   */
    +  def init(
    +      stateStoreId: StateStoreId,
    +      keySchema: StructType,
    +      valueSchema: StructType,
    +      keyIndexOrdinal: Option[Int], // for sorting the data by their keys
    +      storeConfs: StateStoreConf,
    +      hadoopConf: Configuration): Unit
    +
    +  /**
    +   * Return the id of the StateStores this provider will generate.
    +   * Should be the same as the one passed in init().
    +   */
    +  def id: StateStoreId
    +
    +  /** Called when the provider instance is unloaded from the executor */
    +  def close(): Unit
    +
    +  /** Return an instance of [[StateStore]] representing state data of the given version */
       def getStore(version: Long): StateStore
     
    -  /** Optional method for providers to allow for background maintenance */
    +  /** Optional method for providers to allow for background maintenance (e.g. compactions) */
       def doMaintenance(): Unit = { }
     }
     
    -
    -/** Trait representing updates made to a [[StateStore]]. */
    -sealed trait StoreUpdate {
    -  def key: UnsafeRow
    -  def value: UnsafeRow
    +object StateStoreProvider {
    +  /**
    +   * Return a provider instance of the given provider class.
    +   * The instance will be already initialized.
    +   */
    +  def instantiate(
    +      providerClass: String,
    +      stateStoreId: StateStoreId,
    +      keySchema: StructType,
    +      valueSchema: StructType,
    +      indexOrdinal: Option[Int], // for sorting the data
    +      storeConf: StateStoreConf,
    +      hadoopConf: Configuration): StateStoreProvider = {
    +    val provider = Utils.getContextOrSparkClassLoader
    --- End diff --
    
    nit: Use `Utils.classForName(providerClass)`.
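
For readers outside the Spark codebase, a rough sketch of the instantiate-by-reflection pattern under discussion, using only plain JVM calls. `DemoProvider` and `InMemoryDemoProvider` are hypothetical, and `Class.forName` with the context class loader stands in for Spark's internal `Utils.classForName`, whose exact behavior is not reproduced here.

```scala
// Hypothetical provider trait and implementation; not Spark's StateStoreProvider.
trait DemoProvider {
  def init(name: String): Unit
  def name: String
}

// The implementation must have a zero-arg constructor so it can be created reflectively.
class InMemoryDemoProvider extends DemoProvider {
  private var storeName: String = null
  override def init(name: String): Unit = { storeName = name }
  override def name: String = storeName
}

object DemoProvider {
  // Loads the class by name, calls its zero-arg constructor, then initializes the instance.
  def instantiate(providerClass: String, name: String): DemoProvider = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    val provider = Class.forName(providerClass, true, loader)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[DemoProvider]
    provider.init(name)
    provider
  }
}
```

Returning an already initialized instance keeps callers from using a provider before `init` has run.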




[GitHub] spark issue #18107: [SPARK-20883][SPARK-20376][SS] Refactored StateStore API...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18107
  
    Merged build finished. Test PASSed.

