You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2023/03/15 06:54:00 UTC

[spark] branch master updated: [SPARK-42792][SS] Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6259b97855d [SPARK-42792][SS] Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators
6259b97855d is described below

commit 6259b97855d623de8cbe1d669ed9fe968bddb197
Author: Anish Shrigondekar <an...@databricks.com>
AuthorDate: Wed Mar 15 15:53:44 2023 +0900

    [SPARK-42792][SS] Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators
    
    ### What changes were proposed in this pull request?
    Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators
    
    ### Why are the changes needed?
    It's useful to get this metric for bytes written during flush from RocksDB as part of the DB custom metrics. We propose to add this to the existing metrics that are collected. There is no additional overhead since we are just querying the internal ticker gauge, similar to other metrics.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit test
    ```
    [info] Run completed in 45 seconds, 260 milliseconds.
    [info] Total number of tests run: 18
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 18, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    [success] Total time: 152 s (02:32), completed Mar 14, 2023, 3:43:41 PM
    ```
    
    Info log on executor:
    ```
    StateStoreId(opId=0,partId=3,name=default): Committed 2, stats = {"numCommittedKeys":4,"numUncommittedKeys":4,"totalMemUsageBytes":7818,"writeBatchMemUsageBytes":272,"totalSSTFilesBytes":2614,"nativeOpsHistograms":{"get":{"sum":14,"avg":7.0,"stddev":1.0,"median":6.0,"p95":8.0,"p99":8.0,"count":2},"put":{"sum":37966,"avg":37966.0,"stddev":0.0,"median":37966.0,"p95":37966.0,"p99":37966.0,"count":1},"compaction":{"sum":0,"avg":0.0,"stddev":0.0,"median":0.0,"p95":0.0,"p99":0.0,"count":0}},"lastCommitLatencyMs":{"fileSync":188,"writeBatch":37,"flush":61,"pause":0,"checkpoint":61,"compact":0},"filesCopied":1,"bytesCopied":1280,"filesReused":1,"zipFileBytesUncompressed":7675,"nativeOpsMetrics":{"writerStallDuration":0,"totalBytesReadThroughIterator":254,"totalBytesWrittenByFlush":1490,"readBlockCacheHitCount":2,"totalBytesWrittenByCompaction":0,"readBlockCacheMissCount":0,"totalBytesReadByCompaction":0,"totalBytesWritten":272,"totalBytesRead":73}}
    ```
    
    Info log on driver:
    ```
        "customMetrics" : {
          "rocksdbBytesCopied" : 2544,
          "rocksdbCommitCheckpointLatency" : 416,
          "rocksdbCommitCompactLatency" : 0,
          "rocksdbCommitFileSyncLatencyMs" : 742,
          "rocksdbCommitFlushLatency" : 194,
          "rocksdbCommitPauseLatency" : 0,
          "rocksdbCommitWriteBatchLatency" : 132,
          "rocksdbFilesCopied" : 2,
          "rocksdbFilesReused" : 2,
          "rocksdbGetCount" : 4,
          "rocksdbGetLatency" : 0,
          "rocksdbPutCount" : 5,
          "rocksdbPutLatency" : 132,
          "rocksdbReadBlockCacheHitCount" : 4,
          "rocksdbReadBlockCacheMissCount" : 0,
          "rocksdbSstFileSize" : 5143,
          "rocksdbTotalBytesRead" : 138,
          "rocksdbTotalBytesReadByCompaction" : 0,
          "rocksdbTotalBytesReadThroughIterator" : 714,
          "rocksdbTotalBytesWritten" : 548,
          "rocksdbTotalBytesWrittenByCompaction" : 0,
          "rocksdbTotalBytesWrittenByFlush" : 2948,
          "rocksdbTotalCompactionLatencyMs" : 0,
          "rocksdbWriterStallLatencyMs" : 0,
          "rocksdbZipFileBytesUncompressed" : 36542
        }
      } ],
    ```
    
    Closes #40427 from anishshri-db/task/SPARK-42792.
    
    Authored-by: Anish Shrigondekar <an...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 4 +++-
 .../execution/streaming/state/RocksDBStateStoreProvider.scala    | 9 +++++++--
 .../streaming/state/RocksDBStateStoreIntegrationSuite.scala      | 2 +-
 .../spark/sql/execution/streaming/state/RocksDBSuite.scala       | 2 ++
 4 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 4ce4a03822e..89872afb80e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -403,7 +403,9 @@ class RocksDB(
       /** Number of bytes read during compaction */
       "totalBytesReadByCompaction" -> COMPACT_READ_BYTES,
       /** Number of bytes written during compaction */
-      "totalBytesWrittenByCompaction" -> COMPACT_WRITE_BYTES
+      "totalBytesWrittenByCompaction" -> COMPACT_WRITE_BYTES,
+      /** Number of bytes written during flush */
+      "totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES
     ).toMap
     val nativeOpsMetrics = nativeOpsMetricTickers.mapValues { typ =>
       nativeStats.getTickerCount(typ)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 79614df6299..3a128561b1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -142,7 +142,8 @@ private[sql] class RocksDBStateStoreProvider
         CUSTOM_METRIC_STALL_TIME -> nativeOpsLatencyMillis("writerStallDuration"),
         CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"),
         CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"),
-        CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction")
+        CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"),
+        CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush")
       ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
         Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())
 
@@ -296,6 +297,10 @@ object RocksDBStateStoreProvider {
   val CUSTOM_METRIC_COMPACT_WRITTEN_BYTES = StateStoreCustomSizeMetric(
     "rocksdbTotalBytesWrittenByCompaction",
     "RocksDB: compaction - total bytes written by the compaction process")
+  val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric(
+    "rocksdbTotalBytesWrittenByFlush",
+    "RocksDB: flush - total bytes written by flush"
+  )
 
   // Total SST file size
   val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric(
@@ -310,6 +315,6 @@ object RocksDBStateStoreProvider {
     CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ,
     CUSTOM_METRIC_BYTES_WRITTEN, CUSTOM_METRIC_ITERATOR_BYTES_READ, CUSTOM_METRIC_STALL_TIME,
     CUSTOM_METRIC_TOTAL_COMPACT_TIME, CUSTOM_METRIC_COMPACT_READ_BYTES,
-    CUSTOM_METRIC_COMPACT_WRITTEN_BYTES
+    CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES
   )
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index dc505963b4d..d1af0acb530 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -95,7 +95,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
               "rocksdbReadBlockCacheHitCount", "rocksdbReadBlockCacheMissCount",
               "rocksdbTotalBytesReadByCompaction", "rocksdbTotalBytesWrittenByCompaction",
               "rocksdbTotalCompactionLatencyMs", "rocksdbWriterStallLatencyMs",
-              "rocksdbTotalBytesReadThroughIterator"))
+              "rocksdbTotalBytesReadThroughIterator", "rocksdbTotalBytesWrittenByFlush"))
           }
         } finally {
           query.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index cf0ff4808fc..417eff65482 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -544,6 +544,8 @@ class RocksDBSuite extends SparkFunSuite {
       assert(metrics.nativeOpsMetrics("writerStallDuration") >= 0)
       assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") >= 0)
       assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") >=0)
+
+      assert(metrics.nativeOpsMetrics("totalBytesWrittenByFlush") >= 0)
     }
 
     withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org