You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/11/19 08:14:04 UTC
[spark] branch branch-2.4 updated: [SPARK-29918][SQL]
RecordBinaryComparator should check endianness when compared by long
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new dc2abe51 [SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long
dc2abe51 is described below
commit dc2abe51ca2d3d702d6b6457301c3ca9c7244212
Author: wangguangxin.cn <wa...@bytedance.com>
AuthorDate: Tue Nov 19 16:10:22 2019 +0800
[SPARK-29918][SQL] RecordBinaryComparator should check endianness when compared by long
### What changes were proposed in this pull request?
This PR try to make sure the comparison results of `compared by 8 bytes at a time` and `compared by bytes wise` in RecordBinaryComparator is *consistent*, by reverse long bytes if it is little-endian and using Long.compareUnsigned.
### Why are the changes needed?
If the architecture supports unaligned or the offset is 8 bytes aligned, `RecordBinaryComparator` compare 8 bytes at a time by reading 8 bytes as a long. Related code is
```
if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
while (i <= leftLen - 8) {
final long v1 = Platform.getLong(leftObj, leftOff + i);
final long v2 = Platform.getLong(rightObj, rightOff + i);
if (v1 != v2) {
return v1 > v2 ? 1 : -1;
}
i += 8;
}
}
```
Otherwise, it will compare bytes by bytes. Related code is
```
while (i < leftLen) {
final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
if (v1 != v2) {
return v1 > v2 ? 1 : -1;
}
i += 1;
}
```
However, on little-endian machine, the result of *compared by a long value* and *compared bytes by bytes* maybe different.
For two same records, its offsets may vary in the first run and second run, which will lead to compare them using long comparison or byte-by-byte comparison, the result maybe different.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Add new test cases in RecordBinaryComparatorSuite
Closes #26548 from WangGuangxin/binary_comparator.
Authored-by: wangguangxin.cn <wa...@bytedance.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit ffc97530371433bc0221e06d8c1d11af8d92bd94)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/RecordBinaryComparator.java | 30 +++++++++-----
.../sort/RecordBinaryComparatorSuite.java | 47 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
index 40c2cc8..1f24340 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
@@ -20,8 +20,13 @@ package org.apache.spark.sql.execution;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+import java.nio.ByteOrder;
+
public final class RecordBinaryComparator extends RecordComparator {
+ private static final boolean LITTLE_ENDIAN =
+ ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
@Override
public int compare(
Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
@@ -38,10 +43,10 @@ public final class RecordBinaryComparator extends RecordComparator {
// check if stars align and we can get both offsets to be aligned
if ((leftOff % 8) == (rightOff % 8)) {
while ((leftOff + i) % 8 != 0 && i < leftLen) {
- final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
- final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+ final int v1 = Platform.getByte(leftObj, leftOff + i);
+ final int v2 = Platform.getByte(rightObj, rightOff + i);
if (v1 != v2) {
- return v1 > v2 ? 1 : -1;
+ return (v1 & 0xff) > (v2 & 0xff) ? 1 : -1;
}
i += 1;
}
@@ -49,10 +54,17 @@ public final class RecordBinaryComparator extends RecordComparator {
// for architectures that support unaligned accesses, chew it up 8 bytes at a time
if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
while (i <= leftLen - 8) {
- final long v1 = Platform.getLong(leftObj, leftOff + i);
- final long v2 = Platform.getLong(rightObj, rightOff + i);
+ long v1 = Platform.getLong(leftObj, leftOff + i);
+ long v2 = Platform.getLong(rightObj, rightOff + i);
if (v1 != v2) {
- return v1 > v2 ? 1 : -1;
+ if (LITTLE_ENDIAN) {
+ // if read as little-endian, we have to reverse bytes so that the long comparison result
+ // is equivalent to byte-by-byte comparison result.
+ // See discussion in https://github.com/apache/spark/pull/26548#issuecomment-554645859
+ v1 = Long.reverseBytes(v1);
+ v2 = Long.reverseBytes(v2);
+ }
+ return Long.compareUnsigned(v1, v2);
}
i += 8;
}
@@ -60,10 +72,10 @@ public final class RecordBinaryComparator extends RecordComparator {
// this will finish off the unaligned comparisons, or do the entire aligned comparison
// whichever is needed.
while (i < leftLen) {
- final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
- final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+ final int v1 = Platform.getByte(leftObj, leftOff + i);
+ final int v2 = Platform.getByte(rightObj, rightOff + i);
if (v1 != v2) {
- return v1 > v2 ? 1 : -1;
+ return (v1 & 0xff) > (v2 & 0xff) ? 1 : -1;
}
i += 1;
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
index 97f3dc5..9664638e 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
@@ -32,6 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.collection.unsafe.sort.*;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -271,7 +272,7 @@ public class RecordBinaryComparatorSuite {
insertRow(row1);
insertRow(row2);
- assert(compare(0, 1) < 0);
+ assert(compare(0, 1) > 0);
}
@Test
@@ -319,4 +320,48 @@ public class RecordBinaryComparatorSuite {
assert(compare(0, 1) < 0);
}
+
+ @Test
+ public void testCompareLongsAsLittleEndian() {
+ long arrayOffset = 12;
+
+ long[] arr1 = new long[2];
+ Platform.putLong(arr1, arrayOffset, 0x0100000000000000L);
+ long[] arr2 = new long[2];
+ Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000001L);
+ // leftBaseOffset is not aligned while rightBaseOffset is aligned,
+ // it will start by comparing long
+ int result1 = binaryComparator.compare(arr1, arrayOffset, 8, arr2, arrayOffset + 4, 8);
+
+ long[] arr3 = new long[2];
+ Platform.putLong(arr3, arrayOffset, 0x0100000000000000L);
+ long[] arr4 = new long[2];
+ Platform.putLong(arr4, arrayOffset, 0x0000000000000001L);
+ // both left and right offset is not aligned, it will start with byte-by-byte comparison
+ int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8);
+
+ Assert.assertEquals(result1, result2);
+ }
+
+ @Test
+ public void testCompareLongsAsUnsigned() {
+ long arrayOffset = 12;
+
+ long[] arr1 = new long[2];
+ Platform.putLong(arr1, arrayOffset + 4, 0xa000000000000000L);
+ long[] arr2 = new long[2];
+ Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000000L);
+ // both leftBaseOffset and rightBaseOffset are aligned, so it will start by comparing long
+ int result1 = binaryComparator.compare(arr1, arrayOffset + 4, 8, arr2, arrayOffset + 4, 8);
+
+ long[] arr3 = new long[2];
+ Platform.putLong(arr3, arrayOffset, 0xa000000000000000L);
+ long[] arr4 = new long[2];
+ Platform.putLong(arr4, arrayOffset, 0x0000000000000000L);
+ // both leftBaseOffset and rightBaseOffset are not aligned,
+ // so it will start with byte-by-byte comparison
+ int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8);
+
+ Assert.assertEquals(result1, result2);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org