Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2024/03/01 19:34:40 UTC

[PR] [SPARK-47250] Refactor streaming operator dir layout and fix some errors in RocksDB state store provider [spark]

anishshri-db opened a new pull request, #45360:
URL: https://github.com/apache/spark/pull/45360

   ### What changes were proposed in this pull request?
   Refactor streaming operator dir layout and fix some errors in RocksDB state store provider
   
   
   ### Why are the changes needed?
   Fix the directory layout for stateless/stateful operators, and fix miscellaneous bugs and NERF errors in the RocksDB state store provider
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit tests
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513631712


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
   Hmm - user-facing APIs shouldn't actually call state store operations directly by passing the name. The assumption is that we should never have column families created with leading/trailing spaces. Do you think we should add an assert to verify this and throw an exception?
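A minimal sketch of the invariant check suggested above, assuming names are rejected rather than trimmed. The object name is hypothetical, and `"default"` is assumed to mirror `StateStore.DEFAULT_COL_FAMILY_NAME`. It uses `require` (which throws `IllegalArgumentException`) rather than `assert`, because Scala's `assert` can be elided at compile time, whereas `require` always runs.

```scala
// Hypothetical sketch: reject column family names that break the naming
// invariants instead of silently trimming them.
object ColFamilyNameInvariant {
  // Assumed to mirror StateStore.DEFAULT_COL_FAMILY_NAME in Spark.
  val DefaultColFamilyName: String = "default"

  /** Throws IllegalArgumentException if the name breaks an invariant. */
  def requireValidName(colFamilyName: String): Unit = {
    require(colFamilyName != null, "column family name must not be null")
    require(colFamilyName.trim.nonEmpty,
      s"column family name must not be empty or blank: '$colFamilyName'")
    require(colFamilyName == colFamilyName.trim,
      s"column family name must not have leading/trailing whitespace: '$colFamilyName'")
    require(colFamilyName != DefaultColFamilyName,
      s"column family name '$DefaultColFamilyName' is reserved")
  }
}
```

With this check, every name in the test list quoted later in the thread (`"default"`, `""`, `" "`, `"    "`, `" default"`, `" default "`) is rejected, and a name such as `"cf1"` passes.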





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1984994353

   Is this good to go? @HeartSaVioR @rangadi 




Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45360: [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families
URL: https://github.com/apache/spark/pull/45360




Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513624528


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")

Review Comment:
   Done





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1518475424


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -582,7 +636,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         assert(iterator(db, colFamily2).isEmpty)
       }
       assert(ex.isInstanceOf[RuntimeException])
-      assert(ex.getMessage.contains("does not exist"))
+      assert(ex.getMessage.contains("missing column family"))

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "    ", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {

Review Comment:
   Verifying an exception that belongs to the error class framework needs to be done with checkError.
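The reviewer refers to Spark's `checkError` test helper. The sketch below does not use that helper or reproduce its signature. It is a self-contained illustration of the idea behind it: assert on the stable error class and the structured message parameters instead of on substrings of the rendered message (for example, `ex.getMessage.contains("missing column family")`). `ErrorClassException`, `ErrorChecks`, and the error class string used in the test are all hypothetical placeholders, not Spark types or names.

```scala
// Hypothetical stand-in for an exception from Spark's error class framework.
// Real Spark exceptions implement SparkThrowable; this class only mimics the
// idea of a stable error class plus structured message parameters.
final class ErrorClassException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends UnsupportedOperationException(
    s"[$errorClass] " +
      messageParameters.map { case (k, v) => s"$k=$v" }.mkString(", "))

object ErrorChecks {
  /** Compare the error class and parameters instead of matching substrings
   *  of the rendered message, which can change when message text is edited. */
  def checkErrorLike(
      ex: ErrorClassException,
      expectedErrorClass: String,
      expectedParameters: Map[String, String]): Unit = {
    assert(ex.errorClass == expectedErrorClass,
      s"expected error class $expectedErrorClass, got ${ex.errorClass}")
    assert(ex.messageParameters == expectedParameters,
      s"expected parameters $expectedParameters, got ${ex.messageParameters}")
  }
}
```

With this approach, rewording the message template does not break the test. Only a change to the error class or to its parameters does.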



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "    ", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+        ex.getCause.isInstanceOf[UnsupportedOperationException]
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   ditto
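In the quoted test, the line `ex.getCause.isInstanceOf[UnsupportedOperationException]` is a bare statement. Its Boolean result is discarded, so it can never fail the test. The sketch below shows the same check wrapped in `assert`. `failingOp` is a hypothetical stand-in for `db.createColFamilyIfAbsent(...)`, and `interceptAny` is a simplified, hypothetical analogue of ScalaTest's `intercept`.

```scala
import scala.util.control.NonFatal

// Hypothetical, self-contained illustration; not code from the PR.
object CauseCheckExample {
  /** Always fails with a wrapped UnsupportedOperationException cause. */
  def failingOp(): Unit =
    throw new RuntimeException("wrapper", new UnsupportedOperationException("bad name"))

  /** Returns the exception thrown by `op`, similar in spirit to ScalaTest's intercept. */
  def interceptAny(op: => Unit): Throwable = {
    val thrown: Option[Throwable] =
      try { op; None } catch { case NonFatal(t) => Some(t) }
    thrown.getOrElse(throw new AssertionError("expected an exception"))
  }

  /** Wrapping the cause check in assert makes a mismatch (or a null cause) fail. */
  def checkCause(op: => Unit): Unit = {
    val ex = interceptAny(op)
    assert(ex.getCause.isInstanceOf[UnsupportedOperationException],
      s"unexpected cause: ${ex.getCause}")
  }
}
```

`null.isInstanceOf[T]` evaluates to `false` in Scala. An exception without a cause therefore fails the asserted check instead of throwing a `NullPointerException`.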



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -134,6 +134,46 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  private def verifyStoreOperationUnsupported()(testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   ditto





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513594152


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
   This trimming also needs to happen when the customer (cx) issues a put/get etc. against the column family. I don't think that's happening today. If it isn't, would this lead to inconsistencies?
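A minimal sketch of the inconsistency described above. It assumes the create path trims the name and the put/get path does not. `TrimOnCreateRegistry` is a hypothetical in-memory stand-in for the column family handle map in `RocksDB.scala`, not the real class.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the column family handle map; illustrates the concern only.
final class TrimOnCreateRegistry {
  private val handles = mutable.Map.empty[String, Int]

  /** Mirrors the proposed createColFamilyIfAbsent: trims before storing. */
  def createIfAbsent(name: String): Unit = {
    val cfName = name.trim
    if (!handles.contains(cfName)) handles(cfName) = handles.size
  }

  /** Mirrors a lookup on the put/get path, which does NOT trim. */
  def exists(name: String): Boolean = handles.contains(name)
}
```

A caller that creates `" cf1 "` and then issues a get with the same untrimmed string would see a "missing column family" failure, because only `"cf1"` was registered.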



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")

Review Comment:
   This can be reworded as `multiple column families disabled in RocksDBStateStoreProvider`.





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1514964881


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
   Modified to check the invariants and throw an exception if they are not met, both for column family operations and for column family creation/deletion.
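A sketch of what validating both paths might look like. It follows the `verifyColFamilyExists` diff quoted above for data operations and adds a name check for creation/deletion. This is an illustration of the general shape, not the code that was merged. `ColFamilyValidator` is hypothetical, `"default"` is assumed to mirror `StateStore.DEFAULT_COL_FAMILY_NAME`, and plain `UnsupportedOperationException`s stand in for Spark's `StateStoreErrors` helpers.

```scala
// Hypothetical sketch: one validator shared by data operations (get/put/...)
// and by creation/deletion of column families.
final class ColFamilyValidator(useColumnFamilies: Boolean, existing: Set[String]) {
  // Assumed to mirror StateStore.DEFAULT_COL_FAMILY_NAME.
  private val DefaultName = "default"

  /** Invariants for creating or dropping a column family. */
  def validateForCreateOrDrop(operationName: String, name: String): Unit = {
    if (!useColumnFamilies)
      throw new UnsupportedOperationException(
        s"$operationName: multiple column families disabled in RocksDBStateStoreProvider")
    // Empty, blank, padded, or reserved names are all rejected.
    if (name.isEmpty || name != name.trim || name == DefaultName)
      throw new UnsupportedOperationException(
        s"$operationName: invalid column family name '$name'")
  }

  /** Invariants for data operations against a column family. */
  def validateForOperation(operationName: String, name: String): Unit = {
    if (name != DefaultName) {
      if (!useColumnFamilies)
        throw new UnsupportedOperationException(
          s"$operationName: multiple column families disabled in RocksDBStateStoreProvider")
      if (!existing.contains(name))
        throw new UnsupportedOperationException(
          s"$operationName: missing column family '$name'")
    }
  }
}
```

The default column family stays usable for data operations whether or not column families are enabled. Only non-default names are gated on the feature flag and on existence.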





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1974249820

   @sahnib @HeartSaVioR - PTAL, thx!
   
   @HeartSaVioR - let me know whether you are OK with the proposed dir layout changes, and whether you would prefer them in a separate PR. Thanks




Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1985157244

   Will review sooner rather than later, maybe today.




Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1520296947


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -582,7 +636,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
         assert(iterator(db, colFamily2).isEmpty)
       }
       assert(ex.isInstanceOf[RuntimeException])
-      assert(ex.getMessage.contains("does not exist"))
+      assert(ex.getMessage.contains("missing column family"))

Review Comment:
   Done



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -134,6 +134,46 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  private def verifyStoreOperationUnsupported()(testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   Done





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1520296736


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "    ", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {

Review Comment:
   Done



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "    ", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+        ex.getCause.isInstanceOf[UnsupportedOperationException]
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   Done





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1990520156

   Thanks! Merging to master.

