Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2024/03/13 20:07:24 UTC

[PR] [SPARK-47372] Add support for range scan based key encoder for stateful streaming using state provider [spark]

anishshri-db opened a new pull request, #45503:
URL: https://github.com/apache/spark/pull/45503

   ### What changes were proposed in this pull request?
   Add support for range scan based key encoder for stateful streaming using state provider
   
   
   ### Why are the changes needed?
   These changes are needed to allow range scans over the fixed-size leading columns of the key, especially with the RocksDB state store provider
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added unit tests
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546941


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   I'm less worried about the overhead of consistently adding a byte per column, unless we have hundreds of columns.
   (I know it's 25% overhead for numeric types and 100% overhead for boolean, which sounds significant. But since we are here to support the null case, it's probably better to support it fully rather than partially.)
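
To make the trade-off concrete, here is a minimal sketch of the null-marker idea from the diff above, written without any Spark dependency. The object name `NullableLongEncoding` and its helpers are hypothetical. Padding a null value to the full width with zeros is an assumption, since the quoted hunk is cut off before showing what the real code writes in that branch.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object NullableLongEncoding {
  // Marker values mirror the diff above: 0x01 for null, 0x00 for non-null.
  val NullMarker: Byte = 0x01
  val NonNullMarker: Byte = 0x00

  // Encode an optional Long as 1 marker byte + 8 big-endian payload bytes.
  // Assumption: a null value keeps the full width, with the payload left as zeros.
  def encode(value: Option[Long]): Array[Byte] = {
    val buf = ByteBuffer.allocate(java.lang.Long.BYTES + 1)
    buf.order(ByteOrder.BIG_ENDIAN)
    value match {
      case Some(v) =>
        buf.put(NonNullMarker)
        buf.putLong(v)
      case None =>
        buf.put(NullMarker) // remaining bytes stay zero
    }
    buf.array()
  }

  // Read the marker byte first, then the payload only if the value is non-null.
  def decode(bytes: Array[Byte]): Option[Long] = {
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
    if (buf.get() == NullMarker) None else Some(buf.getLong())
  }

  // Relative size overhead (in percent) of one extra marker byte
  // for a payload that is `width` bytes wide.
  def overheadPercent(width: Int): Double = 100.0 / width
}
```

With this sketch, `overheadPercent(4)` gives 25.0 for a 4-byte column and `overheadPercent(1)` gives 100.0 for a 1-byte column, which matches the figures quoted in the comment; an 8-byte column would pay 12.5%.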





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536070026


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -77,9 +77,18 @@ class StatePartitionReader(
       stateStoreMetadata.head.numColsPrefixKey
     }
 
+    // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support for this will be

Review Comment:
   Done





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1535025069


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -110,9 +120,6 @@ class PrefixKeyScanStateEncoder(
 
   import RocksDBStateEncoder._
 
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +

Review Comment:
   Never mind, ignore this. I see we check this elsewhere.





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536547158


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   @neilramaswamy @sahnib Does the above proposal make sense to you as well? Just to make sure I'm not missing something.





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531442175


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +160,292 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, 0, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, keySchemaWithRangeScan.length + 1,
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName, RangeKeyScanStateEncoderType,
+          keySchemaWithRangeScan, numColsPrefixKey = 1, valueSchema)
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)

Review Comment:
   Yeah, it was already covered by that set of inputs, but I added some explicit ones as well.





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531410661


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -305,9 +307,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
     }
 
-    require((keySchema.length == 0 && numColsPrefixKey == 0) ||
-      (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " +

Review Comment:
   Done





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531410814


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -110,8 +119,11 @@ class PrefixKeyScanStateEncoder(
 
   import RocksDBStateEncoder._
 
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +
-    "greater than the number of columns for prefix key!")
+  // Note that numColsPrefixKey have to be less than the number of columns in the key schema
+  // Range scan encoder allows for equality, but prefix key scan encoder does not
+  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numColsPrefixKey.toString)

Review Comment:
   Done
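
The difference between the two checks described in the code comment above can be sketched as a pair of predicates. This is only an illustration: `OrderingColsValidation` and both method names are hypothetical, the range scan rule is inferred from the code comment and the unit tests rather than from the encoder source, and rejecting negative counts is an assumption.

```scala
object OrderingColsValidation {
  // Prefix key scan: needs at least one prefix column AND at least one
  // remaining column, so the count must be strictly less than the key length.
  def validForPrefixScan(numCols: Int, keySchemaLength: Int): Boolean =
    numCols > 0 && numCols < keySchemaLength

  // Range scan: needs at least one ordering column, but every key column
  // may be an ordering column, so equality with the key length is allowed.
  def validForRangeScan(numCols: Int, keySchemaLength: Int): Boolean =
    numCols > 0 && numCols <= keySchemaLength
}
```

For a two-column key, a count of 2 is rejected for prefix scan but accepted for range scan, and a count of 0 or 3 is rejected by both.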





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "neilramaswamy (via GitHub)" <gi...@apache.org>.
neilramaswamy commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1533103809


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -39,13 +41,21 @@ sealed trait RocksDBValueStateEncoder {
 }
 
 object RocksDBStateEncoder {
-  def getKeyEncoder(
-      keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+  def getKeyEncoder(keyStateEncoderSpec: KeyStateEncoderSpec): RocksDBKeyStateEncoder = {
+    // Return the key state encoder based on the requested type
+    keyStateEncoderSpec match {
+      case NoPrefixKeyStateEncoderSpec(keySchema) =>
+        new NoPrefixKeyStateEncoder(keySchema)
+
+      case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
+        new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
+
+      case RangeKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>

Review Comment:
   `numOrderingCols` ?
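
For illustration, the spec hierarchy with the suggested field name could look roughly like the sketch below. It is not the Spark code: `Schema` is a stand-in for `StructType` that only holds column names, and `scanColumns` is a hypothetical helper added just to exercise the pattern match.

```scala
object EncoderSpecSketch {
  // Stand-in for Spark's StructType: just the column names, in order.
  type Schema = Seq[String]

  sealed trait KeyStateEncoderSpec
  case class NoPrefixKeyStateEncoderSpec(keySchema: Schema) extends KeyStateEncoderSpec
  case class PrefixKeyScanStateEncoderSpec(keySchema: Schema, numColsPrefixKey: Int)
    extends KeyStateEncoderSpec
  // The range scan spec names its count after what it means: ordering columns.
  case class RangeKeyScanStateEncoderSpec(keySchema: Schema, numOrderingCols: Int)
    extends KeyStateEncoderSpec

  // Hypothetical helper: which leading columns drive the scan for each spec.
  def scanColumns(spec: KeyStateEncoderSpec): Seq[String] = spec match {
    case NoPrefixKeyStateEncoderSpec(_) =>
      Seq.empty
    case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
      keySchema.take(numColsPrefixKey)
    case RangeKeyScanStateEncoderSpec(keySchema, numOrderingCols) =>
      keySchema.take(numOrderingCols)
  }
}
```

Because the trait is sealed, the compiler can warn when a match misses one of the three specs, which is one benefit of dispatching on a spec type instead of an integer count.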



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields

Review Comment:
   Nowhere in this do we mention why we cannot range scan non-fixed size fields (I understand, but maybe future readers won't). It's not mentioned in the error message either. Can we do that?
   
   I think also a quick comment explaining how this actually works would be helpful, i.e. "To encode a row `r` for a range scan, we first project the first `numOrderingCols` needed for the range scan into an `UnsafeRow`; we then rewrite that `UnsafeRow`'s fields in big endian. Then, for the rest of the fields, we project those into an `UnsafeRow`. We then effectively join these latter two `UnsafeRow`s together, and finally take those bytes to get the resulting row." For example, NoPrefixKeyStateEncoder has a comment like this.
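
A small standalone sketch may help future readers see why the rewrite to big endian matters. It compares encoded longs with an unsigned, byte-wise lexicographic comparison, which is the kind of ordering the Scaladoc in this PR attributes to RocksDB. The object `BigEndianOrdering` and its helpers are hypothetical and do not touch `UnsafeRow` or RocksDB.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object BigEndianOrdering {
  // Encode a Long into 8 bytes using the requested byte order.
  def encode(value: Long, order: ByteOrder): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).order(order).putLong(value).array()

  // Unsigned lexicographic comparison of two byte arrays.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val cmp = (a(i) & 0xff) - (b(i) & 0xff)
      if (cmp != 0) return cmp
      i += 1
    }
    a.length - b.length
  }

  // Sort values by comparing their encoded bytes, as a byte-ordered store would.
  def sortByEncodedBytes(values: Seq[Long], order: ByteOrder): Seq[Long] =
    values.sortWith((x, y) => compareBytes(encode(x, order), encode(y, order)) < 0)
}
```

For non-negative values, `sortByEncodedBytes(values, ByteOrder.BIG_ENDIAN)` agrees with `values.sorted`, while the little-endian variant places 256 before 1 because it compares the least significant byte first. The sketch does not handle negative values: in two's complement their leading byte has the high bit set, so they would sort after the positive ones under this comparison.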



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key

Review Comment:
   nit typo



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +161,360 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 0),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, keySchemaWithRangeScan.length + 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithVariableSizeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - null type columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithNullTypeCols: StructType = StructType(
+      Seq(StructField("key1", NullType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithNullTypeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithNullTypeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithNullTypeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan byte ordering column - variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", ByteType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68),
+        (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112),
+        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by byte col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result: Seq[(Byte, Int)] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - ordering cols and key schema cols are same",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    // use the same schema as value schema for single col key schema
+    tryWithProviderResource(newStoreProvider(valueSchema,
+      RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          valueSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(valueSchema, 1))
+      }
+
+      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 6, 9, 5)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToValueRow(ts)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        valueRowToData(kv.key)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+
+      // also check for prefix scan
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToValueRow(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          valueRowToData(kv.key)
+        }.toSeq
+        assert(result.size === 1)
+      }
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - with prefix scan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 1L)
+      timerTimestamps.foreach { ts =>
+        (1 to 5).foreach { keyVal =>
+          val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
+      }
+
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToPrefixKeyRowWithRangeScan(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          val key = keyRowWithRangeScanToData(kv.key)
+          key._2
+        }.toSeq
+        assert(result.size === 5)

Review Comment:
   Since we only assert on the size, I _think_ a buggy `prefixScan` that scans on the wrong key (i.e. some key other than `prefix`) would still pass. For example, if you asked for `8000L` and it instead scanned with another prefix (like `931L`), this test would still pass, because every timestamp has exactly 5 entries.
   
   Instead of asserting that the size is always 5, what if, for the timer at index `i` in `timerTimestamps`, we inserted `i + 1` elements? Then, after zipping the timestamps with their index, we would assert that `result.size == i + 1`. With such a test, I don't think a `prefixScan` implementation could accidentally scan the wrong key and still pass.
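   
   A minimal sketch of that idea, using a sorted in-memory map as a stand-in for the state store. All names here (`PrefixScanSizeCheck`, `goodScan`, `buggyScan`, `sizesMatch`) are hypothetical and not part of the PR; the buggy scan is just one example of a wrong-prefix bug.

   ```scala
   import scala.collection.immutable.TreeMap

   object PrefixScanSizeCheck {
     // Toy store: keys are (timestamp, key) pairs kept in sorted order.
     type Store = TreeMap[(Long, String), Int]

     // Correct prefix scan: returns only the keys whose timestamp matches.
     def goodScan(store: Store, ts: Long): Seq[(Long, String)] =
       store.keys.filter(_._1 == ts).toSeq

     // Buggy prefix scan: ignores the requested prefix and always scans
     // the smallest timestamp in the store.
     def buggyScan(store: Store, ts: Long): Seq[(Long, String)] = {
       val wrong = store.firstKey._1
       store.keys.filter(_._1 == wrong).toSeq
     }

     val timerTimestamps: Seq[Long] = Seq(931L, 8000L, 1L)

     // Shape of the current test: 5 entries for every timestamp.
     val uniform: Store = TreeMap(
       (for (ts <- timerTimestamps; k <- 1 to 5) yield (ts, k.toString) -> 1): _*)

     // Proposed shape: i + 1 entries for the timestamp at index i.
     val varying: Store = TreeMap(
       (for ((ts, i) <- timerTimestamps.zipWithIndex; k <- 1 to (i + 1))
         yield (ts, k.toString) -> 1): _*)

     // True if every scan returns the expected number of entries.
     def sizesMatch(
         scan: (Store, Long) => Seq[(Long, String)],
         store: Store,
         expected: Int => Int): Boolean =
       timerTimestamps.zipWithIndex.forall { case (ts, i) =>
         scan(store, ts).size == expected(i)
       }
   }
   ```

   With five entries per timestamp, both scans pass the size check; with `i + 1` entries, only the correct scan does.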





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536573406


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -212,6 +212,12 @@ class PrefixKeyScanStateEncoder(
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will

Review Comment:
   Not exactly sure what you mean.
   
   For example, if you look at the test here: https://github.com/apache/spark/pull/45503/files#diff-4c6b19c847c68e25ffa927df85efb4c79f388648d8c6242f1fe9f84cf09ec5ffR449
   
   Suppose we have the sample input:
   ```
   Seq((931L, 10), (40L, null), (452300L, 1), (1L, 304), (100L, null))
   ```
   
   When we iterate, the elements should all be ordered as expected based on the first col.
   
   So we should get:
   ```
   Seq((1L, 304), (40L, null), (100L, null), (931L, 10), (452300L, 1))
   ```
   
   So in this case the ordering on the first col still applies, which is what I was trying to convey.
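   
   As a rough, self-contained illustration of that expectation (not the encoder from this PR): each column is written as a marker byte followed by its big-endian bytes, and keys are sorted with an unsigned byte-wise comparison, standing in for the byte-wise key comparison that RocksDB uses. The marker values (0x00 for non-null, 0x01 for null, matching the `isNullCol` values quoted elsewhere in this thread), the helper names, and the restriction to non-negative values are assumptions of this sketch. `Arrays.compareUnsigned` needs Java 9 or later.

   ```scala
   import java.nio.ByteBuffer
   import java.util.Arrays

   object FirstColOrdering {
     // Encode one key as [marker][8-byte long][marker][4-byte int].
     // ByteBuffer is big-endian by default; a null int leaves its
     // 4 value bytes as zero.
     def encode(ts: Long, v: Option[Int]): Array[Byte] = {
       val buf = ByteBuffer.allocate(1 + 8 + 1 + 4)
       buf.put(0x00.toByte).putLong(ts)
       v match {
         case Some(i) => buf.put(0x00.toByte).putInt(i)
         case None => buf.put(0x01.toByte)
       }
       buf.array()
     }

     val input: Seq[(Long, Option[Int])] =
       Seq((931L, Some(10)), (40L, None), (452300L, Some(1)), (1L, Some(304)), (100L, None))

     // Sort by the unsigned lexicographic order of the encoded bytes.
     val sorted: Seq[(Long, Option[Int])] = input
       .map { case (ts, v) => (encode(ts, v), (ts, v)) }
       .sortWith((x, y) => Arrays.compareUnsigned(x._1, y._1) < 0)
       .map(_._2)
   }
   ```

   Here `sorted` comes back ordered by the first column, with the null second-column values travelling along with their rows.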
   





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546941


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   I'm less worried about the overhead of consistently adding a byte per column, unless we have 100s of columns.
   (I know it's a 25% overhead, which sounds significant. But since we are here to support the null case, it's probably better to support it fully rather than partially.)





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45503: [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider
URL: https://github.com/apache/spark/pull/45503




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546801


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   @anishshri-db 
   Maybe I wasn't very clear. What I meant was "consistently" prepending a single byte for nullability, regardless of whether the value is null or not.
   
   If the value is null, set the null byte to 0x00 and fill the remaining bytes with 0x00.
   If the value is non-null, set the null byte to 0x01 and fill the remaining bytes with the actual value.
   
   In either case, the layout is `[null byte][actual bytes]` per range column.
   
   With this, you can keep the ordering even when there is a null value in one of the columns in the middle. We just have to decide in advance whether the ordering is "null first" or "null last".
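   
   A sketch of that `[null byte][actual bytes]` layout for a single int column follows. It is only an illustration under stated assumptions: the `encodeCol` and `sortEncoded` helpers are hypothetical, values are non-negative, and `Arrays.compareUnsigned` (Java 9+) stands in for RocksDB's byte-wise key comparison. The marker values are passed in so that both orderings can be shown.

   ```scala
   import java.nio.ByteBuffer
   import java.util.Arrays

   object NullMarkerOrdering {
     // [null byte][4 value bytes]: always 5 bytes, whether or not the value is null.
     // The choice of marker values decides where nulls sort relative to values.
     def encodeCol(v: Option[Int], nullMarker: Byte, nonNullMarker: Byte): Array[Byte] = {
       val buf = ByteBuffer.allocate(1 + 4) // big-endian by default, zero-filled
       v match {
         case Some(i) => buf.put(nonNullMarker).putInt(i)
         case None => buf.put(nullMarker) // remaining bytes stay 0x00
       }
       buf.array()
     }

     // Sort values by the unsigned byte-wise order of their encodings.
     def sortEncoded(
         vs: Seq[Option[Int]],
         nullMarker: Byte,
         nonNullMarker: Byte): Seq[Option[Int]] =
       vs.map(v => (encodeCol(v, nullMarker, nonNullMarker), v))
         .sortWith((x, y) => Arrays.compareUnsigned(x._1, y._1) < 0)
         .map(_._2)

     val values: Seq[Option[Int]] = Seq(Some(304), None, Some(1), Some(10))

     // null byte 0x00, non-null 0x01: nulls sort first
     val nullsFirst: Seq[Option[Int]] = sortEncoded(values, 0x00.toByte, 0x01.toByte)
     // swapped markers: nulls sort last
     val nullsLast: Seq[Option[Int]] = sortEncoded(values, 0x01.toByte, 0x00.toByte)
   }
   ```

   Because the marker is always present, non-null values compare among themselves on their value bytes only, and swapping the two marker values is all it takes to move nulls from the front to the back.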





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1537035515


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -212,6 +212,12 @@ class PrefixKeyScanStateEncoder(
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will

Review Comment:
   Done - added a new test, expanded the previous test, and also updated the comment a bit.





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536573406


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -212,6 +212,12 @@ class PrefixKeyScanStateEncoder(
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will

Review Comment:
   Not exactly sure what you mean.
   
   For example, if you look at the test here: https://github.com/apache/spark/pull/45503/files#diff-4c6b19c847c68e25ffa927df85efb4c79f388648d8c6242f1fe9f84cf09ec5ffR449
   
   Suppose we have the sample input:
   ```
   Seq((931L, 10), (40L, null), (452300L, 1), (1L, 304), (100L, null))
   ```
   
   When we iterate, the long elements (first col) should all be ordered as expected.
   
   So we should get:
   ```
   Seq(1L, 40L, 100L, 931L, 452300L)
   ```
   
   So in this case the ordering on the first col still applies, which is what I was trying to convey.
   





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2019383330

   Thanks! Merging to master.




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2005829806

   Discussed this offline.
   
   > Using just a number of prefix columns means that we can never support both prefix and range scans. Ideally our session window implementation would do so, and I can think of a few more operators that would benefit from it. We don't have to do it in this PR, but want to make sure there's nothing preventing us from doing it in the future.
   
   We can augment the prefix scan encoder to also allow range scans on subsequent columns, but we will make that change in a separate PR.




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546941


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   I'm less worried about the overhead of consistently adding a byte per column, unless we have 100s of columns.





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536560785


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   Ah, sorry, you're using the same bbuf with one more byte. My bad, I missed this. I agree you don't even need to add 0x00 after the null byte if the ByteBuffer initializes all remaining bytes to 0 (or any other consistent value).
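   
   For reference, `ByteBuffer.allocate` is documented to zero-initialize its contents, so writing only the null byte already leaves the value bytes as 0x00. A small check, using an illustrative 4-byte value width and the 0x01 null marker from the hunk above (the object name is made up for this example):

   ```scala
   import java.nio.ByteBuffer

   object ZeroFilledBuffer {
     // 1 marker byte + 4 value bytes; only the marker is written explicitly.
     val buf: ByteBuffer = ByteBuffer.allocate(1 + 4)
     buf.put(0x01.toByte)

     // Backing array: the marker followed by four untouched (zero) bytes.
     val bytes: Array[Byte] = buf.array()
   }
   ```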





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531410457


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+    }
+  }
+

Review Comment:
   Done
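   
   To illustrate why the class comment in the hunk above asks for BIG_ENDIAN encoding of the ordering columns, the following sketch compares the byte-wise order of the two encodings for a pair of non-negative ints. The helper names are hypothetical, and `Arrays.compareUnsigned` (Java 9+) is used as a stand-in for RocksDB's byte-wise comparison.

   ```scala
   import java.nio.{ByteBuffer, ByteOrder}
   import java.util.Arrays

   object EndianOrdering {
     // Encode an int as 4 bytes in the given byte order.
     def encode(v: Int, order: ByteOrder): Array[Byte] =
       ByteBuffer.allocate(4).order(order).putInt(v).array()

     // True if a's encoding sorts before b's under unsigned byte-wise comparison.
     def lessThan(a: Int, b: Int, order: ByteOrder): Boolean =
       Arrays.compareUnsigned(encode(a, order), encode(b, order)) < 0

     // Big-endian:    1 -> 00 00 00 01, 256 -> 00 00 01 00
     val bigEndianOk: Boolean = lessThan(1, 256, ByteOrder.BIG_ENDIAN)
     // Little-endian: 1 -> 01 00 00 00, 256 -> 00 01 00 00
     val littleEndianOk: Boolean = lessThan(1, 256, ByteOrder.LITTLE_ENDIAN)
   }
   ```

   With big-endian bytes, 1 sorts before 256 as expected; with little-endian bytes the first byte of 1 is larger, so the byte-wise order no longer matches the numeric order.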



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
 object RocksDBStateEncoder {
   def getKeyEncoder(
       keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+      keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+      numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {

Review Comment:
   Done





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536586286


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -212,6 +212,12 @@ class PrefixKeyScanStateEncoder(
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will

Review Comment:
   What I meant is that the columns after null values should still be ordered. For the test, we should still be able to iterate over the data (null, 1), (null, 3), (null, 2) in sorted order: (null, 1), (null, 2), (null, 3).





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546801


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   @anishshri-db 
   Maybe I wasn't very clear. What I meant was "consistently" prepending a single byte for nullability, regardless of whether the value is null or not.
   
   If the value is null, set the null byte to 0x00 and fill the remaining bytes with 0x00.
   If the value is non-null, set the null byte to 0x01 and fill the remaining bytes with the actual value.
   
   In any case, the layout is `[null byte][actual bytes]` per range column.
   
   With this, you can keep the ordering even when there is a null value in a middle column. It's just that we decide up front whether the ordering is "null first" or "null last", because the decision is stored "physically".
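
A minimal sketch of the `[null byte][actual bytes]` layout described above, assuming `Int` ordering columns with non-negative values. The names `NullPrefixSketch`, `encodeCol`, `encodeKey`, `compareBytes` and `sortKeys` are hypothetical and are not taken from the PR; the comparison only mimics an unsigned byte-wise key comparator.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object NullPrefixSketch {
  // Hypothetical helper (not from the PR): encode one nullable Int ordering column
  // as [null byte][4 value bytes]. 0x00 marks null, 0x01 marks non-null, so nulls
  // sort first. Values are assumed non-negative; sign handling is out of scope here.
  def encodeCol(v: Option[Int]): Array[Byte] = {
    val bbuf = ByteBuffer.allocate(1 + 4).order(ByteOrder.BIG_ENDIAN)
    bbuf.put(if (v.isDefined) 0x01.toByte else 0x00.toByte)
    bbuf.putInt(v.getOrElse(0)) // null values are zero-filled to keep the width fixed
    bbuf.array()
  }

  // Concatenate the per-column encodings into one key.
  def encodeKey(cols: Seq[Option[Int]]): Array[Byte] =
    Array.concat(cols.map(encodeCol): _*)

  // Unsigned lexicographic comparison, mimicking a byte-wise key comparator.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  // Sort logical keys by the byte-wise order of their encodings.
  def sortKeys(keys: Seq[Seq[Option[Int]]]): Seq[Seq[Option[Int]]] =
    keys.sortWith((x, y) => compareBytes(encodeKey(x), encodeKey(y)) < 0)

  def main(args: Array[String]): Unit = {
    val keys: Seq[Seq[Option[Int]]] =
      Seq(Seq(None, Some(3)), Seq(Some(5), Some(1)), Seq(None, Some(1)), Seq(None, Some(2)))
    sortKeys(keys).foreach(println)
  }
}
```

With 0x00 chosen for null, the rows whose first column is null come first and stay ordered by the second column: (null, 1), (null, 2), (null, 3), then (5, 1). Swapping the two marker values would give "null last" instead.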





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536546941


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   I'm less worried about the overhead of consistently adding a byte per column, unless we have 100s of ordering columns.
   (I know it's 25% overhead for a 4-byte numeric and 100% overhead for boolean, which sounds significant. But since we are here to support the null case, it's probably better to support it fully rather than partially.)





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1531435034


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |

Review Comment:
   Added an explicit check for this; we now throw an error class exception. Done





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "neilramaswamy (via GitHub)" <gi...@apache.org>.
neilramaswamy commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1529271953


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -110,8 +119,11 @@ class PrefixKeyScanStateEncoder(
 
   import RocksDBStateEncoder._
 
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +
-    "greater than the number of columns for prefix key!")
+  // Note that numColsPrefixKey have to be less than the number of columns in the key schema
+  // Range scan encoder allows for equality, but prefix key scan encoder does not
+  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numColsPrefixKey.toString)

Review Comment:
   This error message will say that you can't have _more_ than `keySchema` number of cols for the prefix key, but in this particular case, it's possible that the user has exactly that many (i.e. `numColsPrefixKey == keySchema.length`).



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +160,292 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, 0, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderType, keySchemaWithRangeScan.length + 1,
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderType, 1, colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName, RangeKeyScanStateEncoderType,
+          keySchemaWithRangeScan, numColsPrefixKey = 1, valueSchema)
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)

Review Comment:
   For all of these unit tests, what is the methodology for picking these numbers? I see `931` and `8000` and `452300` a lot... is there a significance?
   
   I think a test with more obviously "tricky" numbers can be useful. For example, with 1 and 256, the little endian encoding of 256 compares byte-wise as less than the little endian encoding of 1 (i.e. the little endian bytes of 1 start with `01 00` and those of 256 start with `00 01`). It's possible that a case like this (`a < b` but `LE(a) > LE(b)`) exists "accidentally" in the numbers we have, but a simpler, more explicit test could be good.
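
One way to make the `a < b` but `LE(a) > LE(b)` case concrete is the sketch below. It assumes 4-byte non-negative `Int` values and an unsigned byte-wise comparison like the one RocksDB uses by default; `EndianOrderingSketch`, `encode` and `compareBytes` are hypothetical names that are not part of the PR or its test suite.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object EndianOrderingSketch {
  // Encode a non-negative Int as 4 bytes in the given byte order.
  def encode(v: Int, order: ByteOrder): Array[Byte] =
    ByteBuffer.allocate(4).order(order).putInt(v).array()

  // Unsigned lexicographic comparison of two byte arrays.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = (1, 256)
    // Little endian: 1 -> 01 00 00 00, 256 -> 00 01 00 00, so LE(1) sorts after LE(256).
    println(compareBytes(encode(a, ByteOrder.LITTLE_ENDIAN), encode(b, ByteOrder.LITTLE_ENDIAN)) > 0)
    // Big endian: 1 -> 00 00 00 01, 256 -> 00 00 01 00, so BE(1) sorts before BE(256).
    println(compareBytes(encode(a, ByteOrder.BIG_ENDIAN), encode(b, ByteOrder.BIG_ENDIAN)) < 0)
  }
}
```

A pair such as 1 and 256 could serve as an explicit test case, since the two values differ in which byte carries the significant bits.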
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |

Review Comment:
   The only other fixed size type I think is `NullType`, but since we control these schemas we shouldn't have it and it will just throw an error. Was that your thinking as well?





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2013956039

   > Is this true? Do we just assume that rows should have non-null values, or if they do, we don't have any guarantee?
   
   Yes, the assumption is that ordering cols have `nullable=false`. Otherwise the ordering might not be as expected. Maybe I should add an explicit check for this.




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "neilramaswamy (via GitHub)" <gi...@apache.org>.
neilramaswamy commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2013940433

   Just thought of a question around the null bitset. I believe the format of the encoded rows is as follows:
   
   `[length of range scan portion + version byte, version byte, range scan unsafe row contents, version byte, rest of row contents]`
   
   Suppose the range scan covers two columns, say `col1` and `col2`, and we have a row `r` for which `col1` is defined but `col2` is null. Doing a range scan from `r` will then make the null bitset of the range scan UnsafeRow greater than zero, which means that you will never get anything back, even though in that case you should probably get back all the records whose `col1` value is greater than `r["col1"]`.
   
   Is this true? Do we just assume that rows should have non-null values, or if they do, we don't have any guarantee?




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1535006405


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -77,9 +77,18 @@ class StatePartitionReader(
       stateStoreMetadata.head.numColsPrefixKey
     }
 
+    // TODO: currently we don't support RangeKeyScanStateEncoderSpec. Support for this will be

Review Comment:
   Probably good to file a JIRA ticket for this and leave a ticket number. Could you please help filing one?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -110,9 +120,6 @@ class PrefixKeyScanStateEncoder(
 
   import RocksDBStateEncoder._
 
-  require(keySchema.length > numColsPrefixKey, "The number of columns in the key must be " +

Review Comment:
   The reason I added this here is that it makes no sense semantically if keySchema.length == numColsPrefixKey. The caller should just use get; a prefix scan isn't needed.
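
The point about a full-length prefix can be illustrated with a toy sorted map standing in for the state store. This is only a sketch under that assumption; `PrefixVsGetSketch`, `prefixScan` and `fullKeyScan` are hypothetical names and do not correspond to the StateStore API.

```scala
import scala.collection.immutable.TreeMap

object PrefixVsGetSketch {
  type Key = (Int, Int)

  // Toy sorted store standing in for the state store; keys have two columns.
  val store: TreeMap[Key, String] =
    TreeMap((1, 1) -> "a", (1, 2) -> "b", (2, 1) -> "c")

  // Prefix scan on the first column only: may return several entries.
  def prefixScan(k1: Int): Seq[String] =
    store.toSeq.collect { case ((c1, _), v) if c1 == k1 => v }

  // "Prefix" scan over all key columns: the prefix is the whole key,
  // so at most one entry can match.
  def fullKeyScan(k: Key): Seq[String] =
    store.toSeq.collect { case (key, v) if key == k => v }

  def main(args: Array[String]): Unit = {
    println(prefixScan(1))
    // A scan over the full key returns the same result as a point lookup.
    println(fullKeyScan((1, 2)) == store.get((1, 2)).toSeq)
  }
}
```

When the prefix spans every key column, the scan degenerates into a point lookup, which is the case the `require` was guarding against.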
   
   What is the desired use case?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,241 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ *
+ * To encode a row for range scan, we first project the first numOrderingCols needed
+ * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN
+ * to allow for scanning keys in sorted order using the byte-wise comparison method that
+ * RocksDB uses.
+ * Then, for the rest of the fields, we project those into another UnsafeRow.
+ * We then effectively join these two UnsafeRows together, and finally take those bytes
+ * to get the resulting row.
+ * We cannot support variable sized fields given the UnsafeRow format which stores variable
+ * sized fields as offset and length pointers to the actual values, thereby changing the required
+ * ordering.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      // NullType is technically fixed size, but not supported for ordering
+      if (field.dataType == NullType) {
+        throw StateStoreErrors.nullTypeOrderingColsNotSupported(field.name, idx.toString)
+      } else {
+        throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+      }
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+  // using byte arrays.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      field.dataType match {
+        // endian-ness doesn't matter for single byte objects. so just write these
+        // types directly.
+        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+        // for other multi-byte types, we need to convert to big-endian
+        case ShortType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putShort(value.asInstanceOf[Short])
+          writer.write(idx, bbuf.array())
+
+        case IntegerType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putInt(value.asInstanceOf[Int])
+          writer.write(idx, bbuf.array())
+
+        case LongType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putLong(value.asInstanceOf[Long])
+          writer.write(idx, bbuf.array())
+
+        case FloatType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putFloat(value.asInstanceOf[Float])
+          writer.write(idx, bbuf.array())
+
+        case DoubleType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putDouble(value.asInstanceOf[Double])
+          writer.write(idx, bbuf.array())
+      }
+    }
+    writer.getRow().copy()

Review Comment:
   nit: is copy() needed?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -289,6 +289,26 @@ class InvalidUnsafeRowException(error: String)
     "among restart. For the first case, you can try to restart the application without " +
     s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null)
 
+sealed trait KeyStateEncoderSpec
+
+case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec
+
+case class PrefixKeyScanStateEncoderSpec(
+    keySchema: StructType,
+    numColsPrefixKey: Int) extends KeyStateEncoderSpec {
+  if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {

Review Comment:
   Ah OK you check the equality in here, good.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,241 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ *
+ * To encode a row for range scan, we first project the first numOrderingCols needed
+ * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN
+ * to allow for scanning keys in sorted order using the byte-wise comparison method that
+ * RocksDB uses.
+ * Then, for the rest of the fields, we project those into another UnsafeRow.
+ * We then effectively join these two UnsafeRows together, and finally take those bytes
+ * to get the resulting row.
+ * We cannot support variable sized fields given the UnsafeRow format which stores variable
+ * sized fields as offset and length pointers to the actual values, thereby changing the required
+ * ordering.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numOrderingCols - number of columns to be used for range scan
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      // NullType is technically fixed size, but not supported for ordering
+      if (field.dataType == NullType) {
+        throw StateStoreErrors.nullTypeOrderingColsNotSupported(field.name, idx.toString)
+      } else {
+        throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+      }
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+  // using byte arrays.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      field.dataType match {
+        // endian-ness doesn't matter for single byte objects. so just write these
+        // types directly.
+        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+        // for other multi-byte types, we need to convert to big-endian
+        case ShortType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putShort(value.asInstanceOf[Short])
+          writer.write(idx, bbuf.array())
+
+        case IntegerType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putInt(value.asInstanceOf[Int])
+          writer.write(idx, bbuf.array())
+
+        case LongType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putLong(value.asInstanceOf[Long])
+          writer.write(idx, bbuf.array())
+
+        case FloatType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putFloat(value.asInstanceOf[Float])
+          writer.write(idx, bbuf.array())
+
+        case DoubleType =>
+          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          bbuf.putDouble(value.asInstanceOf[Double])
+          writer.write(idx, bbuf.array())
+      }
+    }
+    writer.getRow().copy()
+  }
+
+  // Rewrite the unsafe row by converting back from BIG_ENDIAN byte arrays to the
+  // original data types.
+  private def decodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = if (field.dataType == BooleanType || field.dataType == ByteType) {
+        row.get(idx, field.dataType)
+      } else {
+        row.getBinary(idx)
+      }
+
+      field.dataType match {
+        // for single byte types, read them directly
+        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+        // for multi-byte types, convert from big-endian
+        case ShortType =>
+          val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          writer.write(idx, bbuf.getShort)
+
+        case IntegerType =>
+          val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          writer.write(idx, bbuf.getInt)
+
+        case LongType =>
+          val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          writer.write(idx, bbuf.getLong)
+
+        case FloatType =>
+          val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          writer.write(idx, bbuf.getFloat)
+
+        case DoubleType =>
+          val bbuf = ByteBuffer.wrap(value.asInstanceOf[Array[Byte]])
+          bbuf.order(ByteOrder.BIG_ENDIAN)
+          writer.write(idx, bbuf.getDouble)
+      }
+    }
+    writer.getRow().copy()

Review Comment:
   nit: ditto





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1530919219


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+  // using byte arrays.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      field.dataType match {
+        // endian-ness doesn't matter for single byte objects. so just write these
+        // types directly.
+        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+        // for other multi-byte types, we need to convert to big-endian
+        case ShortType =>
+          val bbuf = ByteBuffer.allocate(2)

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -17,16 +17,18 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.nio.{ByteBuffer, ByteOrder}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}

Review Comment:
   Done





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1530503577


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
 object RocksDBStateEncoder {
   def getKeyEncoder(
       keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+      keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+      numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {

Review Comment:
   `numColsPrefixKey` applies to both the RangeScan and PrefixKey encoders now. I think the name `numColsPrefixKey` is somewhat confusing because it does not convey that it also applies to RangeScan.
   
   In a way, RangeScan relies on creating a prefix key whose encoding scheme preserves order, and then doing range operations on it.
   
   [This is probably a large change but worth considering] Should we create a KeyEncoderSpec trait which allows the user to provide the spec required by the encoder as PrefixKeyEncoderSpec, NoPrefixKeyEncoderSpec, or RangeScanKeyEncoderSpec? Each corresponding object can have fields like `numColsPrefixKey` inside it, and then we can case match on it. This would allow us to keep the extra fields required by the encoder inside the corresponding Spec object, rather than adding all these fields to the API.
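
The spec-based design suggested above could look roughly like the following. This is a minimal sketch, not the merged Spark API: the spec names follow the comment, while `KeyEncoderSpecSketch`, `describeEncoder`, and the `numKeyCols` parameter are hypothetical stand-ins for the real factory and `keySchema.length`. The bounds checks mirror the ones visible elsewhere in this thread (prefix columns must be fewer than the key columns; ordering columns may equal them).

```scala
// Hypothetical sketch: names mirror the review suggestion, not the merged Spark API.
sealed trait KeyEncoderSpec

case object NoPrefixKeyEncoderSpec extends KeyEncoderSpec

final case class PrefixKeyEncoderSpec(numColsPrefixKey: Int) extends KeyEncoderSpec

final case class RangeScanKeyEncoderSpec(numOrderingCols: Int) extends KeyEncoderSpec

object KeyEncoderSpecSketch {
  // Validates the spec against the number of key columns and returns a label for the
  // encoder that would be created. A real factory would construct the encoder instead.
  def describeEncoder(numKeyCols: Int, spec: KeyEncoderSpec): String = spec match {
    case NoPrefixKeyEncoderSpec =>
      "no-prefix"
    case PrefixKeyEncoderSpec(n) =>
      // a prefix scan needs at least one non-prefix column left over
      require(n > 0 && n < numKeyCols, s"invalid numColsPrefixKey: $n")
      s"prefix($n)"
    case RangeScanKeyEncoderSpec(n) =>
      // ordering columns may cover the whole key
      require(n > 0 && n <= numKeyCols, s"invalid numOrderingCols: $n")
      s"range-scan($n)"
  }
}
```

Because the trait is sealed, the compiler can flag a missing case when a new spec is added, and each encoder's extra fields stay out of the shared method signature.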



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+    }
+  }
+
+  private val remainingKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.drop(numOrderingCols)
+  }
+
+  private val rangeScanKeyProjection: UnsafeProjection = {
+    val refs = rangeScanKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val remainingKeyProjection: UnsafeProjection = {
+    val refs = remainingKeyFieldsWithIdx.map(x =>
+      BoundReference(x._2, x._1.dataType, x._1.nullable))
+    UnsafeProjection.create(refs)
+  }
+
+  private val restoreKeyProjection: UnsafeProjection = UnsafeProjection.create(keySchema)
+
+  // Reusable objects
+  private val joinedRowOnKey = new JoinedRow()
+
+  private def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    rangeScanKeyProjection(key)
+  }
+
+  // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
+  // using byte arrays.
+  private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
+    val writer = new UnsafeRowWriter(numOrderingCols)
+    writer.resetRowWriter()
+    rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+      val value = row.get(idx, field.dataType)
+      field.dataType match {
+        // endian-ness doesn't matter for single byte objects. so just write these
+        // types directly.
+        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
+        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
+
+        // for other multi-byte types, we need to convert to big-endian
+        case ShortType =>
+          val bbuf = ByteBuffer.allocate(2)

Review Comment:
   [nit] we can use `field.dataType.defaultSize` rather than hard-coding it here. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -305,9 +307,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       throw StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", providerName)
     }
 
-    require((keySchema.length == 0 && numColsPrefixKey == 0) ||
-      (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " +

Review Comment:
   We still want to validate that keySchema has more fields than numColsPrefixKey. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -41,11 +43,18 @@ sealed trait RocksDBValueStateEncoder {
 object RocksDBStateEncoder {
   def getKeyEncoder(
       keySchema: StructType,
-      numColsPrefixKey: Int): RocksDBKeyStateEncoder = {
-    if (numColsPrefixKey > 0) {
-      new PrefixKeyScanStateEncoder(keySchema, numColsPrefixKey)
-    } else {
-      new NoPrefixKeyStateEncoder(keySchema)
+      keyStateEncoderType: KeyStateEncoderType = NoPrefixKeyStateEncoderType,
+      numColsPrefixKey: Int = 0): RocksDBKeyStateEncoder = {

Review Comment:
   I think we should not set `numColsPrefixKey` default to zero here as this object does not perform validation before creating the keyEncoder. Both prefix key and range scan encoders need `numColsPrefixKey > 0`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -85,13 +85,13 @@ class TimerStateImpl(
   }
 
   val keyToTsCFName = timerCFName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
-  store.createColFamilyIfAbsent(keyToTsCFName,
+  store.createColFamilyIfAbsent(keyToTsCFName, PrefixKeyScanStateEncoderType,

Review Comment:
   We want to use RangeScan for State v2 timers, right? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +204,234 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields
+ * Note that for range scan, we have to encode the ordering columns using BIG_ENDIAN
+ * encoding to allow for scanning keys in sorted order using the byte-wise comparison
+ * method that RocksDB uses.
+ *
+ * @param keySchema - schema of the key to be encoded
+ * @param numColsPrefixKey - number of columns to be used for prefix key
+ */
+class RangeKeyScanStateEncoder(
+    keySchema: StructType,
+    numOrderingCols: Int) extends RocksDBKeyStateEncoder {
+
+  import RocksDBStateEncoder._
+
+  // Verify that num cols specified for ordering are valid
+  // Note that ordering cols can be equal to number of key schema columns
+  if (numOrderingCols == 0 || numOrderingCols > keySchema.length) {
+    throw StateStoreErrors.incorrectNumOrderingColsNotSupported(numOrderingCols.toString)
+  }
+
+  private val rangeScanKeyFieldsWithIdx: Seq[(StructField, Int)] = {
+    keySchema.zipWithIndex.take(numOrderingCols)
+  }
+
+  private def isFixedSize(dataType: DataType): Boolean = dataType match {
+    case _: ByteType | _: BooleanType | _: ShortType | _: IntegerType | _: LongType |
+      _: FloatType | _: DoubleType => true
+    case _ => false
+  }
+
+  // verify that only fixed sized columns are used for ordering
+  rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
+    if (!isFixedSize(field.dataType)) {
+      throw StateStoreErrors.variableSizeOrderingColsNotSupported(field.name, idx.toString)
+    }
+  }
+

Review Comment:
   [nit] Can we keep all input validations in one place for easier reading in the future? Maybe also put them at the top of the class constructor (this does not make a difference technically but makes it easier to follow what is happening at construction time). 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -17,16 +17,18 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.nio.{ByteBuffer, ByteOrder}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
 import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES, STATE_ENCODING_VERSION}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StructField, StructType}

Review Comment:
   [nit] Maybe just `import org.apache.spark.sql.types._` is better. 





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536560188


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   @HeartSaVioR - that is what I am doing, right? If you check the line above, I always add the `isNullCol` byte for each column.
   
   Only the policy is inverted.
   If the value is null, set the null byte to 0x01 and leave the remaining bytes as 0x00 (currently I write 0x00 for the next byte, but even that is not required, because `ByteBuffer.allocate` already initializes those bytes to zero and we won't read them anyway).
   
   If the value is non-null, set the null byte to 0x00 and fill the remaining bytes with the actual value.
   
   line here - https://github.com/apache/spark/pull/45503/files#diff-e7c05dcd29276ad2ad3469481491b0c30ea328d43dee37d7e2f33e65096e8f51R296 
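   
   For illustration, the policy above can be sketched for a single nullable Long column as follows. This is a minimal standalone sketch and not the code in this PR: `NullMarkerEncoding` and `encodeLong` are hypothetical names, and the column is modeled as an `Option[Long]` rather than an `UnsafeRow` field.
   
   ```scala
   import java.nio.{ByteBuffer, ByteOrder}

   object NullMarkerEncoding {
     // Hypothetical helper (assumption, not the PR's API): encodes one nullable
     // Long ordering column as 1 marker byte followed by 8 value bytes.
     def encodeLong(value: Option[Long]): Array[Byte] = {
       val bbuf = ByteBuffer.allocate(java.lang.Long.BYTES + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
       value match {
         case None =>
           // null: marker is 0x01; the remaining bytes keep their initial zero value
           bbuf.put(0x01.toByte)
         case Some(v) =>
           // non-null: marker is 0x00, followed by the big-endian value
           bbuf.put(0x00.toByte)
           bbuf.putLong(v)
       }
       bbuf.array()
     }
   }
   ```
   
   With this sketch, a null encodes to 0x01 followed by eight zero bytes, and a non-null value encodes to 0x00 followed by its 8-byte big-endian representation.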





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2015690849

   @neilramaswamy @HeartSaVioR - I spoke to @HeartSaVioR offline and we decided to add support for "null" values for any of the ordering columns. Basically we maintain a single byte per column to indicate whether it's null or not. If we have null values in the first column, they will appear last in the iteration. If we have null values for non-first columns, the sorting order based on the first col won't be affected.




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536565745


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -212,6 +212,12 @@ class PrefixKeyScanStateEncoder(
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will

Review Comment:
   This comment actually confused me. If I'm not missing something, it's just applying "nulls last" for each column, and ordering should work the same way. E.g. `(1, 1, 3, 1, 2)`, `(1, 1, null, 1, 3)`, `(1, 1, null, 2, 2)` should still be ordered. Having a null in a middle column does not prevent the data from being ordered by the remaining columns.
   
   Please correct me if I'm missing something. If I'm on the right track, could we please update the comment and also add a test to check this behavior? Probably adding more values in the latest test case would work.
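   
   To make the "nulls last per column" reading concrete, here is a minimal standalone sketch. It is not the encoder in this PR: `NullLastOrdering`, `encode`, `compareBytes` and `sortRows` are hypothetical names, rows are modeled as `Seq[Option[Int]]`, and values are assumed to be non-negative (the raw big-endian bytes of negative numbers compare after those of positive ones). Keys are compared as unsigned bytes, which is how RocksDB's default bytewise comparator orders them.
   
   ```scala
   import java.nio.ByteBuffer

   object NullLastOrdering {
     // Hypothetical encoding: per column, 1 marker byte + 4 big-endian value bytes.
     def encode(row: Seq[Option[Int]]): Array[Byte] = {
       val bbuf = ByteBuffer.allocate(row.length * 5) // ByteBuffer is big-endian by default
       row.foreach {
         case None =>
           bbuf.put(0x01.toByte) // null marker sorts after the 0x00 non-null marker
           bbuf.putInt(0)
         case Some(v) =>
           bbuf.put(0x00.toByte)
           bbuf.putInt(v)
       }
       bbuf.array()
     }

     // Unsigned lexicographic comparison of two byte arrays.
     def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
       val n = math.min(a.length, b.length)
       var i = 0
       while (i < n) {
         val cmp = (a(i) & 0xff) - (b(i) & 0xff)
         if (cmp != 0) return cmp
         i += 1
       }
       a.length - b.length
     }

     // Sorts rows by the byte order of their encoded form.
     def sortRows(rows: Seq[Seq[Option[Int]]]): Seq[Seq[Option[Int]]] =
       rows.sortWith((x, y) => compareBytes(encode(x), encode(y)) < 0)
   }
   ```
   
   Under these assumptions, the three rows from the example come out in the order `(1, 1, 3, 1, 2)`, `(1, 1, null, 1, 3)`, `(1, 1, null, 2, 2)`: the null in the third column sorts after 3, and the fourth column still breaks the tie between the two null rows.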





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2000653779

   @HeartSaVioR @sahnib @neilramaswamy - Could you folks PTAL ? Thx !




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1530915028


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -85,13 +85,13 @@ class TimerStateImpl(
   }
 
   val keyToTsCFName = timerCFName + TimerStateUtils.KEY_TO_TIMESTAMP_CF
-  store.createColFamilyIfAbsent(keyToTsCFName,
+  store.createColFamilyIfAbsent(keyToTsCFName, PrefixKeyScanStateEncoderType,

Review Comment:
   Yes, correct, but I will do this as a follow-up change. I wanted to keep this change limited to the state store API.





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1534395668


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -158,14 +161,360 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan validation - invalid num columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    // zero ordering cols
+    val ex1 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 0),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex1,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> "0"
+      ),
+      matchPVals = true
+    )
+
+    // ordering cols greater than schema cols
+    val ex2 = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, keySchemaWithRangeScan.length + 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex2,
+      errorClass = "STATE_STORE_INCORRECT_NUM_ORDERING_COLS_FOR_RANGE_SCAN",
+      parameters = Map(
+        "numOrderingCols" -> (keySchemaWithRangeScan.length + 1).toString
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - variable sized columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithVariableSizeCols: StructType = StructType(
+      Seq(StructField("key1", StringType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithVariableSizeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithVariableSizeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_VARIABLE_SIZE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithVariableSizeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan validation - null type columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val keySchemaWithNullTypeCols: StructType = StructType(
+      Seq(StructField("key1", NullType, false), StructField("key2", StringType, false)))
+
+    val ex = intercept[SparkUnsupportedOperationException] {
+      tryWithProviderResource(newStoreProvider(keySchemaWithNullTypeCols,
+        RangeKeyScanStateEncoderSpec(keySchemaWithNullTypeCols, 1),
+        colFamiliesEnabled)) { provider =>
+        provider.getStore(0)
+      }
+    }
+    checkError(
+      ex,
+      errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
+      parameters = Map(
+        "fieldName" -> keySchemaWithNullTypeCols.fields(0).name,
+        "index" -> "0"
+      ),
+      matchPVals = true
+    )
+  }
+
+  testWithColumnFamilies("rocksdb range scan - fixed size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      // use non-default col family if column families are enabled
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of fixed size
+        val keyRow = dataToKeyRowWithRangeScan(ts, "a")
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L, 3L, 35L, 6L, 9L, 5L)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+      store.commit()
+
+      // test with a different set of power of 2 timestamps
+      val store1 = provider.getStore(1)
+      val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L)
+      timerTimestamps1.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToKeyRowWithRangeScan(ts,
+          Random.alphanumeric.take(Random.nextInt(20) + 1).mkString)
+        val valueRow = dataToValueRow(1)
+        store1.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store1.get(keyRow, cfName)) === 1)
+      }
+
+      val result1 = store1.iterator(cfName).map { kv =>
+        val key = keyRowWithRangeScanToData(kv.key)
+        key._1
+      }.toSeq
+      assert(result1 === (timerTimestamps ++ timerTimestamps1).sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan multiple ordering columns - variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", LongType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](LongType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps = Seq((931L, 10), (8000L, 40), (452300L, 1), (4200L, 68), (90L, 2000),
+        (1L, 27), (1L, 394), (1L, 5), (3L, 980), (35L, 2112), (6L, 90118), (9L, 95118), (6L, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by long col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getLong(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan byte ordering column - variable size " +
+    s"non-ordering columns",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", ByteType, false),
+        StructField("key2", IntegerType, false),
+        StructField("key3", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](ByteType, IntegerType, StringType))
+
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 2), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 2))
+      }
+
+      val timerTimestamps: Seq[(Byte, Int)] = Seq((0x33, 10), (0x1A, 40), (0x1F, 1), (0x01, 68),
+        (0x7F, 2000), (0x01, 27), (0x01, 394), (0x01, 5), (0x03, 980), (0x35, 2112),
+        (0x06, 90118), (0x09, 95118), (0x06, 87210))
+      timerTimestamps.foreach { ts =>
+        // order by byte col first and then by int col
+        val keyRow = schemaProj.apply(new GenericInternalRow(Array[Any](ts._1, ts._2,
+          UTF8String.fromString(Random.alphanumeric.take(Random.nextInt(20) + 1).mkString))))
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result: Seq[(Byte, Int)] = store.iterator(cfName).map { kv =>
+        val keyRow = kv.key
+        val key = (keyRow.getByte(0), keyRow.getInt(1), keyRow.getString(2))
+        (key._1, key._2)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - ordering cols and key schema cols are same",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    // use the same schema as value schema for single col key schema
+    tryWithProviderResource(newStoreProvider(valueSchema,
+      RangeKeyScanStateEncoderSpec(valueSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          valueSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(valueSchema, 1))
+      }
+
+      val timerTimestamps = Seq(931, 8000, 452300, 4200, 90, 1, 2, 8, 3, 35, 6, 9, 5)
+      timerTimestamps.foreach { ts =>
+        // non-timestamp col is of variable size
+        val keyRow = dataToValueRow(ts)
+        val valueRow = dataToValueRow(1)
+        store.put(keyRow, valueRow, cfName)
+        assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+      }
+
+      val result = store.iterator(cfName).map { kv =>
+        valueRowToData(kv.key)
+      }.toSeq
+      assert(result === timerTimestamps.sorted)
+
+      // also check for prefix scan
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToValueRow(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          valueRowToData(kv.key)
+        }.toSeq
+        assert(result.size === 1)
+      }
+    }
+  }
+
+  testWithColumnFamilies("rocksdb range scan - with prefix scan",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
+      RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, 1))
+      }
+
+      val timerTimestamps = Seq(931L, 8000L, 1L)
+      timerTimestamps.foreach { ts =>
+        (1 to 5).foreach { keyVal =>
+          val keyRow = dataToKeyRowWithRangeScan(ts, keyVal.toString)
+          val valueRow = dataToValueRow(1)
+          store.put(keyRow, valueRow, cfName)
+          assert(valueRowToData(store.get(keyRow, cfName)) === 1)
+        }
+      }
+
+      timerTimestamps.foreach { ts =>
+        val prefix = dataToPrefixKeyRowWithRangeScan(ts)
+        val result = store.prefixScan(prefix, cfName).map { kv =>
+          assert(valueRowToData(kv.value) === 1)
+          val key = keyRowWithRangeScanToData(kv.key)
+          key._2
+        }.toSeq
+        assert(result.size === 5)

Review Comment:
   Done





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536561314


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -271,97 +277,97 @@ class RangeKeyScanStateEncoder(
 
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
+  // To handle "null" values, we prepend a byte to the byte array indicating whether the value
+  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // If the value is not null, we write the null byte followed by the value.
+  // Note that setting null for the index on the unsafeRow is not feasible as it would change
+  // the sorting order on iteration.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      field.dataType match {
-        // endian-ness doesn't matter for single byte objects. so just write these
-        // types directly.
-        case BooleanType => writer.write(idx, value.asInstanceOf[Boolean])
-        case ByteType => writer.write(idx, value.asInstanceOf[Byte])
-
-        // for other multi-byte types, we need to convert to big-endian
-        case ShortType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putShort(value.asInstanceOf[Short])
-          writer.write(idx, bbuf.array())
-
-        case IntegerType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putInt(value.asInstanceOf[Int])
-          writer.write(idx, bbuf.array())
-
-        case LongType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putLong(value.asInstanceOf[Long])
-          writer.write(idx, bbuf.array())
-
-        case FloatType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putFloat(value.asInstanceOf[Float])
-          writer.write(idx, bbuf.array())
-
-        case DoubleType =>
-          val bbuf = ByteBuffer.allocate(field.dataType.defaultSize)
-          bbuf.order(ByteOrder.BIG_ENDIAN)
-          bbuf.putDouble(value.asInstanceOf[Double])
-          writer.write(idx, bbuf.array())
+      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      bbuf.order(ByteOrder.BIG_ENDIAN)
+      bbuf.put(isNullCol)
+      if (isNullCol == 0x01.toByte) {

Review Comment:
   Yup done
   
   Here are the `ByteBuffer.allocate` docs:
   
   ```
   Allocates a new byte buffer.
   The new buffer's position will be zero, its limit will be its capacity, its mark will be undefined, and each of its elements will be initialized to zero. It will have a backing array and its array offset will be zero.
   ```
   
   So I believe this should be safe.
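   
   As a quick sanity check of that guarantee, the sketch below compares a buffer where only the marker byte is written against one where the remaining bytes are zero-filled explicitly. `AllocateZeroCheck` and its methods are hypothetical names used only for this illustration.
   
   ```scala
   import java.nio.ByteBuffer

   object AllocateZeroCheck {
     // Writes only the null marker and relies on allocate() zero-initializing the rest.
     def markerOnly(size: Int): Array[Byte] = {
       val bbuf = ByteBuffer.allocate(size)
       bbuf.put(0x01.toByte)
       bbuf.array()
     }

     // Writes the null marker and then explicitly zero-fills the remaining bytes.
     def markerWithExplicitZeros(size: Int): Array[Byte] = {
       val bbuf = ByteBuffer.allocate(size)
       bbuf.put(0x01.toByte)
       while (bbuf.hasRemaining) bbuf.put(0x00.toByte)
       bbuf.array()
     }
   }
   ```
   
   Both methods should return identical arrays for any size of at least 1, which is why the explicit zero write can be dropped.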





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1536573406


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -212,6 +212,12 @@ class PrefixKeyScanStateEncoder(
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ * Note that we also support "null" values being passed for these fixed size fields. We prepend
+ * a single byte to indicate whether the column value is null or not. We cannot change the
+ * nullability on the UnsafeRow itself as the expected ordering would change if non-first
+ * columns are marked as null. If the first col is null, those entries will appear last in
+ * the iterator. If non-first columns are null, ordering based on the previous columns will

Review Comment:
   Not exactly sure what you mean.
   
   For example, if you look at the test here: https://github.com/apache/spark/pull/45503/files#diff-4c6b19c847c68e25ffa927df85efb4c79f388648d8c6242f1fe9f84cf09ec5ffR449
   
   Suppose we have the sample input as:
   ```
   Seq((931L, 10), (40L, null), (452300L, 1), (1L, 304), (100L, null))
   ```
   
   When we iterate, the long elements should all be ordered as expected, so we should get:
   ```
   Seq(1L, 40L, 100L, 931L, 452300L)
   ```
   
   So in this case the ordering on the first col still applies, which is what I was trying to convey.
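   
   The expected order can also be written down as a plain Scala ordering, which is handy as a test oracle. This is only a sketch under assumptions: `ExpectedOrder`, `nullsLast` and `sortKeys` are hypothetical names, and the nullable int column is modeled as `Option[Int]` instead of a real key row.
   
   ```scala
   object ExpectedOrder {
     // Nulls-last ordering for the optional second column:
     // non-null values (isEmpty == false) sort before nulls (isEmpty == true).
     val nullsLast: Ordering[Option[Int]] =
       Ordering.by((o: Option[Int]) => (o.isEmpty, o.getOrElse(0)))

     // Order by the long column first, then by the nullable int column.
     def sortKeys(keys: Seq[(Long, Option[Int])]): Seq[(Long, Option[Int])] =
       keys.sorted(Ordering.Tuple2(Ordering.Long, nullsLast))
   }
   ```
   
   Applied to the sample input, the first elements of the sorted keys are 1L, 40L, 100L, 931L, 452300L, regardless of which rows carry a null in the second column.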
   





Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2015886412

   @sahnib - yes, we did discuss this (@HeartSaVioR, please correct me if I'm wrong). Within the schema, you can declare whether a column is nullable or not. However, Spark doesn't prevent writing "null" values to columns declared as non-nullable, so if we wanted to block such writes, the validation would need to happen at the caller. In some cases the plan could also change the nullability of columns across runs. So instead, we decided to just handle null values: we keep a single null-indicator byte per ordering column for each row, which simplifies this contract a bit.




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on PR #45503:
URL: https://github.com/apache/spark/pull/45503#issuecomment-2015874207

   > @neilramaswamy @HeartSaVioR - I spoke to @HeartSaVioR offline and we decided to add support for "null" values for any of the ordering columns. Basically we maintain a single byte per column to indicate whether it's null or not. If we have null values in the first column, they will appear last in the iteration. If we have null values for non-first columns, the sorting order based on the first col won't be affected.
   
   For the null value support, would we always add this byte per column, or would we let the user specify whether values can be null? Based on that, we could decide whether or not to add the additional byte.




Re: [PR] [SPARK-47372][SS] Add support for range scan based key state encoder for use with state store provider [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45503:
URL: https://github.com/apache/spark/pull/45503#discussion_r1533310095


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -192,6 +199,233 @@ class PrefixKeyScanStateEncoder(
   override def supportPrefixKeyScan: Boolean = true
 }
 
+/**
+ * RocksDB Key Encoder for UnsafeRow that supports range scan for fixed size fields

Review Comment:
   Done


