Posted to reviews@spark.apache.org by aalobaidi <gi...@git.apache.org> on 2018/06/05 18:47:24 UTC

[GitHub] spark pull request #21500: Scalable Memory option for HDFSBackedStateStore

GitHub user aalobaidi opened a pull request:

    https://github.com/apache/spark/pull/21500

    Scalable Memory option for HDFSBackedStateStore

    More scalable memory management for HDFSBackedStateStore. This is controlled by a configuration option (`spark.sql.streaming.stateStore.unloadAfterCommit`). If enabled, HDFSBackedStateStore will unload state from memory after each commit.
    
    ## What changes were proposed in this pull request?
    
    (Please fill in changes proposed in this fix)
    
    ## How was this patch tested?
    
    This has been tested manually but still needs unit tests.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aalobaidi/spark Memory-HDFSBackedStateStore

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21500
    
----
commit fecbc23ec47c30d69d58bcfa3573751b5432eba9
Author: Ahmed Al-Obaidi <ah...@...>
Date:   2018-03-31T19:43:45Z

    Scalable Memory option for HDFSBackedStateStore
    
    Allow configuration option to unload loadedMaps from memory after commit

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    I agree that the current cache approach may consume excessive memory unnecessarily, and that matches my finding in #21469.
    
    The issue is not that simple, however. In micro-batch mode each batch reads the previous version of state, and if that version is not in memory it has to be read from the file system, in the worst case seeking and reading multiple files in a remote file system. So it is preferable to keep the previous version of state available in memory.
    
    There are three cases here (please add any I'm missing): 1. failure before commit 2. committed but the batch failed afterwards 3. committed and the batch succeeded. It might be better to think through all of these cases.


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by aalobaidi <gi...@git.apache.org>.
Github user aalobaidi commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @HeartSaVioR 
    
    1. As I mentioned before, this option is beneficial for use cases with bigger micro-batches. The overhead of loading the state from disk is then spread across a large number of events in each task, so overall throughput is not affected as badly as when the state is loaded for a small number of events. It is a typical space-time tradeoff: using less memory results in longer execution time.
    
    2. Currently, as far as I know, there is no way for Spark to handle state that is bigger than the total memory of the cluster. With this option, users can set the number of partitions (using `spark.sql.shuffle.partitions`) to be, for example, 10 times the total number of cores in the cluster. As a result, the cluster will load only ~10% of the state at any given time.
    
    3. By default, the option is disabled and does not change the current behavior. Also, enabling or disabling this option doesn't require rebuilding the state. Users will be able to test easily and decide whether the performance impact is acceptable.
    
    4. In future implementations, we can have an executor-wide state manager that evicts state partitions from memory only when needed.
    
    Also, I agree that a better solution should be developed, maybe using RocksDB. But for the time being and for this store implementation, this will enable one extra use case with very little code to maintain. 
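
The ~10% figure in point 2 can be checked with a little arithmetic. The sketch below is only an illustration, not part of the patch: it assumes state is spread evenly across partitions, that each core runs one task at a time, and that only the partitions currently running are kept in memory. The function name and the 48-core cluster are hypothetical.

```python
def resident_state_fraction(total_cores: int, shuffle_partitions: int) -> float:
    """Fraction of total state in memory when only running partitions stay loaded.

    Assumes evenly sized partitions and one running task per core.
    """
    if total_cores <= 0 or shuffle_partitions <= 0:
        raise ValueError("total_cores and shuffle_partitions must be positive")
    # At most one partition per core can be running (and therefore loaded).
    concurrent_partitions = min(total_cores, shuffle_partitions)
    return concurrent_partitions / shuffle_partitions


# Hypothetical cluster: 48 cores, partitions set to 10x the core count.
cores = 48
fraction = resident_state_fraction(cores, 10 * cores)
print(f"{fraction:.0%} of the state is resident at once")
```

With skewed partitions the resident fraction can be noticeably higher, so this is best read as a lower bound on memory use rather than a guarantee.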


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by aalobaidi <gi...@git.apache.org>.
Github user aalobaidi commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    Sorry for the late reply. The option is useful for a specific use case: micro-batches with a relatively large number of partitions, each of which is very big. When this option is enabled, Spark loads the state of a partition from disk, processes all events belonging to the partition, commits the new state (delta) to disk, and unloads the entire partition state from memory. It then moves on to the next partition (task). This way each executor keeps in memory only the state of the partitions running concurrently, as opposed to the state of all partitions it has executed.
    
    You can control the balance between memory usage and I/O by setting `spark.sql.shuffle.partitions` (it should be set before the first run of the query).
    
    I did JVM profiling and benchmarks with micro-batches of 5M events and a total state of ~600M keys on a 6-node EMR cluster. The memory usage was much better (in fact the default behavior failed with fewer than 200M keys) and performance wasn't affected significantly. (I will have to compile more specific numbers.)
    
    @HeartSaVioR brings up a good point regarding state compaction (snapshots). I can’t confirm whether compaction was working during the test; I will have to get back to you about this.
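
The load, process, commit, unload cycle described above can be modelled with a toy simulation. This is a hedged sketch and not the code in the patch: it only counts how many partition states a single executor holds at once during one micro-batch, assuming tasks run in waves of `task_slots` partitions. All names are hypothetical.

```python
def peak_resident_partitions(num_partitions: int, task_slots: int,
                             unload_after_commit: bool) -> int:
    """Simulate one micro-batch on one executor and return the peak number
    of partition states held in memory at the same time."""
    resident = set()
    peak = 0
    # Tasks run in waves of at most `task_slots` partitions.
    for start in range(0, num_partitions, task_slots):
        wave = range(start, min(start + task_slots, num_partitions))
        for partition in wave:
            resident.add(partition)  # load the partition state into memory
        peak = max(peak, len(resident))
        for partition in wave:
            # Events are processed and the delta is committed here;
            # with the option enabled the state is then dropped from memory.
            if unload_after_commit:
                resident.discard(partition)
    return peak


print(peak_resident_partitions(40, 4, unload_after_commit=True))
print(peak_resident_partitions(40, 4, unload_after_commit=False))
```

In this model the peak equals the number of task slots when the option is on and the number of partitions when it is off. The model ignores the reload cost paid at the start of the next batch, which is the other side of the tradeoff.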



---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @aalobaidi 
    I would just like to see the benefit of unloading the version of state that is expected to be read by the next batch. I totally agree that the current cache mechanism is excessive, but we can still avoid reloading in every batch. Are you considering a case with multiple stages, where the executor is encouraged to free as much memory as it can despite redundantly reloading state?


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @aalobaidi 
    When a batch starts, the latest version of state is read in order to start a new version of state. If the state has to be restored from a snapshot as well as delta files, the restore will incur huge latency.
    
    #21506 logs messages when loading state requires dealing with the (remote) filesystem. That's why I suggest merging my patch and running your case again.
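
To make the restore cost concrete, here is a minimal sketch of rebuilding one version of state from a snapshot plus delta files. It is a simplified model and not the provider's actual file format or code: snapshots and deltas are plain dictionaries keyed by version, a `None` value in a delta marks a removed key, and all names are hypothetical.

```python
from typing import Dict, Optional, Tuple

State = Dict[str, int]
Delta = Dict[str, Optional[int]]  # None marks a removed key


def load_version(version: int,
                 snapshots: Dict[int, State],
                 deltas: Dict[int, Delta]) -> Tuple[State, int]:
    """Rebuild state at `version` from files; return (state, files_read)."""
    # Start from the newest snapshot at or before the requested version.
    base_versions = [v for v in snapshots if v <= version]
    base = max(base_versions) if base_versions else 0
    state: State = dict(snapshots.get(base, {}))
    files_read = 1 if base in snapshots else 0
    # Replay every delta between the snapshot and the requested version.
    for v in range(base + 1, version + 1):
        for key, value in deltas[v].items():
            if value is None:
                state.pop(key, None)
            else:
                state[key] = value
        files_read += 1
    return state, files_read


snapshots = {2: {"a": 1, "b": 2}}
deltas = {3: {"a": 5}, 4: {"b": None, "c": 7}}
state, files_read = load_version(4, snapshots, deltas)
print(state, files_read)
```

The number of files read grows with the distance from the last snapshot, which is why a cache miss at the start of a batch can be expensive on a remote file system.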


---





[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by aalobaidi <gi...@git.apache.org>.
Github user aalobaidi commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    I can confirm that snapshots are still being built normally with no issue. 
    
    @HeartSaVioR I'm not sure why the executor must load at least 1 version of state in memory. Could you elaborate?


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by aalobaidi <gi...@git.apache.org>.
Github user aalobaidi commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @tdas this is the change I mentioned in our chat at Spark Summit.


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    Retaining versions of state is also relevant to snapshotting the last version to files: HDFSBackedStateStoreProvider doesn't snapshot if the version doesn't exist in loadedMaps. So we may want to check whether this option also works with the current approach to snapshotting.


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @TomaszGaweda @aalobaidi 
    Please correct me if I'm missing something here.
    
    At the start of every batch, the state store loads the previous version of state so that it can be read and written. If we unload all versions "after committing", the cache will no longer contain the previous version of state, and the store will have to load it by reading files, adding huge latency to the start of the batch. That's why I described the three cases earlier: the goal is to avoid loading state from files when starting a new batch.
    
    Please apply #21469 manually and see how much memory HDFSBackedStateStoreProvider consumes due to storing multiple versions (it will show the state size of the latest version as well as the overall state size in cache). Please also measure latency and share numbers showing what it is now and what it will be after the patch. We always have to ask ourselves whether we are addressing the issue correctly.


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by TomaszGaweda <gi...@git.apache.org>.
Github user TomaszGaweda commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @HeartSaVioR IMHO we should consider a new state provider such as RocksDB, like Flink and Databricks Delta did. It is not a direct fix, but it will improve latency and memory consumption, and additional management on the Spark side may not be required.


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    Clearing the map after each commit might make things worse, since the maps need to be loaded from the snapshot + delta files for the next micro-batch. Setting `spark.sql.streaming.minBatchesToRetain` to a lower value might address the memory consumption to some extent.
    
    Maybe we need to explore how to avoid maintaining multiple copies of the state in memory within the HDFS state store, or even explore RocksDB for incremental checkpointing.


---





[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @aalobaidi 
    You can also merge #21506 (maybe changing the log level, or modifying the patch to log the messages at INFO level) and see the latencies of loading state, snapshotting, and cleaning up.


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    @aalobaidi 
    One thing you may want to be aware of is that, from the executor's point of view, the executor must load at least 1 version of state in memory regardless of how many versions are cached. I guess you may get a better result if you unload the entire cache except for the last version you just committed. A cache miss will then occur in only one of the three cases (`2. committed but batch failed afterwards`), which happens rarely and is still better than a cache miss in two of the three cases (2 and 3).
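
The comparison above can be spelled out with a small model of the three cases. This is a sketch under stated assumptions and not the provider's code: the batch producing version `n` starts with version `n - 1` in the cache, a failed batch is retried and reads `n - 1` again, and a successful batch is followed by one that reads `n`. The policy and case names are hypothetical labels.

```python
CASES = ("failed_before_commit", "committed_then_failed", "committed_and_succeeded")
POLICIES = ("unload_all", "keep_last", "keep_all")


def next_read_hits_cache(policy: str, case: str, n: int = 5) -> bool:
    """Return True if the next state read is served from the in-memory cache."""
    if policy not in POLICIES or case not in CASES:
        raise ValueError("unknown policy or case")
    cache = {n - 1}  # version n-1 was loaded when the batch started
    if case != "failed_before_commit":
        cache.add(n)  # the commit makes version n available
        if policy == "unload_all":
            cache.clear()
        elif policy == "keep_last":
            cache = {n}
    # A retried batch needs n-1 again; the next batch after success needs n.
    needed = n if case == "committed_and_succeeded" else n - 1
    return needed in cache


for policy in POLICIES:
    misses = [case for case in CASES if not next_read_hits_cache(policy, case)]
    print(policy, "misses:", misses)
```

In this model, unloading everything misses in cases 2 and 3, while keeping only the last committed version misses in case 2 alone, at the cost of one resident version per partition.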


---



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/21500
  
    After enabling the option, I observed a small, expected latency at the start of each batch for every partition. Median/average was 4~50 ms in my case, but max latency was a bit higher than 700 ms.
    
    Please note that the state size in my experiment is not that huge, so if a partition has much bigger state the latency could be much higher:
    
    ```
    memory used by state total (min, med, max): 812.6 KB (2.1 KB, 4.1 KB, 4.1 KB)
    time to commit changes total (min, med, max): 13.5 s (21 ms, 35 ms, 449 ms)
    total time to remove rows total (min, med, max): 22 ms (22 ms, 22 ms, 22 ms)
    number of updated state rows: 5,692
    total time to update rows total (min, med, max): 1.4 s (3 ms, 5 ms, 42 ms)
    ```
    
    As I explained earlier, loading the last version from files adds avoidable latency.


---
