Posted to commits@spark.apache.org by we...@apache.org on 2019/11/19 08:14:04 UTC

[spark] branch branch-2.4 updated: [SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new dc2abe51 [SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long
dc2abe51 is described below

commit dc2abe51ca2d3d702d6b6457301c3ca9c7244212
Author: wangguangxin.cn <wa...@bytedance.com>
AuthorDate: Tue Nov 19 16:10:22 2019 +0800

    [SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long
    
    ### What changes were proposed in this pull request?
    This PR tries to make the comparison results of `compared by 8 bytes at a time` and `compared byte by byte` in RecordBinaryComparator *consistent*, by reversing the long's bytes when the platform is little-endian and comparing with Long.compareUnsigned.
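    
    For example, the adjusted long-comparison branch boils down to the following (a condensed sketch of the change shown in the diff below):
    ```
        long v1 = Platform.getLong(leftObj, leftOff + i);
        long v2 = Platform.getLong(rightObj, rightOff + i);
        if (v1 != v2) {
          if (ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
            // normalize to big-endian byte order so that the most significant
            // byte of the long is the byte a byte-by-byte comparison sees first
            v1 = Long.reverseBytes(v1);
            v2 = Long.reverseBytes(v2);
          }
          // unsigned comparison matches the `& 0xff` semantics of the byte path
          return Long.compareUnsigned(v1, v2);
        }
    ```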
    
    ### Why are the changes needed?
    If the architecture supports unaligned access or the offsets are 8-byte aligned, `RecordBinaryComparator` compares 8 bytes at a time by reading them as a long. The related code is
    ```
        if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
          while (i <= leftLen - 8) {
            final long v1 = Platform.getLong(leftObj, leftOff + i);
            final long v2 = Platform.getLong(rightObj, rightOff + i);
            if (v1 != v2) {
              return v1 > v2 ? 1 : -1;
            }
            i += 8;
          }
        }
    ```
    
    Otherwise, it compares byte by byte. The related code is
    ```
        while (i < leftLen) {
          final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
          final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
          if (v1 != v2) {
            return v1 > v2 ? 1 : -1;
          }
          i += 1;
        }
    ```
    
    However, on a little-endian machine, the result of *comparing by a long value* and the result of *comparing byte by byte* may differ.
    
    For the same two records, the offsets may differ between the first run and the second run, so one run may compare them on the long path and the other on the byte-by-byte path, producing different results.
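    
    A minimal standalone demo (not part of this patch; the class name is illustrative) shows the mismatch on a little-endian machine, and how reversing the bytes plus an unsigned comparison restores consistency:
    ```
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class EndiannessDemo {
      public static void main(String[] args) {
        byte[] left  = {0x01, 0, 0, 0, 0, 0, 0, 0};
        byte[] right = {0, 0, 0, 0, 0, 0, 0, 0x01};

        // byte-by-byte (unsigned) comparison is decided at index 0: 0x01 > 0x00
        int byteWise = Integer.compare(left[0] & 0xff, right[0] & 0xff);    // 1

        // read both as native-order longs (what Platform.getLong does via Unsafe):
        // on little-endian, v1 = 1 and v2 = 2^56, so the signed long comparison flips
        long v1 = ByteBuffer.wrap(left).order(ByteOrder.nativeOrder()).getLong();
        long v2 = ByteBuffer.wrap(right).order(ByteOrder.nativeOrder()).getLong();
        int longWise = Long.compare(v1, v2);                                // -1 on little-endian

        System.out.println("byte-by-byte: " + byteWise + ", as signed long: " + longWise);

        // the fix: reverse bytes on little-endian and compare unsigned,
        // which makes the long path agree with the byte path again
        if (ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
          v1 = Long.reverseBytes(v1);
          v2 = Long.reverseBytes(v2);
        }
        System.out.println("fixed: " + Long.compareUnsigned(v1, v2));       // 1
      }
    }
    ```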
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Add new test cases in RecordBinaryComparatorSuite
    
    Closes #26548 from WangGuangxin/binary_comparator.
    
    Authored-by: wangguangxin.cn <wa...@bytedance.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit ffc97530371433bc0221e06d8c1d11af8d92bd94)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/RecordBinaryComparator.java      | 30 +++++++++-----
 .../sort/RecordBinaryComparatorSuite.java          | 47 +++++++++++++++++++++-
 2 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
index 40c2cc8..1f24340 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
@@ -20,8 +20,13 @@ package org.apache.spark.sql.execution;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
 
+import java.nio.ByteOrder;
+
 public final class RecordBinaryComparator extends RecordComparator {
 
+  private static final boolean LITTLE_ENDIAN =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
   @Override
   public int compare(
       Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
@@ -38,10 +43,10 @@ public final class RecordBinaryComparator extends RecordComparator {
     // check if stars align and we can get both offsets to be aligned
     if ((leftOff % 8) == (rightOff % 8)) {
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
-        final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
-        final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+        final int v1 = Platform.getByte(leftObj, leftOff + i);
+        final int v2 = Platform.getByte(rightObj, rightOff + i);
         if (v1 != v2) {
-          return v1 > v2 ? 1 : -1;
+          return (v1 & 0xff) > (v2 & 0xff) ? 1 : -1;
         }
         i += 1;
       }
@@ -49,10 +54,17 @@ public final class RecordBinaryComparator extends RecordComparator {
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        final long v1 = Platform.getLong(leftObj, leftOff + i);
-        final long v2 = Platform.getLong(rightObj, rightOff + i);
+        long v1 = Platform.getLong(leftObj, leftOff + i);
+        long v2 = Platform.getLong(rightObj, rightOff + i);
         if (v1 != v2) {
-          return v1 > v2 ? 1 : -1;
+          if (LITTLE_ENDIAN) {
+            // if read as little-endian, we have to reverse bytes so that the long comparison result
+            // is equivalent to byte-by-byte comparison result.
+            // See discussion in https://github.com/apache/spark/pull/26548#issuecomment-554645859
+            v1 = Long.reverseBytes(v1);
+            v2 = Long.reverseBytes(v2);
+          }
+          return Long.compareUnsigned(v1, v2);
         }
         i += 8;
       }
@@ -60,10 +72,10 @@ public final class RecordBinaryComparator extends RecordComparator {
     // this will finish off the unaligned comparisons, or do the entire aligned comparison
     // whichever is needed.
     while (i < leftLen) {
-      final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
-      final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+      final int v1 = Platform.getByte(leftObj, leftOff + i);
+      final int v2 = Platform.getByte(rightObj, rightOff + i);
       if (v1 != v2) {
-        return v1 > v2 ? 1 : -1;
+        return (v1 & 0xff) > (v2 & 0xff) ? 1 : -1;
       }
       i += 1;
     }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
index 97f3dc5..9664638e 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
@@ -32,6 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.util.collection.unsafe.sort.*;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -271,7 +272,7 @@ public class RecordBinaryComparatorSuite {
     insertRow(row1);
     insertRow(row2);
 
-    assert(compare(0, 1) < 0);
+    assert(compare(0, 1) > 0);
   }
 
   @Test
@@ -319,4 +320,48 @@ public class RecordBinaryComparatorSuite {
 
     assert(compare(0, 1) < 0);
   }
+
+  @Test
+  public void testCompareLongsAsLittleEndian() {
+    long arrayOffset = 12;
+
+    long[] arr1 = new long[2];
+    Platform.putLong(arr1, arrayOffset, 0x0100000000000000L);
+    long[] arr2 = new long[2];
+    Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000001L);
+    // leftBaseOffset is not aligned while rightBaseOffset is aligned,
+    // it will start by comparing long
+    int result1 = binaryComparator.compare(arr1, arrayOffset, 8, arr2, arrayOffset + 4, 8);
+
+    long[] arr3 = new long[2];
+    Platform.putLong(arr3, arrayOffset, 0x0100000000000000L);
+    long[] arr4 = new long[2];
+    Platform.putLong(arr4, arrayOffset, 0x0000000000000001L);
+    // both left and right offset is not aligned, it will start with byte-by-byte comparison
+    int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8);
+
+    Assert.assertEquals(result1, result2);
+  }
+
+  @Test
+  public void testCompareLongsAsUnsigned() {
+    long arrayOffset = 12;
+
+    long[] arr1 = new long[2];
+    Platform.putLong(arr1, arrayOffset + 4, 0xa000000000000000L);
+    long[] arr2 = new long[2];
+    Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000000L);
+    // both leftBaseOffset and rightBaseOffset are aligned, so it will start by comparing long
+    int result1 = binaryComparator.compare(arr1, arrayOffset + 4, 8, arr2, arrayOffset + 4, 8);
+
+    long[] arr3 = new long[2];
+    Platform.putLong(arr3, arrayOffset, 0xa000000000000000L);
+    long[] arr4 = new long[2];
+    Platform.putLong(arr4, arrayOffset, 0x0000000000000000L);
+    // both leftBaseOffset and rightBaseOffset are not aligned,
+    // so it will start with byte-by-byte comparison
+    int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8);
+
+    Assert.assertEquals(result1, result2);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org