Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2024/03/29 19:58:03 UTC

[PR] [SPARK-47653] Add support for negative numeric types and range scan key encoder [spark]

anishshri-db opened a new pull request, #45778:
URL: https://github.com/apache/spark/pull/45778

   ### What changes were proposed in this pull request?
   Add support for negative numeric types and range scan key encoder
   
   
   ### Why are the changes needed?
   Without this change, the sort order for negative numbers is not maintained on iteration; previously, negative numbers would appear last.
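   
   As a rough illustration of the problem (a standalone sketch, not the code in this PR; `SignMarkerDemo` and its helpers are hypothetical names): keys are compared byte-wise as unsigned bytes, so the big-endian two's complement bytes of a negative integer, which start with `0xFF`, sort after every non-negative value. Prepending a marker byte that is lower for negative values restores the numeric order.
   
   ```scala
   import java.nio.{ByteBuffer, ByteOrder}
   
   // Hypothetical demo object; names are illustrative only.
   object SignMarkerDemo {
     // Plain big-endian encoding of an Int (no sign marker).
     def rawBytes(v: Int): Array[Byte] =
       ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(v).array()
   
     // Same encoding with a leading marker: 0x00 for negative, 0x01 otherwise.
     def markedBytes(v: Int): Array[Byte] = {
       val marker: Byte = (if (v < 0) 0x00 else 0x01).toByte
       ByteBuffer.allocate(5).order(ByteOrder.BIG_ENDIAN).put(marker).putInt(v).array()
     }
   
     // Lexicographic comparison treating each byte as unsigned.
     def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int =
       a.zip(b).map { case (x, y) => (x & 0xFF) - (y & 0xFF) }
         .find(_ != 0).getOrElse(a.length - b.length)
   
     val values: Seq[Int] = Seq(5, -3, 0, -100, 42)
   
     // Without the marker, negatives land after the positives.
     val rawOrder: Seq[Int] =
       values.sortWith((a, b) => compareUnsigned(rawBytes(a), rawBytes(b)) < 0)
   
     // With the marker, the byte order matches the numeric order.
     val markedOrder: Seq[Int] =
       values.sortWith((a, b) => compareUnsigned(markedBytes(a), markedBytes(b)) < 0)
   }
   ```
   
   Here `rawOrder` comes out as `0, 5, 42, -100, -3`, while `markedOrder` is `-100, -3, 0, 5, 42`.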
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added unit tests
   
   ```
   [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=true (with changelog checkpointing) (164 milliseconds)
   [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=true (without changelog checkpointing) (95 milliseconds)
   [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=false (with changelog checkpointing) (155 milliseconds)
   [info] - rocksdb range scan - with prefix scan - with colFamiliesEnabled=false (without changelog checkpointing) (82 milliseconds)
   12:55:54.184 WARN org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreSuite:
   
   ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.RocksDBStateStoreSuite, threads: rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-2 (daemon=true), shuffle-boss-6-1 (daemon=true), ForkJoinPool.commonPool-worker-1 (daemon=true) =====
   [info] Run completed in 8 seconds, 888 milliseconds.
   [info] Total number of tests run: 44
   [info] Suites: completed 1, aborted 0
   [info] Tests: succeeded 44, failed 0, canceled 0, ignored 0, pending 0
   [info] All tests passed.
   [success] Total time: 21 s, completed Mar 29, 2024, 12:55:54 PM
   ```
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548877778


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +284,111 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (value == null) {
+        bbuf.put(nullValMarker)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(positiveValMarker)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>

Review Comment:
   nit: Sorry for nitpicking, but as we do explicit type casting twice, can we do the following?
   
   ```case s: ShortType =>```
   
   This applies to all types except the cases that match multiple types (like BooleanType/ByteType above).



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,60 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, -0, 0, +0, +ve values, +Infinity, +NaN

Review Comment:
   nit: This test does not verify that the ordering takes NaN into account, do I understand correctly? If so, let's update the code comment to clarify.





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1546863228


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and positive
+      var isNullOrSignCol: Byte = 0x01.toByte
+      // Update the isNullOrSignCol byte to indicate null value
+      if (value == null) {
+        isNullOrSignCol = 0x02.toByte
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == 0x02.toByte) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
+          // For floating point types, we cannot support ordering using additional byte for
+          // negative values. This is because the IEEE 754 floating point representation
+          // stores exponent first and then the mantissa. So, we cannot simply prepend a byte

Review Comment:
   Not sure exactly what you meant, but I realized that we can support this. Basically, we just need to flip all the bits for negative float/double values and convert them back. Updated the change to remove this restriction.
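   
   A minimal sketch of that idea for floats, assuming a one-byte sign marker followed by the big-endian raw bits, and reading "convert them back" as undoing the flip on decode. `FloatKeyCodec` is a hypothetical standalone helper, not the encoder class touched by this PR:
   
   ```scala
   import java.lang.Float.{floatToRawIntBits, intBitsToFloat}
   import java.nio.{ByteBuffer, ByteOrder}
   
   // Hypothetical standalone helper; names are illustrative only.
   object FloatKeyCodec {
     private val signBitMask = 0x80000000
     private val flipAllBitsMask = 0xFFFFFFFF
   
     val negativeMarker: Byte = 0x00.toByte
     val positiveMarker: Byte = 0x01.toByte
   
     // Encode as one marker byte followed by four big-endian bytes.
     def encode(value: Float): Array[Byte] = {
       val rawBits = floatToRawIntBits(value)
       val buf = ByteBuffer.allocate(5).order(ByteOrder.BIG_ENDIAN)
       if ((rawBits & signBitMask) != 0) {
         // Sign bit set: flip every bit before writing.
         buf.put(negativeMarker).putInt(rawBits ^ flipAllBitsMask)
       } else {
         buf.put(positiveMarker).putInt(rawBits)
       }
       buf.array()
     }
   
     // Decode by undoing the flip for values tagged as negative.
     def decode(bytes: Array[Byte]): Float = {
       val buf = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN)
       val marker = buf.get()
       val bits = buf.getInt()
       if (marker == negativeMarker) intBitsToFloat(bits ^ flipAllBitsMask)
       else intBitsToFloat(bits)
     }
   }
   ```
   
   XOR with an all-ones mask is its own inverse, so `decode(encode(x))` returns the original bits for any non-NaN `x`.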





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1547188291


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())

Review Comment:
   [nit] We could extract the Short/Integer/Long handling into a common typed method.
   
   ```
   def encodeIntegralValue[T](value: T, bbuf: ByteBuffer): Unit = {
     if (value.asInstanceOf[T] < 0) {
       isNullOrSignCol = negativeValMarker
     }
     bbuf.put(isNullOrSignCol)
     bbuf.putShort(value.asInstanceOf[T])
     writer.write(idx, bbuf.array())
   }
   ```
   
   Same for Float/Double.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -206,19 +208,24 @@ class PrefixKeyScanStateEncoder(
  * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN
  * to allow for scanning keys in sorted order using the byte-wise comparison method that
  * RocksDB uses.
+ *
  * Then, for the rest of the fields, we project those into another UnsafeRow.
  * We then effectively join these two UnsafeRows together, and finally take those bytes
  * to get the resulting row.
+ *
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ *
  * Note that we also support "null" values being passed for these fixed size fields. We prepend
  * a single byte to indicate whether the column value is null or not. We cannot change the
  * nullability on the UnsafeRow itself as the expected ordering would change if non-first
  * columns are marked as null. If the first col is null, those entries will appear last in
  * the iterator. If non-first columns are null, ordering based on the previous columns will
  * still be honored. For rows with null column values, ordering for subsequent columns
- * will also be maintained within those set of rows.
+ * will also be maintained within those set of rows. We use the same byte to also encode whether
+ * the value is negative or not. For negative float/double values, we flip all the bits to ensure
+ * the right lexicographical ordering.

Review Comment:
   [nit] I know it's not part of this PR, but can we also change the parameter name `numColsPrefixKey` in `RangeKeyScanStateEncoderSpec` to `numOrderingCols`, as used here?





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548037943


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())

Review Comment:
   I missed the `putShort`/`putInt`/`putLong` difference. We can keep it as is.





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45778:
URL: https://github.com/apache/spark/pull/45778#issuecomment-2027682508

   @HeartSaVioR @neilramaswamy @sahnib - PTAL, thx !




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1546862226


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and positive
+      var isNullOrSignCol: Byte = 0x01.toByte

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.

Review Comment:
   Done - added notes above





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "neilramaswamy (via GitHub)" <gi...@apache.org>.
neilramaswamy commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548751462


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
           case FloatType =>
-            bbuf.putFloat(value.asInstanceOf[Float])
+            // for negative values, we need to flip all the bits to ensure correct ordering
+            val rawBits = floatToRawIntBits(value.asInstanceOf[Float])
+            // perform sign comparison using bit manipulation to ensure NaN values are handled
+            // correctly
+            if ((rawBits & floatSignBitMask) != 0) {
+              // flip all the bits

Review Comment:
   Hm, feels like we're brushing aside the complexity here. We ought to explain _why_ flipping the bits works (it's not obvious). Here's why I think this works:
   
   IEEE 754 has the following format: `[sign bit, exponent, mantissa]`. Let's say that the sign bit is `1`, so we have a negative number. When the exponent is lexicographically larger, then we have a more negative number (same with mantissa). We want the opposite to be true, i.e. when the exponent/mantissa is lexicographically larger, we have a smaller number.
   
   How can we do that? Well, we can shift the negative numbers into the positive range. We can shift up by adding a constant, `2^n - 1` (which is flipping the bits). Then, if `x` and `y` are negative s.t. `|x| > |y|`, then `x + 2^n - 1 < y + 2^n - 1`.
   
   Not the most elegant explanation (I'm sure there's better), but at least it's not evading complexity.
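   
   One way to check the argument above: reading the 64 raw bits as an unsigned integer `u`, flipping all bits computes `(2^64 - 1) - u`, which reverses the order among negative values, and the marker byte places all negatives before all non-negatives. The sketch below uses hypothetical names and assumes unsigned byte-wise key comparison; NaN and `-0.0` are left out to keep it simple.
   
   ```scala
   import java.lang.Double.doubleToRawLongBits
   import java.nio.{ByteBuffer, ByteOrder}
   
   // Hypothetical demo object; names are illustrative only.
   object FlipOrderingDemo {
     // Marker byte (0x00 negative, 0x01 otherwise) followed by the big-endian raw
     // bits, with every bit flipped when the sign bit is set.
     def encode(value: Double): Array[Byte] = {
       val raw = doubleToRawLongBits(value)
       val negative = (raw & 0x8000000000000000L) != 0
       val marker: Byte = (if (negative) 0x00 else 0x01).toByte
       val bits = if (negative) ~raw else raw
       ByteBuffer.allocate(9).order(ByteOrder.BIG_ENDIAN).put(marker).putLong(bits).array()
     }
   
     // Lexicographic comparison treating each byte as unsigned.
     def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int =
       a.zip(b).map { case (x, y) => (x & 0xFF) - (y & 0xFF) }
         .find(_ != 0).getOrElse(a.length - b.length)
   
     val values: Seq[Double] = Seq(6894.32, -23.24, 0.0, -5350.355, 0.001,
       Double.PositiveInfinity, Double.NegativeInfinity, -53.255)
   
     // Order obtained by comparing encoded bytes only.
     val orderByBytes: Seq[Double] =
       values.sortWith((a, b) => compareUnsigned(encode(a), encode(b)) < 0)
   
     // Ordinary numeric order, for comparison.
     val orderByValue: Seq[Double] = values.sortWith(_ < _)
   }
   ```
   
   For these inputs `orderByBytes` and `orderByValue` agree, with `-Infinity` first and `+Infinity` last.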



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,55 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, 0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, -66.356, -244.452,
+        96456466.3536677, 14421434453.43524562, Double.NaN, Double.PositiveInfinity,

Review Comment:
   There are many `NaN`s as per IEEE 754. Are there only two valid/possible NaNs (+/-) in Java?
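
As a small illustration of the question above, the following Scala sketch builds several NaN values from explicit bit patterns. It uses only `java.lang.Double.longBitsToDouble`, `doubleToRawLongBits` and `doubleToLongBits`; the object name `NaNBitPatterns` and the chosen payloads are assumptions for illustration, restricted to quiet NaNs because the Javadoc notes that signaling NaN bit patterns may not be preserved on some processors.

```scala
object NaNBitPatterns {
  import java.lang.{Double => JDouble}

  // Three distinct quiet-NaN bit patterns: the canonical one, one with a
  // non-zero payload, and one with the sign bit set.
  val canonicalNaN: Long = 0x7ff8000000000000L
  val payloadNaN: Long = 0x7ff8000000000001L
  val negativeNaN: Long = 0xfff8000000000000L

  val patterns: Seq[Long] = Seq(canonicalNaN, payloadNaN, negativeNaN)

  // Every pattern with all exponent bits set and a non-zero mantissa is NaN.
  def isNaN(bits: Long): Boolean = JDouble.longBitsToDouble(bits).isNaN

  // doubleToRawLongBits keeps the original bit pattern.
  def rawRoundTrip(bits: Long): Long =
    JDouble.doubleToRawLongBits(JDouble.longBitsToDouble(bits))

  // doubleToLongBits collapses every NaN to the canonical pattern.
  def canonicalized(bits: Long): Long =
    JDouble.doubleToLongBits(JDouble.longBitsToDouble(bits))
}
```

An encoder that inspects the sign bit of the raw bits therefore sees "negative" and "positive" NaNs as different keys, which is presumably why the test comment lists `-NaN` and `+NaN` at opposite ends of the ordering.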



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,55 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, 0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, -66.356, -244.452,

Review Comment:
   +/- 0.0?
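
The signed-zero case can be checked with a short Scala sketch. It only shows that `0.0` and `-0.0` compare equal numerically while differing in their raw sign bit; the object and helper names are hypothetical, and how the encoder should treat the two zeros is left to the PR.

```scala
object SignedZeroSketch {
  // Raw IEEE 754 bit pattern of a double.
  def rawBits(d: Double): Long = java.lang.Double.doubleToRawLongBits(d)

  // The sign bit of the double is the sign bit of the Long.
  def hasSignBit(d: Double): Boolean = rawBits(d) < 0
}
```

With a sign-bit check like the one in the diff, `-0.0` would take the negative branch and `0.0` the positive one, so the two zeros would be encoded as distinct keys.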



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
           case FloatType =>
-            bbuf.putFloat(value.asInstanceOf[Float])
+            // for negative values, we need to flip all the bits to ensure correct ordering
+            val rawBits = floatToRawIntBits(value.asInstanceOf[Float])
+            // perform sign comparison using bit manipulation to ensure NaN values are handled
+            // correctly
+            if ((rawBits & floatSignBitMask) != 0) {
+              // flip all the bits

Review Comment:
   Or, maybe Wikipedia notes it somewhere :)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker

Review Comment:
   I think this code with the marker is a bit convoluted. What about something like:
   
   ```
   // For each field
   bbuf = allocate()
   if null:
       bbuf.put(nullMarker)
   else:
       // Switch on case
       val marker = positiveMarker if val >= 0 else negativeMarker
       bbuf.put(marker)
   ```
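
The pseudocode above could look roughly like this in Scala for a single `Long` column. This is a sketch under assumptions, not the code that was merged: `None` stands in for a null column value, the marker values are taken from the diff, and `MarkerSketch`, `encodeLong` and `toHex` are hypothetical names.

```scala
import java.nio.ByteBuffer

object MarkerSketch {
  // Marker values as in the diff: negative < positive < null.
  val NegativeMarker: Byte = 0x00
  val PositiveMarker: Byte = 0x01
  val NullMarker: Byte = 0x02

  // Encode an optional long as one marker byte plus 8 big-endian bytes.
  // The buffer has the same size for every row, including nulls.
  def encodeLong(value: Option[Long]): Array[Byte] = {
    val buf = ByteBuffer.allocate(java.lang.Long.BYTES + 1) // big-endian by default
    value match {
      case None =>
        buf.put(NullMarker) // remaining bytes stay zero
      case Some(v) =>
        val marker = if (v >= 0) PositiveMarker else NegativeMarker
        buf.put(marker).putLong(v)
    }
    buf.array()
  }

  // Lowercase hex rendering; for equal-length arrays, string order on the hex
  // form matches unsigned byte-wise order.
  def toHex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xFF}%02x").mkString
}
```

Sorting values by `toHex(encodeLong(v))` places negative values first in ascending order (two's complement big-endian bytes already sort correctly within the negative range), then non-negative values, then nulls.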



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1547174481


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and positive
+      var isNullOrSignCol: Byte = 0x01.toByte
+      // Update the isNullOrSignCol byte to indicate null value
+      if (value == null) {
+        isNullOrSignCol = 0x02.toByte
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == 0x02.toByte) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
+          // For floating point types, we cannot support ordering using additional byte for
+          // negative values. This is because the IEEE 754 floating point representation
+          // stores exponent first and then the mantissa. So, we cannot simply prepend a byte

Review Comment:
   Updated this to support negative values too





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548946531


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +284,111 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (value == null) {
+        bbuf.put(nullValMarker)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(positiveValMarker)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>

Review Comment:
   Never mind. I confused with dataType and actual value.





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1546616005


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -292,7 +299,13 @@ class RangeKeyScanStateEncoder(
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
-      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      val bbuf = if (!supportsSignedValues(field.dataType)) {
+        ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      } else {
+        // for numeric types, we reserve 2 additional bytes. one to indicate if the value is null
+        // and the other to indicate if the value is negative.
+        ByteBuffer.allocate(field.dataType.defaultSize + 2)
+      }

Review Comment:
   Done





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548956564


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,60 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, -0, 0, +0, +ve values, +Infinity, +NaN

Review Comment:
   Updated to also check for the NaN positions in the returned list. The sorted comparison doesn't work for the collections, so we need to filter while performing that check.





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45778:
URL: https://github.com/apache/spark/pull/45778#issuecomment-2033809172

   Thanks! Merging to master.




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1547194931


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -206,19 +208,24 @@ class PrefixKeyScanStateEncoder(
  * for the range scan into an UnsafeRow; we then rewrite that UnsafeRow's fields in BIG_ENDIAN
  * to allow for scanning keys in sorted order using the byte-wise comparison method that
  * RocksDB uses.
+ *
  * Then, for the rest of the fields, we project those into another UnsafeRow.
  * We then effectively join these two UnsafeRows together, and finally take those bytes
  * to get the resulting row.
+ *
  * We cannot support variable sized fields given the UnsafeRow format which stores variable
  * sized fields as offset and length pointers to the actual values, thereby changing the required
  * ordering.
+ *
  * Note that we also support "null" values being passed for these fixed size fields. We prepend
  * a single byte to indicate whether the column value is null or not. We cannot change the
  * nullability on the UnsafeRow itself as the expected ordering would change if non-first
  * columns are marked as null. If the first col is null, those entries will appear last in
  * the iterator. If non-first columns are null, ordering based on the previous columns will
  * still be honored. For rows with null column values, ordering for subsequent columns
- * will also be maintained within those set of rows.
+ * will also be maintained within those set of rows. We use the same byte to also encode whether
+ * the value is negative or not. For negative float/double values, we flip all the bits to ensure
+ * the right lexicographical ordering.

Review Comment:
   Done





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548801165


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
           case FloatType =>
-            bbuf.putFloat(value.asInstanceOf[Float])
+            // for negative values, we need to flip all the bits to ensure correct ordering
+            val rawBits = floatToRawIntBits(value.asInstanceOf[Float])
+            // perform sign comparison using bit manipulation to ensure NaN values are handled
+            // correctly
+            if ((rawBits & floatSignBitMask) != 0) {
+              // flip all the bits

Review Comment:
   Discussed offline and also added the reference to the Wikipedia link



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker

Review Comment:
   Done





Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548800930


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,55 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, 0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, -66.356, -244.452,
+        96456466.3536677, 14421434453.43524562, Double.NaN, Double.PositiveInfinity,

Review Comment:
   Done - added more binary representations than the usual Java constants




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1548800791


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -294,6 +295,55 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  testWithColumnFamilies("rocksdb range scan - variable size non-ordering columns with " +
+    "double type values are supported",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+
+    val testSchema: StructType = StructType(
+      Seq(StructField("key1", DoubleType, false),
+        StructField("key2", StringType, false)))
+
+    val schemaProj = UnsafeProjection.create(Array[DataType](DoubleType, StringType))
+    tryWithProviderResource(newStoreProvider(testSchema,
+      RangeKeyScanStateEncoderSpec(testSchema, 1), colFamiliesEnabled)) { provider =>
+      val store = provider.getStore(0)
+
+      val cfName = if (colFamiliesEnabled) "testColFamily" else "default"
+      if (colFamiliesEnabled) {
+        store.createColFamilyIfAbsent(cfName,
+          testSchema, valueSchema,
+          RangeKeyScanStateEncoderSpec(testSchema, 1))
+      }
+
+      // Verify that the sort ordering here is as follows:
+      // -NaN, -Infinity, -ve values, 0, +ve values, +Infinity, +NaN
+      val timerTimestamps: Seq[Double] = Seq(6894.32, 345.2795, -23.24, 24.466,
+        7860.0, 4535.55, 423.42, -5350.355, 0.0, 0.001, 0.233, -53.255, -66.356, -244.452,

Review Comment:
   Done




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1546601893


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -292,7 +299,13 @@ class RangeKeyScanStateEncoder(
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
-      val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      val bbuf = if (!supportsSignedValues(field.dataType)) {
+        ByteBuffer.allocate(field.dataType.defaultSize + 1)
+      } else {
+        // for numeric types, we reserve 2 additional bytes. one to indicate if the value is null
+        // and the other to indicate if the value is negative.
+        ByteBuffer.allocate(field.dataType.defaultSize + 2)
+      }

Review Comment:
   Can we use just one byte to indicate both sign and null? If we use `0x00` for -ve, `0x01` for +ve and `0x02` for null, we should be able to save a byte per column.
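
A minimal sketch of the single-marker-byte layout suggested above, assuming keys are compared as unsigned bytes in lexicographic order. Only the three marker values come from the comment; `MarkerByteSketch`, `encodeInt`, `compareUnsigned` and `sortedByEncoding` are hypothetical names for illustration and are not the PR's code.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object MarkerByteSketch {
  // Marker values follow the suggestion: negative < positive < null.
  val NegativeMarker: Byte = 0x00
  val PositiveMarker: Byte = 0x01
  val NullMarker: Byte = 0x02

  // Encode an optional Int as [marker byte][4 big-endian value bytes].
  // A null keeps the same width (value bytes stay zero) so all keys have equal length.
  def encodeInt(value: Option[Int]): Array[Byte] = {
    val buf = ByteBuffer.allocate(java.lang.Integer.BYTES + 1).order(ByteOrder.BIG_ENDIAN)
    value match {
      case None => buf.put(NullMarker)
      case Some(v) =>
        buf.put(if (v < 0) NegativeMarker else PositiveMarker)
        buf.putInt(v)
    }
    buf.array()
  }

  // Unsigned lexicographic comparison of two byte arrays.
  def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  // Sort values by their encoded form only, ignoring their numeric value.
  def sortedByEncoding(values: Seq[Option[Int]]): Seq[Option[Int]] =
    values.sortWith((x, y) => compareUnsigned(encodeInt(x), encodeInt(y)) < 0)
}
```

With this layout each column costs one extra byte rather than two, and sorting by the encoded bytes places negative values first, then non-negative values, then nulls.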




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "neilramaswamy (via GitHub)" <gi...@apache.org>.
neilramaswamy commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1546662169


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.

Review Comment:
   More importantly, I think we need to comment why we chose the byte values the way we did. We can't encode positive numbers with `0x00`, for example. It needs to be `0x01`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and positive
+      var isNullOrSignCol: Byte = 0x01.toByte

Review Comment:
   Can we refactor these `0x01` and `0x00` literals into named constants? I know what's going on here, but the first read was high friction.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -283,45 +286,79 @@ class RangeKeyScanStateEncoder(
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize to 0x01 to indicate that the column is not null and positive
+      var isNullOrSignCol: Byte = 0x01.toByte
+      // Update the isNullOrSignCol byte to indicate null value
+      if (value == null) {
+        isNullOrSignCol = 0x02.toByte
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == 0x02.toByte) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())
 
           case IntegerType =>
+            if (value.asInstanceOf[Int] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putInt(value.asInstanceOf[Int])
             writer.write(idx, bbuf.array())
 
           case LongType =>
+            if (value.asInstanceOf[Long] < 0) {
+              isNullOrSignCol = 0x00.toByte
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putLong(value.asInstanceOf[Long])
             writer.write(idx, bbuf.array())
 
+          // For floating point types, we cannot support ordering using additional byte for
+          // negative values. This is because the IEEE 754 floating point representation
+          // stores exponent first and then the mantissa. So, we cannot simply prepend a byte

Review Comment:
   I'm not sure the relative position of the exponent and mantissa is the issue here. IIUC the reason is that, when the sign bit is `1`, a larger exponent makes the number more negative. So a lexicographically smaller exponent means the number is closer to 0, i.e. it should appear _later_.
   
   In two's complement, I think the trick of flipping the sign bit works because the _rest_ of the bits being lexicographically smaller means the number itself is actually smaller. That is, `0b1010` (`-6`) is larger than `0b1001` (`-7`) and indeed, if we disregard the MSB, `010` is lexicographically larger than `001`.
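
The point about negative floating-point values can be checked with a small sketch. It assumes the approach hinted at by the PR's flip and sign bit masks: keep the raw IEEE 754 bits for non-negative values and flip all bits for negative ones, with a marker that sorts negatives first. `DoubleOrderingSketch`, `sortKey` and `lessThan` are hypothetical names, and the `(marker, bits)` pair stands in for the encoded bytes.

```scala
object DoubleOrderingSketch {
  private val SignBitMask = 0x8000000000000000L
  private val FlipAllBitsMask = 0xFFFFFFFFFFFFFFFFL

  // Raw IEEE 754 bit pattern of a double.
  def rawBits(v: Double): Long = java.lang.Double.doubleToRawLongBits(v)

  // (marker, bits): marker 0 for negative values, 1 otherwise.
  // Negative values have all bits flipped so that a larger magnitude sorts earlier.
  def sortKey(v: Double): (Int, Long) = {
    val bits = rawBits(v)
    if ((bits & SignBitMask) != 0L) (0, bits ^ FlipAllBitsMask) else (1, bits)
  }

  // Order by marker first, then by the bits compared as an unsigned number,
  // which matches a big-endian unsigned byte comparison of the same 8 bytes.
  def lessThan(a: Double, b: Double): Boolean = {
    val (ma, ba) = sortKey(a)
    val (mb, bb) = sortKey(b)
    if (ma != mb) ma < mb else java.lang.Long.compareUnsigned(ba, bb) < 0
  }
}
```

Without the flip, the raw bits of `-1.0` compare as smaller than those of `-2.0`, which is the wrong way round. With the flip, `Seq(-1.0, 0.5, -2.0).sortWith(DoubleOrderingSketch.lessThan)` yields `-2.0, -1.0, 0.5`.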




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45778:
URL: https://github.com/apache/spark/pull/45778#discussion_r1547191718


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -276,53 +283,113 @@ class RangeKeyScanStateEncoder(
     rangeScanKeyProjection(key)
   }
 
+  // bit masks used for checking sign or flipping all bits for negative float/double values
+  private val floatFlipBitMask = 0xFFFFFFFF
+  private val floatSignBitMask = 0x80000000
+
+  private val doubleFlipBitMask = 0xFFFFFFFFFFFFFFFFL
+  private val doubleSignBitMask = 0x8000000000000000L
+
+  // Byte markers used to identify whether the value is null, negative or positive
+  // To ensure sorted ordering, we use the lowest byte value for negative numbers followed by
+  // positive numbers and then null values.
+  private val negativeValMarker: Byte = 0x00.toByte
+  private val positiveValMarker: Byte = 0x01.toByte
+  private val nullValMarker: Byte = 0x02.toByte
+
   // Rewrite the unsafe row by replacing fixed size fields with BIG_ENDIAN encoding
   // using byte arrays.
   // To handle "null" values, we prepend a byte to the byte array indicating whether the value
-  // is null or not. If the value is null, we write the null byte followed by a zero byte.
+  // is null or not. If the value is null, we write the null byte followed by zero bytes.
   // If the value is not null, we write the null byte followed by the value.
   // Note that setting null for the index on the unsafeRow is not feasible as it would change
   // the sorting order on iteration.
+  // Also note that the same byte is used to indicate whether the value is negative or not.
   private def encodePrefixKeyForRangeScan(row: UnsafeRow): UnsafeRow = {
     val writer = new UnsafeRowWriter(numOrderingCols)
     writer.resetRowWriter()
     rangeScanKeyFieldsWithIdx.foreach { case (field, idx) =>
       val value = row.get(idx, field.dataType)
-      val isNullCol: Byte = if (value == null) 0x01.toByte else 0x00.toByte
+      // initialize the value to indicate positive value to begin with
+      var isNullOrSignCol: Byte = positiveValMarker
+      // Update the isNullOrSignCol byte (if required) to indicate null value
+      if (value == null) {
+        isNullOrSignCol = nullValMarker
+      }
       // Note that we cannot allocate a smaller buffer here even if the value is null
       // because the effective byte array is considered variable size and needs to have
       // the same size across all rows for the ordering to work as expected.
       val bbuf = ByteBuffer.allocate(field.dataType.defaultSize + 1)
       bbuf.order(ByteOrder.BIG_ENDIAN)
-      bbuf.put(isNullCol)
-      if (isNullCol == 0x01.toByte) {
+      if (isNullOrSignCol == nullValMarker) {
+        bbuf.put(isNullOrSignCol)
         writer.write(idx, bbuf.array())
       } else {
         field.dataType match {
           case BooleanType =>
           case ByteType =>
+            bbuf.put(isNullOrSignCol)
             bbuf.put(value.asInstanceOf[Byte])
             writer.write(idx, bbuf.array())
 
-          // for other multi-byte types, we need to convert to big-endian
           case ShortType =>
+            if (value.asInstanceOf[Short] < 0) {
+              isNullOrSignCol = negativeValMarker
+            }
+            bbuf.put(isNullOrSignCol)
             bbuf.putShort(value.asInstanceOf[Short])
             writer.write(idx, bbuf.array())

Review Comment:
   Hmm, not sure I can do this. We still need to write to the byte array using type-specific functions,
   for example `putShort`, `putInt`, `putLong`, etc.
   
   I can refactor, but it might not be super clean. Probably better to keep it as it is?
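
For reference, one possible shape of such a refactor is sketched below. It is an illustration only, not code from the PR: the shared steps (allocate, write the marker byte) live in one helper, and the type-specific `putShort`/`putInt`/`putLong` call is passed in as a function. All names here (`SignedEncodeSketch`, `encodeSigned`, and so on) are hypothetical.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object SignedEncodeSketch {
  val NegativeMarker: Byte = 0x00
  val PositiveMarker: Byte = 0x01

  // Shared steps: allocate marker + value bytes, write the marker,
  // then delegate the type-specific write to the caller.
  private def encodeSigned(valueSize: Int, isNegative: Boolean)(
      putValue: ByteBuffer => Unit): Array[Byte] = {
    val bbuf = ByteBuffer.allocate(valueSize + 1)
    bbuf.order(ByteOrder.BIG_ENDIAN)
    bbuf.put(if (isNegative) NegativeMarker else PositiveMarker)
    putValue(bbuf)
    bbuf.array()
  }

  def encodeShort(v: Short): Array[Byte] = encodeSigned(2, v < 0)(_.putShort(v))
  def encodeInt(v: Int): Array[Byte] = encodeSigned(4, v < 0)(_.putInt(v))
  def encodeLong(v: Long): Array[Byte] = encodeSigned(8, v < 0)(_.putLong(v))
}
```

Whether this reads better than the explicit per-type cases is a judgment call, since each case still needs its own one-line wrapper.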




Re: [PR] [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45778: [SPARK-47653][SS] Add support for negative numeric types and range scan key encoder
URL: https://github.com/apache/spark/pull/45778

