Posted to commits@spark.apache.org by we...@apache.org on 2018/04/06 02:14:06 UTC
[2/2] spark git commit: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block
[SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block
## What changes were proposed in this pull request?
This PR allows us to use one of several types of `MemoryBlock`, such as a byte array, int array, long array, or `java.nio.DirectByteBuffer`. Using `java.nio.DirectByteBuffer` provides off-heap memory that is automatically deallocated by the JVM. The `MemoryBlock` class has primitive accessors like `Platform.getInt()`, `Platform.putInt()`, or `Platform.copyMemory()`.
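To make the idea concrete, here is a minimal, self-contained sketch of the design: an abstract block whose accessors take offsets relative to the start of the block, with one heap-backed and one direct-`ByteBuffer`-backed subclass. The names `SimpleBlock`, `ByteArrayBlock`, and `DirectBufferBlock` are hypothetical stand-ins, not Spark's `MemoryBlock` API. Ints are stored little-endian here by assumption, whereas Spark's `Platform` uses the platform's native byte order.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical, simplified analogue of MemoryBlock: offsets passed to the
// accessors are relative to the start of the block.
abstract class SimpleBlock {
  protected final long size;
  protected SimpleBlock(long size) { this.size = size; }
  final long size() { return size; }
  abstract byte getByte(long offset);
  abstract void putByte(long offset, byte value);
  abstract int getInt(long offset);
  abstract void putInt(long offset, int value);
}

// Backed by a byte[] on the Java heap; ints are stored little-endian.
final class ByteArrayBlock extends SimpleBlock {
  private final byte[] array;
  private final int base;  // index of the block's first byte in array

  ByteArrayBlock(byte[] array, int base, int size) {
    super(size);
    if (base < 0 || size < 0 || base + size > array.length) {
      throw new IllegalArgumentException("block out of range");
    }
    this.array = array;
    this.base = base;
  }

  byte getByte(long offset) { return array[base + (int) offset]; }
  void putByte(long offset, byte value) { array[base + (int) offset] = value; }

  int getInt(long offset) {
    int i = base + (int) offset;
    return (array[i] & 0xFF) | (array[i + 1] & 0xFF) << 8
        | (array[i + 2] & 0xFF) << 16 | (array[i + 3] & 0xFF) << 24;
  }

  void putInt(long offset, int value) {
    int i = base + (int) offset;
    array[i] = (byte) value;
    array[i + 1] = (byte) (value >>> 8);
    array[i + 2] = (byte) (value >>> 16);
    array[i + 3] = (byte) (value >>> 24);
  }
}

// Backed by a direct (off-heap) ByteBuffer, whose native memory the JVM
// reclaims once the buffer object becomes unreachable.
final class DirectBufferBlock extends SimpleBlock {
  private final ByteBuffer buffer;

  DirectBufferBlock(int size) {
    super(size);
    this.buffer = ByteBuffer.allocateDirect(size).order(ByteOrder.LITTLE_ENDIAN);
  }

  byte getByte(long offset) { return buffer.get((int) offset); }
  void putByte(long offset, byte value) { buffer.put((int) offset, value); }
  int getInt(long offset) { return buffer.getInt((int) offset); }
  void putInt(long offset, int value) { buffer.putInt((int) offset, value); }
}
```

Code written against `SimpleBlock` runs unchanged over either storage, which is the property the PR relies on when it swaps `Object`/offset pairs for a block type.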
This PR uses `MemoryBlock` for `OffHeapColumnVector`, `UTF8String`, and other places. This PR can improve performance of operations involving memory accesses (e.g. `UTF8String.trim`) by 1.8x.
For now, this PR does not use `MemoryBlock` for `BufferHolder` based on cloud-fan's [suggestion](https://github.com/apache/spark/pull/11494#issuecomment-309694290).
Since this PR is a successor of #11494, it closes #11494. Much of the code was ported from #11494, and a lot of effort went into it. **I think this PR should be credited to yzotov.**
This PR can achieve **1.1-1.4x performance improvements** for operations in `UTF8String` or `Murmur3_x86_32`. Other operations show roughly comparable performance.
Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Hash byte arrays with length 268435487:   Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                    526 / 536         0.0   131399881.5       1.0X
UTF8String benchmark:                     Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                          525 / 552      1022.6           1.0       1.0X
substring                                         414 / 423      1298.0           0.8       1.3X
```
With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Hash byte arrays with length 268435487:   Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                    474 / 488         0.0   118552232.0       1.0X
UTF8String benchmark:                     Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                          476 / 480      1127.3           0.9       1.0X
substring                                         287 / 291      1869.9           0.5       1.7X
```
Benchmark program
```
import scala.util.Random

import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
import org.apache.spark.unsafe.memory.{ByteArrayMemoryBlock, MemoryBlock}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Benchmark

test("benchmark Murmur3_x86_32") {
  val length = 8192 * 32768 + 31
  val seed = 42L
  val iters = 1 << 2
  val numArrays = 1
  val random = new Random(seed)
  // Each random byte[] is wrapped in a ByteArrayMemoryBlock covering the whole array.
  val arrays = Array.fill[MemoryBlock](numArrays) {
    val bytes = new Array[Byte](length)
    random.nextBytes(bytes)
    new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
  }
  val benchmark = new Benchmark("Hash byte arrays with length " + length,
    iters * numArrays, minNumIters = 20)
  benchmark.addCase("Murmur3_x86_32") { _: Int =>
    var sum = 0L
    for (_ <- 0L until iters; i <- 0 until numArrays) {
      // The block already carries its base offset and length.
      sum += Murmur3_x86_32.hashUnsafeBytesBlock(arrays(i), 42)
    }
  }
  benchmark.run()
}

test("benchmark UTF8String") {
  val N = 512 * 1024 * 1024
  val iters = 2
  val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
  val s0 = UTF8String.fromString(" " * N)  // a string of N spaces
  benchmark.addCase("hashCode") { _: Int =>
    var h: Int = 0
    for (_ <- 0L until iters) { h += s0.hashCode }
  }
  benchmark.addCase("substring") { _: Int =>
    var s: UTF8String = null
    for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
  }
  benchmark.run()
}
```
I ran [this benchmark program](https://gist.github.com/kiszk/94f75b506c93a663bbbc372ffe8f05de) using [the commit](https://github.com/apache/spark/pull/19222/commits/ee5a79861c18725fb1cd9b518cdfd2489c05b81d6) and got the following results:
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Memory access benchmarks:                 Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt()                 220 / 221       609.3           1.6       1.0X
Platform get/putInt(byte[])                       220 / 236       610.9           1.6       1.0X
Platform get/putInt(Object)                       492 / 494       272.8           3.7       0.4X
OnHeapMemoryBlock get/putLong()                   322 / 323       416.5           2.4       0.7X
long[]                                            221 / 221       608.0           1.6       1.0X
Platform get/putLong(long[])                      321 / 321       418.7           2.4       0.7X
Platform get/putLong(Object)                      561 / 563       239.2           4.2       0.4X
```
I also ran [this benchmark program](https://gist.github.com/kiszk/5fdb4e03733a5d110421177e289d1fb5) to compare the performance of `Platform.copyMemory()`.
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Platform copyMemory:                      Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Object to Object                                1961 / 1967         8.6         116.9       1.0X
System.arraycopy Object to Object               1917 / 1921         8.8         114.3       1.0X
byte array to byte array                        1961 / 1968         8.6         116.9       1.0X
System.arraycopy byte array to byte array       1909 / 1937         8.8         113.8       1.0X
int array to int array                          1921 / 1990         8.7         114.5       1.0X
double array to double array                    1918 / 1923         8.7         114.3       1.0X
Object to byte array                            1961 / 1967         8.6         116.9       1.0X
Object to short array                           1965 / 1972         8.5         117.1       1.0X
Object to int array                             1910 / 1915         8.8         113.9       1.0X
Object to float array                           1971 / 1978         8.5         117.5       1.0X
Object to double array                          1919 / 1944         8.7         114.4       1.0X
byte array to Object                            1959 / 1967         8.6         116.8       1.0X
int array to Object                             1961 / 1970         8.6         116.9       1.0X
double array to Object                          1917 / 1924         8.8         114.3       1.0X
```
These results show five facts:
1. According to the second/third and sixth/seventh results in the first experiment, `Platform.get/putInt(Object)` and `Platform.get/putLong(Object)` are roughly 2x slower (492 ms vs. 220 ms, and 561 ms vs. 321 ms) than the variants that take a concrete type (i.e. `byte[]` or `long[]`).
2. According to the second/third and fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on the Java heap is plain `array[]` indexing. **The drawback of `array[]` is that it cannot support unaligned 8-byte access.**
3. According to the first/second/third and fourth/sixth/seventh results in the first experiment, `getInt()/putInt()` and `getLong()/putLong()` in subclasses of `MemoryBlock` achieve performance comparable to `Platform.get/putInt()` or `Platform.get/putLong()` with a concrete type (the second or sixth result). The virtual call adds no measurable overhead.
4. According to the results of the second experiment, passing `Object` to `Platform.copyMemory()` achieves the same performance as passing any type of primitive array as the source or destination.
5. According to the first/second and third/fourth results in the second experiment, `Platform.copyMemory()` achieves the same performance as `System.arraycopy`. **It would be good to use `Platform.copyMemory()`, since it accepts any type for src and dst.**
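Fact 2 can be illustrated with plain JDK classes. In this sketch (the class `UnalignedRead` is hypothetical), a byte-oriented view can read a `long` starting at any byte offset, while a `long[]` can only hand out whole elements at multiples of 8 bytes. It uses `java.nio.ByteBuffer` rather than `Platform`, purely to stay self-contained.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class UnalignedRead {
  // Reads 8 bytes starting at an arbitrary byte offset, little-endian.
  static long readLongAt(byte[] bytes, int byteOffset) {
    return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong(byteOffset);
  }

  // With a long[] only whole 8-byte elements are addressable, so the only
  // reachable byte offsets are multiples of 8.
  static long readLongElement(long[] longs, int byteOffset) {
    if (byteOffset % 8 != 0) {
      throw new IllegalArgumentException("long[] cannot serve unaligned offset " + byteOffset);
    }
    return longs[byteOffset / 8];
  }
}
```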
We are incrementally replacing `Platform.get/putXXX` with `MemoryBlock.get/putXXX` because this has two advantages:
1) Better performance, because each subclass accesses an array of a concrete type.
2) A simple OO design instead of passing `Object`.
`MemoryBlock` is easy to adopt in `InternalRow`, `BufferHolder`, `TaskMemoryManager`, and other classes that are already abstracted. It is harder to adopt in utility classes, such as those for hashing.
Other candidates are:
- UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
- UTF8StringBuffer
- BufferHolder
- TaskMemoryManager
- OnHeapColumnVector
- BytesToBytesMap
- CachedBatch
- classes for hash
- others.
## How was this patch tested?
Added `UnsafeMemoryAllocator`
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #19222 from kiszk/SPARK-10399.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4807d381
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4807d381
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4807d381
Branch: refs/heads/master
Commit: 4807d381bb113a5c61e6dad88202f23a8b6dd141
Parents: d9ca1c9
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Fri Apr 6 10:13:59 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 6 10:13:59 2018 +0800
----------------------------------------------------------------------
.../sql/catalyst/expressions/HiveHasher.java | 12 +-
.../java/org/apache/spark/unsafe/Platform.java | 2 +-
.../spark/unsafe/array/ByteArrayMethods.java | 13 +-
.../apache/spark/unsafe/array/LongArray.java | 17 +-
.../spark/unsafe/hash/Murmur3_x86_32.java | 45 +++--
.../unsafe/memory/ByteArrayMemoryBlock.java | 128 ++++++++++++++
.../unsafe/memory/HeapMemoryAllocator.java | 19 +-
.../spark/unsafe/memory/MemoryAllocator.java | 4 +-
.../apache/spark/unsafe/memory/MemoryBlock.java | 157 +++++++++++++++--
.../spark/unsafe/memory/MemoryLocation.java | 54 ------
.../spark/unsafe/memory/OffHeapMemoryBlock.java | 105 +++++++++++
.../spark/unsafe/memory/OnHeapMemoryBlock.java | 132 ++++++++++++++
.../unsafe/memory/UnsafeMemoryAllocator.java | 21 ++-
.../apache/spark/unsafe/types/UTF8String.java | 148 ++++++++--------
.../apache/spark/unsafe/PlatformUtilSuite.java | 4 +-
.../spark/unsafe/array/LongArraySuite.java | 5 +-
.../spark/unsafe/hash/Murmur3_x86_32Suite.java | 18 ++
.../spark/unsafe/memory/MemoryBlockSuite.java | 175 +++++++++++++++++++
.../spark/unsafe/types/UTF8StringSuite.java | 29 +--
.../apache/spark/memory/TaskMemoryManager.java | 22 +--
.../shuffle/sort/ShuffleInMemorySorter.java | 14 +-
.../shuffle/sort/ShuffleSortDataFormat.java | 11 +-
.../unsafe/sort/UnsafeExternalSorter.java | 2 +-
.../unsafe/sort/UnsafeInMemorySorter.java | 13 +-
.../spark/memory/TaskMemoryManagerSuite.java | 2 +-
.../util/collection/ExternalSorterSuite.scala | 7 +-
.../collection/unsafe/sort/RadixSortSuite.scala | 10 +-
.../apache/spark/ml/feature/FeatureHasher.scala | 5 +-
.../apache/spark/mllib/feature/HashingTF.scala | 2 +-
.../catalyst/expressions/UnsafeArrayData.java | 4 +-
.../sql/catalyst/expressions/UnsafeRow.java | 4 +-
.../spark/sql/catalyst/expressions/XXH64.java | 46 +++--
.../spark/sql/catalyst/expressions/hash.scala | 39 +++--
.../catalyst/expressions/HiveHasherSuite.java | 20 ++-
.../sql/catalyst/expressions/XXH64Suite.java | 18 +-
.../vectorized/OffHeapColumnVector.java | 3 +-
.../spark/sql/vectorized/ArrowColumnVector.java | 6 +-
.../sql/execution/benchmark/SortBenchmark.scala | 16 +-
.../sql/execution/python/RowQueueSuite.scala | 4 +-
39 files changed, 1002 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
index 7357743..5d90594 100644
--- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
/**
* Simulates Hive's hashing function from Hive v1.2.1
@@ -38,12 +39,17 @@ public class HiveHasher {
return (int) ((input >>> 32) ^ input);
}
- public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+ public static int hashUnsafeBytesBlock(MemoryBlock mb) {
+ long lengthInBytes = mb.size();
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int result = 0;
- for (int i = 0; i < lengthInBytes; i++) {
- result = (result * 31) + (int) Platform.getByte(base, offset + i);
+ for (long i = 0; i < lengthInBytes; i++) {
+ result = (result * 31) + (int) mb.getByte(i);
}
return result;
}
+
+ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+ return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
+ }
}
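The change above keeps the old `(base, offset, lengthInBytes)` entry point and turns it into a thin wrapper over the block-based method. A JDK-only sketch of that pattern, using a `java.nio.ByteBuffer` view as a stand-in for `MemoryBlock` (the class `HiveStyleHash` is hypothetical):

```java
import java.nio.ByteBuffer;

final class HiveStyleHash {
  // Block-based entry point: walks the view's bytes by relative index and
  // applies result = result * 31 + byte, with each byte sign-extended to int.
  static int hashBlock(ByteBuffer block) {
    int result = 0;
    for (int i = 0; i < block.remaining(); i++) {
      result = result * 31 + block.get(block.position() + i);
    }
    return result;
  }

  // Legacy (base, offset, length) entry point kept for existing callers:
  // it wraps the region in a view and delegates.
  static int hashBytes(byte[] base, int offset, int lengthInBytes) {
    return hashBlock(ByteBuffer.wrap(base, offset, lengthInBytes));
  }
}
```

For ASCII input the recurrence is the same one `String.hashCode` uses, which makes it easy to sanity-check.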
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index aca6fca..54dcadf 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -187,7 +187,7 @@ public final class Platform {
}
public static void copyMemory(
- Object src, long srcOffset, Object dst, long dstOffset, long length) {
+ Object src, long srcOffset, Object dst, long dstOffset, long length) {
// Check if dstOffset is before or after srcOffset to determine if we should copy
// forward or backwards. This is necessary in case src and dst overlap.
if (dstOffset < srcOffset) {
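The comment in `copyMemory` refers to the usual overlap rule: when the destination starts before the source, copy forward; otherwise copy backward, so no source byte is overwritten before it is read. A minimal sketch on a single `byte[]` (the class `OverlapCopy` is hypothetical; Spark's version operates on `Object`/offset pairs instead):

```java
final class OverlapCopy {
  // Copies length bytes within one array, choosing the direction so that
  // overlapping source bytes are read before they are overwritten.
  static void copy(byte[] buf, int srcOffset, int dstOffset, int length) {
    if (dstOffset < srcOffset) {
      // Destination is behind the source: copying front-to-back is safe.
      for (int i = 0; i < length; i++) {
        buf[dstOffset + i] = buf[srcOffset + i];
      }
    } else {
      // Destination is ahead of (or equal to) the source: copy back-to-front.
      for (int i = length - 1; i >= 0; i--) {
        buf[dstOffset + i] = buf[srcOffset + i];
      }
    }
  }
}
```

`System.arraycopy` documents the same guarantee for overlapping regions of one array: it behaves as if the source range were first copied to a temporary array.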
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index a6b1f7a..c334c96 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.array;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
public class ByteArrayMethods {
@@ -49,6 +50,16 @@ public class ByteArrayMethods {
private static final boolean unaligned = Platform.unaligned();
/**
+ * MemoryBlock equality check for MemoryBlocks.
+ * @return true if the arrays are equal, false otherwise
+ */
+ public static boolean arrayEqualsBlock(
+ MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, final long length) {
+ return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
+ rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
+ }
+
+ /**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
*/
@@ -56,7 +67,7 @@ public class ByteArrayMethods {
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;
- // check if stars align and we can get both offsets to be aligned
+ // check if starts align and we can get both offsets to be aligned
if ((leftOffset % 8) == (rightOffset % 8)) {
while ((leftOffset + i) % 8 != 0 && i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index 2cd39bd..b74d2de 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -17,7 +17,6 @@
package org.apache.spark.unsafe.array;
-import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
/**
@@ -33,16 +32,12 @@ public final class LongArray {
private static final long WIDTH = 8;
private final MemoryBlock memory;
- private final Object baseObj;
- private final long baseOffset;
private final long length;
public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
- this.baseObj = memory.getBaseObject();
- this.baseOffset = memory.getBaseOffset();
this.length = memory.size() / WIDTH;
}
@@ -51,11 +46,11 @@ public final class LongArray {
}
public Object getBaseObject() {
- return baseObj;
+ return memory.getBaseObject();
}
public long getBaseOffset() {
- return baseOffset;
+ return memory.getBaseOffset();
}
/**
@@ -69,8 +64,8 @@ public final class LongArray {
* Fill this all with 0L.
*/
public void zeroOut() {
- for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
- Platform.putLong(baseObj, off, 0);
+ for (long off = 0; off < length * WIDTH; off += WIDTH) {
+ memory.putLong(off, 0);
}
}
@@ -80,7 +75,7 @@ public final class LongArray {
public void set(int index, long value) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
- Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+ memory.putLong(index * WIDTH, value);
}
/**
@@ -89,6 +84,6 @@ public final class LongArray {
public long get(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
- return Platform.getLong(baseObj, baseOffset + index * WIDTH);
+ return memory.getLong(index * WIDTH);
}
}
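With this change, `LongArray` no longer caches the base object and offset; it addresses elements as `index * WIDTH` relative to its `MemoryBlock`. A JDK-only sketch of the same relative addressing over a `ByteBuffer` (the class `LongArrayView` is hypothetical):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class LongArrayView {
  private static final int WIDTH = 8;  // bytes per long element
  private final ByteBuffer memory;      // stand-in for the backing MemoryBlock
  private final long length;            // number of long elements

  LongArrayView(ByteBuffer memory) {
    this.memory = memory.order(ByteOrder.nativeOrder());
    this.length = memory.capacity() / WIDTH;
  }

  long size() { return length; }

  void set(int index, long value) {
    checkIndex(index);
    // Offset is relative to the start of the block, as in memory.putLong(index * WIDTH, value).
    memory.putLong(index * WIDTH, value);
  }

  long get(int index) {
    checkIndex(index);
    return memory.getLong(index * WIDTH);
  }

  // Fills every element with 0L, stepping one word at a time from offset 0.
  void zeroOut() {
    for (int off = 0; off < length * WIDTH; off += WIDTH) {
      memory.putLong(off, 0L);
    }
  }

  private void checkIndex(int index) {
    if (index < 0 || index >= length) {
      throw new IndexOutOfBoundsException("index " + index + " not in [0, " + length + ")");
    }
  }
}
```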
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
index d239de6..f372b19 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
@@ -17,7 +17,9 @@
package org.apache.spark.unsafe.hash;
-import org.apache.spark.unsafe.Platform;
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.memory.MemoryBlock;
/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -49,49 +51,66 @@ public final class Murmur3_x86_32 {
}
public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
- return hashUnsafeWords(base, offset, lengthInBytes, seed);
+ return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}
- public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
+ public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+ int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
- int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+ int h1 = hashBytesByIntBlock(base, seed);
return fmix(h1, lengthInBytes);
}
- public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
+ // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+ return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+ }
+
+ public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
// This is not compatible with original and another implementations.
// But remain it for backward compatibility for the components existing before 2.3.
+ int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
- int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+ int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
- int halfWord = Platform.getByte(base, offset + i);
+ int halfWord = base.getByte(i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}
+ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+ return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+ }
+
public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
+ return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+ }
+
+ public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
// This is compatible with original and another implementations.
// Use this method for new components after Spark 2.3.
- assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+ int lengthInBytes = Ints.checkedCast(base.size());
+ assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
- int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+ int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
- k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+ k1 ^= (base.getByte(i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}
- private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
+ private static int hashBytesByIntBlock(MemoryBlock base, int seed) {
+ long lengthInBytes = base.size();
assert (lengthInBytes % 4 == 0);
int h1 = seed;
- for (int i = 0; i < lengthInBytes; i += 4) {
- int halfWord = Platform.getInt(base, offset + i);
+ for (long i = 0; i < lengthInBytes; i += 4) {
+ int halfWord = base.getInt(i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
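`hashUnsafeBytes2Block` hashes the 4-byte-aligned prefix as ints and then folds the remaining 0 to 3 tail bytes into a single `k1`. Below is a self-contained sketch of that structure as standard MurmurHash3 x86_32 over a `byte[]` region instead of a `MemoryBlock`. The class name `Murmur3Sketch` is hypothetical, and the bodies of `mixK1`, `mixH1`, and `fmix` are not shown in this hunk, so the standard MurmurHash3 constants are an assumption. Ints are read little-endian, which matches `Platform.getInt` on x86.

```java
final class Murmur3Sketch {
  private static final int C1 = 0xcc9e2d51;
  private static final int C2 = 0x1b873593;

  private static int mixK1(int k1) {
    k1 *= C1;
    k1 = Integer.rotateLeft(k1, 15);
    k1 *= C2;
    return k1;
  }

  private static int mixH1(int h1, int k1) {
    h1 ^= k1;
    h1 = Integer.rotateLeft(h1, 13);
    return h1 * 5 + 0xe6546b64;
  }

  // Final avalanche step; mixes in the total length.
  private static int fmix(int h1, int length) {
    h1 ^= length;
    h1 ^= h1 >>> 16;
    h1 *= 0x85ebca6b;
    h1 ^= h1 >>> 13;
    h1 *= 0xc2b2ae35;
    h1 ^= h1 >>> 16;
    return h1;
  }

  // Reads a little-endian int from base[offset..offset + 3].
  private static int getIntLE(byte[] base, int offset) {
    return (base[offset] & 0xFF) | (base[offset + 1] & 0xFF) << 8
        | (base[offset + 2] & 0xFF) << 16 | (base[offset + 3] & 0xFF) << 24;
  }

  static int hashBytes(byte[] base, int offset, int lengthInBytes, int seed) {
    int lengthAligned = lengthInBytes - lengthInBytes % 4;
    int h1 = seed;
    // Aligned prefix: one 4-byte word at a time.
    for (int i = 0; i < lengthAligned; i += 4) {
      h1 = mixH1(h1, mixK1(getIntLE(base, offset + i)));
    }
    // Tail: pack the remaining 0-3 bytes into k1, lowest byte first.
    int k1 = 0;
    for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
      k1 ^= (base[offset + i] & 0xFF) << shift;
    }
    h1 ^= mixK1(k1);
    return fmix(h1, lengthInBytes);
  }
}
```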
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
new file mode 100644
index 0000000..99a9868
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+ private final byte[] array;
+
+ public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+ super(obj, offset, size);
+ this.array = obj;
+ assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+ "The sum of size " + size + " and offset " + offset + " should not be larger than " +
+ "the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
+ }
+
+ public ByteArrayMemoryBlock(long length) {
+ this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
+ }
+
+ @Override
+ public MemoryBlock subBlock(long offset, long size) {
+ checkSubBlockRange(offset, size);
+ if (offset == 0 && size == this.size()) return this;
+ return new ByteArrayMemoryBlock(array, this.offset + offset, size);
+ }
+
+ public byte[] getByteArray() { return array; }
+
+ /**
+ * Creates a memory block pointing to the memory used by the byte array.
+ */
+ public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+ return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
+ }
+
+ @Override
+ public final int getInt(long offset) {
+ return Platform.getInt(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putInt(long offset, int value) {
+ Platform.putInt(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final boolean getBoolean(long offset) {
+ return Platform.getBoolean(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putBoolean(long offset, boolean value) {
+ Platform.putBoolean(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final byte getByte(long offset) {
+ return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
+ }
+
+ @Override
+ public final void putByte(long offset, byte value) {
+ array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
+ }
+
+ @Override
+ public final short getShort(long offset) {
+ return Platform.getShort(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putShort(long offset, short value) {
+ Platform.putShort(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final long getLong(long offset) {
+ return Platform.getLong(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putLong(long offset, long value) {
+ Platform.putLong(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final float getFloat(long offset) {
+ return Platform.getFloat(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putFloat(long offset, float value) {
+ Platform.putFloat(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final double getDouble(long offset) {
+ return Platform.getDouble(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putDouble(long offset, double value) {
+ Platform.putDouble(array, this.offset + offset, value);
+ }
+}
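`ByteArrayMemoryBlock` turns a block-relative offset into an array index as `this.offset + offset - Platform.BYTE_ARRAY_OFFSET`, and `subBlock` only shifts `this.offset` while sharing the same array. A JDK-only sketch of that arithmetic, with the array base taken as 0 instead of `Platform.BYTE_ARRAY_OFFSET`; the class `ByteSlice` and its range check are hypothetical, since `checkSubBlockRange` is not shown in this diff:

```java
final class ByteSlice {
  private final byte[] array;
  private final int offset;  // index of this slice's first byte in array
  private final int size;

  ByteSlice(byte[] array, int offset, int size) {
    if (offset < 0 || size < 0 || offset + size > array.length) {
      throw new IllegalArgumentException(
          "offset " + offset + " + size " + size + " exceeds array length " + array.length);
    }
    this.array = array;
    this.offset = offset;
    this.size = size;
  }

  static ByteSlice fromArray(byte[] array) { return new ByteSlice(array, 0, array.length); }

  int size() { return size; }

  // Offsets are relative to this slice; the shared array is not copied.
  ByteSlice subBlock(int relOffset, int subSize) {
    if (relOffset < 0 || subSize < 0 || relOffset + subSize > size) {
      throw new IndexOutOfBoundsException(
          "sub-block [" + relOffset + ", " + (relOffset + subSize) + ") outside [0, " + size + ")");
    }
    if (relOffset == 0 && subSize == size) return this;
    return new ByteSlice(array, offset + relOffset, subSize);
  }

  byte getByte(int relOffset) { return array[offset + relOffset]; }

  void putByte(int relOffset, byte value) { array[offset + relOffset] = value; }
}
```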
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 2733760..acf28fd 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -58,7 +58,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
- MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+ MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -70,7 +70,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
}
}
long[] array = new long[numWords];
- MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+ MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -79,12 +79,13 @@ public class HeapMemoryAllocator implements MemoryAllocator {
@Override
public void free(MemoryBlock memory) {
- assert (memory.obj != null) :
+ assert(memory instanceof OnHeapMemoryBlock);
+ assert (memory.getBaseObject() != null) :
"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
- assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+ assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
- assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
- || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+ assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
+ || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
"free()";
@@ -94,12 +95,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
}
// Mark the page as freed (so we can detect double-frees).
- memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+ memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to null out its reference to the long[] array.
- long[] array = (long[]) memory.obj;
- memory.setObjAndOffset(null, 0);
+ long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
+ memory.resetObjAndOffset();
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
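`free` rounds the size up to a multiple of 8 before deciding whether to return the array to the pool. Below is a simplified sketch of size-keyed pooling with weak references, in the spirit of the `arrayReference.get()` lookup earlier in this file. The class `LongArrayPool`, its threshold parameter, and the exact pooling policy are assumptions, not Spark's code.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

final class LongArrayPool {
  private final long poolingThresholdBytes;  // hypothetical: only pool arrays at least this big
  private final Map<Long, ArrayDeque<WeakReference<long[]>>> pool = new HashMap<>();

  LongArrayPool(long poolingThresholdBytes) {
    this.poolingThresholdBytes = poolingThresholdBytes;
  }

  // Rounds a byte count up to the next multiple of 8 (one long word).
  static long alignedSize(long size) {
    return ((size + 7) / 8) * 8;
  }

  long[] allocate(long size) {
    long alignedSize = alignedSize(size);
    ArrayDeque<WeakReference<long[]>> refs = pool.get(alignedSize);
    while (refs != null && !refs.isEmpty()) {
      long[] array = refs.pop().get();
      if (array != null) {  // null means the GC already reclaimed it
        return array;
      }
    }
    return new long[(int) (alignedSize / 8)];
  }

  void free(long[] array, long size) {
    long alignedSize = alignedSize(size);
    if (alignedSize >= poolingThresholdBytes) {
      pool.computeIfAbsent(alignedSize, k -> new ArrayDeque<>()).push(new WeakReference<>(array));
    }
  }
}
```

Weak references let the GC reclaim pooled arrays under memory pressure instead of pinning them indefinitely.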
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
index 7b58868..38315fb 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
@@ -38,7 +38,7 @@ public interface MemoryAllocator {
void free(MemoryBlock memory);
- MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
+ UnsafeMemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
- MemoryAllocator HEAP = new HeapMemoryAllocator();
+ HeapMemoryAllocator HEAP = new HeapMemoryAllocator();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index c333857..b086941 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -22,10 +22,10 @@ import javax.annotation.Nullable;
import org.apache.spark.unsafe.Platform;
/**
- * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
+ * A representation of a consecutive memory block in Spark. It defines the common interfaces
+ * for memory accessing and mutating.
*/
-public class MemoryBlock extends MemoryLocation {
-
+public abstract class MemoryBlock {
/** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */
public static final int NO_PAGE_NUMBER = -1;
@@ -45,38 +45,163 @@ public class MemoryBlock extends MemoryLocation {
*/
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
- private final long length;
+ @Nullable
+ protected Object obj;
+
+ protected long offset;
+
+ protected long length;
/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
- * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
- * which lives in a different package.
+ * TaskMemoryManager. This field can be updated via the setPageNumber method so that
+ * it can be modified by the TaskMemoryManager, which lives in a different package.
*/
- public int pageNumber = NO_PAGE_NUMBER;
+ private int pageNumber = NO_PAGE_NUMBER;
- public MemoryBlock(@Nullable Object obj, long offset, long length) {
- super(obj, offset);
+ protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+ if (offset < 0 || length < 0) {
+ throw new IllegalArgumentException(
+ "Length " + length + " and offset " + offset + " must be non-negative");
+ }
+ this.obj = obj;
+ this.offset = offset;
this.length = length;
}
+ protected MemoryBlock() {
+ this(null, 0, 0);
+ }
+
+ public final Object getBaseObject() {
+ return obj;
+ }
+
+ public final long getBaseOffset() {
+ return offset;
+ }
+
+ public void resetObjAndOffset() {
+ this.obj = null;
+ this.offset = 0;
+ }
+
/**
* Returns the size of the memory block.
*/
- public long size() {
+ public final long size() {
return length;
}
- /**
- * Creates a memory block pointing to the memory used by the long array.
- */
- public static MemoryBlock fromLongArray(final long[] array) {
- return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
+ public final void setPageNumber(int pageNum) {
+ pageNumber = pageNum;
+ }
+
+ public final int getPageNumber() {
+ return pageNumber;
}
/**
* Fills the memory block with the specified byte value.
*/
- public void fill(byte value) {
+ public final void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
+
+ /**
+ * Instantiates a MemoryBlock for the given object type with a new offset.
+ */
+ public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) {
+ MemoryBlock mb = null;
+ if (obj instanceof byte[]) {
+ byte[] array = (byte[])obj;
+ mb = new ByteArrayMemoryBlock(array, offset, length);
+ } else if (obj instanceof long[]) {
+ long[] array = (long[])obj;
+ mb = new OnHeapMemoryBlock(array, offset, length);
+ } else if (obj == null) {
+ // a null base object is assumed to denote off-heap memory
+ mb = new OffHeapMemoryBlock(offset, length);
+ } else {
+ throw new UnsupportedOperationException(
+ "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
+ }
+ return mb;
+ }
+
+ /**
+ * Instantiates a sub-block of the same MemoryBlock type with the new size and an offset
+ * relative to the original offset. The data is not copied.
+ * If the parameters are invalid, an exception is thrown.
+ */
+ public abstract MemoryBlock subBlock(long offset, long size);
+
+ protected void checkSubBlockRange(long offset, long size) {
+ if (offset < 0 || size < 0) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Size " + size + " and offset " + offset + " must be non-negative");
+ }
+ if (offset + size > length) {
+ throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
+ offset + " should not be larger than the length " + length + " in the MemoryBlock");
+ }
+ }
+
+ /**
+ * getXXX/putXXX do not guarantee any particular behavior if the offset is invalid: they may
+ * cause an illegal memory access, throw an exception, etc.
+ * getXXX/putXXX use an index relative to this.offset, which already includes the size of
+ * metadata such as the JVM object header. The offset is 0-based and is expected to be a
+ * logical offset within the memory block.
+ */
+ public abstract int getInt(long offset);
+
+ public abstract void putInt(long offset, int value);
+
+ public abstract boolean getBoolean(long offset);
+
+ public abstract void putBoolean(long offset, boolean value);
+
+ public abstract byte getByte(long offset);
+
+ public abstract void putByte(long offset, byte value);
+
+ public abstract short getShort(long offset);
+
+ public abstract void putShort(long offset, short value);
+
+ public abstract long getLong(long offset);
+
+ public abstract void putLong(long offset, long value);
+
+ public abstract float getFloat(long offset);
+
+ public abstract void putFloat(long offset, float value);
+
+ public abstract double getDouble(long offset);
+
+ public abstract void putDouble(long offset, double value);
+
+ public static final void copyMemory(
+ MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
+ assert(srcOffset + length <= src.length && dstOffset + length <= dst.length);
+ Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
+ dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
+ }
+
+ public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) {
+ assert(length <= src.length && length <= dst.length);
+ Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
+ dst.getBaseObject(), dst.getBaseOffset(), length);
+ }
+
+ public final void copyFrom(Object src, long srcOffset, long dstOffset, long length) {
+ assert(length <= this.length - dstOffset);
+ Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
+ }
+
+ public final void writeTo(long srcOffset, Object dst, long dstOffset, long length) {
+ assert(length <= this.length - srcOffset);
+ Platform.copyMemory(obj, offset + srcOffset, dst, dstOffset, length);
+ }
}
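The new `MemoryBlock` API takes 0-based logical offsets in its accessors and adds them to the block's own base offset. `subBlock` creates a view without copying data, after the range checks in `checkSubBlockRange`. The following is a minimal sketch of that contract for a `byte[]`, assuming a hypothetical `LogicalOffsetBlock` class. Unlike Spark, its base offset is a plain array index rather than an address that includes the JVM array header, and `ByteBuffer` replaces `Platform` for the multi-byte read.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: a byte[]-backed block whose accessors take 0-based logical
// offsets that are added to the block's base offset before touching the array.
public class LogicalOffsetBlock {
    private final byte[] array;
    private final int baseOffset; // plain index here; Spark's includes the array header
    private final int length;

    public LogicalOffsetBlock(byte[] array, int baseOffset, int length) {
        if (baseOffset < 0 || length < 0 || (long) baseOffset + length > array.length) {
            throw new IllegalArgumentException(
                "Length " + length + " and offset " + baseOffset + " are out of range");
        }
        this.array = array;
        this.baseOffset = baseOffset;
        this.length = length;
    }

    public int size() { return length; }

    // Same checks as checkSubBlockRange: non-negative, and within this block.
    public LogicalOffsetBlock subBlock(int offset, int size) {
        if (offset < 0 || size < 0) {
            throw new ArrayIndexOutOfBoundsException(
                "Size " + size + " and offset " + offset + " must be non-negative");
        }
        if ((long) offset + size > length) {
            throw new ArrayIndexOutOfBoundsException(
                "offset + size exceeds the block length " + length);
        }
        if (offset == 0 && size == length) return this;                // whole block: reuse it
        return new LogicalOffsetBlock(array, baseOffset + offset, size); // no data copied
    }

    // Like the diff's getXXX methods, these do not check against the block length.
    public byte getByte(int offset) { return array[baseOffset + offset]; }

    public int getInt(int offset) {
        // Little-endian read of 4 bytes starting at the logical offset.
        return ByteBuffer.wrap(array).order(ByteOrder.LITTLE_ENDIAN).getInt(baseOffset + offset);
    }

    public static void main(String[] args) {
        byte[] data = {9, 9, 1, 0, 0, 0, 2, 0, 0, 0};
        LogicalOffsetBlock block = new LogicalOffsetBlock(data, 2, 8);
        LogicalOffsetBlock tail = block.subBlock(4, 4);
        System.out.println(block.getInt(0) + " " + tail.getInt(0)); // 1 2
    }
}
```

Because callers pass logical offsets, code such as `UTF8String` no longer needs to carry its own `offset` field alongside the base object.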
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
deleted file mode 100644
index 74ebc87..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import javax.annotation.Nullable;
-
-/**
- * A memory location. Tracked either by a memory address (with off-heap allocation),
- * or by an offset from a JVM object (in-heap allocation).
- */
-public class MemoryLocation {
-
- @Nullable
- Object obj;
-
- long offset;
-
- public MemoryLocation(@Nullable Object obj, long offset) {
- this.obj = obj;
- this.offset = offset;
- }
-
- public MemoryLocation() {
- this(null, 0);
- }
-
- public void setObjAndOffset(Object newObj, long newOffset) {
- this.obj = newObj;
- this.offset = newOffset;
- }
-
- public final Object getBaseObject() {
- return obj;
- }
-
- public final long getBaseOffset() {
- return offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
new file mode 100644
index 0000000..f90f62b
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+public class OffHeapMemoryBlock extends MemoryBlock {
+ static public final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
+
+ public OffHeapMemoryBlock(long address, long size) {
+ super(null, address, size);
+ }
+
+ @Override
+ public MemoryBlock subBlock(long offset, long size) {
+ checkSubBlockRange(offset, size);
+ if (offset == 0 && size == this.size()) return this;
+ return new OffHeapMemoryBlock(this.offset + offset, size);
+ }
+
+ @Override
+ public final int getInt(long offset) {
+ return Platform.getInt(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putInt(long offset, int value) {
+ Platform.putInt(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final boolean getBoolean(long offset) {
+ return Platform.getBoolean(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putBoolean(long offset, boolean value) {
+ Platform.putBoolean(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final byte getByte(long offset) {
+ return Platform.getByte(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putByte(long offset, byte value) {
+ Platform.putByte(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final short getShort(long offset) {
+ return Platform.getShort(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putShort(long offset, short value) {
+ Platform.putShort(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final long getLong(long offset) {
+ return Platform.getLong(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putLong(long offset, long value) {
+ Platform.putLong(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final float getFloat(long offset) {
+ return Platform.getFloat(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putFloat(long offset, float value) {
+ Platform.putFloat(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final double getDouble(long offset) {
+ return Platform.getDouble(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putDouble(long offset, double value) {
+ Platform.putDouble(null, this.offset + offset, value);
+ }
+}
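`OffHeapMemoryBlock` passes a `null` base object and an absolute address to `Platform`, translating each logical offset by `this.offset`. It also exposes a shared empty `NULL` block. The sketch below approximates that idea with a direct `ByteBuffer`, which the PR description names as one off-heap option, standing in for the raw address. `DirectBufferBlock` is a hypothetical class, not part of Spark.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: a direct (off-heap) ByteBuffer stands in for the raw address
// that OffHeapMemoryBlock hands to Platform together with a null base object.
public class DirectBufferBlock {
    // Plays the role of OffHeapMemoryBlock.NULL: a shared zero-length block.
    public static final DirectBufferBlock NULL =
        new DirectBufferBlock(ByteBuffer.allocateDirect(0), 0, 0);

    private final ByteBuffer buffer; // direct buffer shared by all sub-blocks
    private final int offset;        // start of this block inside the buffer
    private final int length;

    private DirectBufferBlock(ByteBuffer buffer, int offset, int length) {
        this.buffer = buffer;
        this.offset = offset;
        this.length = length;
    }

    public static DirectBufferBlock allocate(int size) {
        return new DirectBufferBlock(
            ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder()), 0, size);
    }

    public int size() { return length; }

    public DirectBufferBlock subBlock(int offset, int size) {
        if (offset < 0 || size < 0 || (long) offset + size > length) {
            throw new ArrayIndexOutOfBoundsException("invalid sub-block range");
        }
        if (offset == 0 && size == length) return this;
        return new DirectBufferBlock(buffer, this.offset + offset, size);
    }

    // Accessors take logical offsets and add this block's start, as OffHeapMemoryBlock does.
    public long getLong(int offset) { return buffer.getLong(this.offset + offset); }

    public void putLong(int offset, long value) { buffer.putLong(this.offset + offset, value); }

    public static void main(String[] args) {
        DirectBufferBlock block = DirectBufferBlock.allocate(16);
        block.putLong(8, 42L);
        DirectBufferBlock second = block.subBlock(8, 8);
        System.out.println(second.getLong(0)); // 42
    }
}
```

Memory behind a direct buffer is released by the JVM once the buffer is unreachable. Addresses from `Platform.allocateMemory` must instead be freed explicitly through the allocator.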
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
new file mode 100644
index 0000000..12f67c7
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a long array on Java heap.
+ */
+public final class OnHeapMemoryBlock extends MemoryBlock {
+
+ private final long[] array;
+
+ public OnHeapMemoryBlock(long[] obj, long offset, long size) {
+ super(obj, offset, size);
+ this.array = obj;
+ assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
+ "The sum of size " + size + " and offset " + offset + " should not be larger than " +
+ "the size of the given memory space " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
+ }
+
+ public OnHeapMemoryBlock(long size) {
+ this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size);
+ }
+
+ @Override
+ public MemoryBlock subBlock(long offset, long size) {
+ checkSubBlockRange(offset, size);
+ if (offset == 0 && size == this.size()) return this;
+ return new OnHeapMemoryBlock(array, this.offset + offset, size);
+ }
+
+ public long[] getLongArray() { return array; }
+
+ /**
+ * Creates a memory block pointing to the memory used by the long array.
+ */
+ public static OnHeapMemoryBlock fromArray(final long[] array) {
+ return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
+ }
+
+ public static OnHeapMemoryBlock fromArray(final long[] array, long size) {
+ return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+ }
+
+ @Override
+ public final int getInt(long offset) {
+ return Platform.getInt(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putInt(long offset, int value) {
+ Platform.putInt(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final boolean getBoolean(long offset) {
+ return Platform.getBoolean(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putBoolean(long offset, boolean value) {
+ Platform.putBoolean(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final byte getByte(long offset) {
+ return Platform.getByte(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putByte(long offset, byte value) {
+ Platform.putByte(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final short getShort(long offset) {
+ return Platform.getShort(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putShort(long offset, short value) {
+ Platform.putShort(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final long getLong(long offset) {
+ return Platform.getLong(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putLong(long offset, long value) {
+ Platform.putLong(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final float getFloat(long offset) {
+ return Platform.getFloat(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putFloat(long offset, float value) {
+ Platform.putFloat(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final double getDouble(long offset) {
+ return Platform.getDouble(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putDouble(long offset, double value) {
+ Platform.putDouble(array, this.offset + offset, value);
+ }
+}
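`OnHeapMemoryBlock(long size)` rounds a byte size up to whole longs with `(size + 7) / 8`. Its main constructor asserts that `offset + size` stays within `array.length * 8L + Platform.LONG_ARRAY_OFFSET`. This sketch isolates that arithmetic. Two parts are assumptions. `LONG_ARRAY_OFFSET` is hard-coded to 16, which is typical of a 64-bit HotSpot JVM, whereas Spark reads the real value from `Platform`. `Math.toIntExact` stands in for Guava's `Ints.checkedCast`, although it throws `ArithmeticException` instead of `IllegalArgumentException` on overflow.

```java
// Hypothetical sketch of the sizing arithmetic in OnHeapMemoryBlock.
public class LongArraySizing {
    static final long LONG_ARRAY_OFFSET = 16; // ASSUMED value; Spark uses Platform.LONG_ARRAY_OFFSET

    // Number of longs needed to hold `size` bytes, rounding up to a whole long.
    static int longsNeeded(long size) {
        return Math.toIntExact((size + 7) / 8);
    }

    // Mirrors the constructor's assertion: the block must end within the array data.
    static boolean fits(long[] array, long offset, long size) {
        return offset + size <= array.length * 8L + LONG_ARRAY_OFFSET;
    }

    public static void main(String[] args) {
        long[] array = new long[longsNeeded(13)];              // 13 bytes -> 2 longs
        System.out.println(array.length);                       // 2
        System.out.println(fits(array, LONG_ARRAY_OFFSET, 13)); // true
        System.out.println(fits(array, LONG_ARRAY_OFFSET, 17)); // false
    }
}
```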
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
index 4368fb6..5310bdf 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
@@ -25,9 +25,9 @@ import org.apache.spark.unsafe.Platform;
public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
- public MemoryBlock allocate(long size) throws OutOfMemoryError {
+ public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
- MemoryBlock memory = new MemoryBlock(null, address, size);
+ OffHeapMemoryBlock memory = new OffHeapMemoryBlock(address, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -36,22 +36,25 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
public void free(MemoryBlock memory) {
- assert (memory.obj == null) :
- "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
- assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+ assert(memory instanceof OffHeapMemoryBlock) :
+ "UnsafeMemoryAllocator can only free OffHeapMemoryBlock.";
+ if (memory == OffHeapMemoryBlock.NULL) return;
+ assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
- assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
- || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+ assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
+ || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
+
Platform.freeMemory(memory.offset);
+
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to reset its pointer.
- memory.offset = 0;
+ memory.resetObjAndOffset();
// Mark the page as freed (so we can detect double-frees).
- memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+ memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
}
}
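The rewritten `free` follows a defensive protocol. It returns early for the shared `NULL` block and detects double frees through the `FREED_IN_ALLOCATOR_PAGE_NUMBER` sentinel. It also resets the block's pointer so that a later use-after-free fails instead of silently reading freed memory. The sketch below models that protocol in a hypothetical `FreeProtocolDemo` class. No native memory is involved: a counter of live allocations stands in for `Platform.allocateMemory`/`freeMemory`. The TMM page-number check is omitted, and exceptions replace Java `assert` so the checks run without `-ea`.

```java
// Hypothetical sketch of the defensive free() protocol; not Spark code.
public class FreeProtocolDemo {
    static final int NO_PAGE_NUMBER = -1;
    static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;

    static final class Block {
        long address;
        int pageNumber = NO_PAGE_NUMBER;
        Block(long address) { this.address = address; }
    }

    static final Block NULL = new Block(0);
    private long nextAddress = 4096; // fake addresses, never dereferenced
    int live = 0;                    // stands in for outstanding native allocations

    Block allocate() {
        live++;
        Block b = new Block(nextAddress);
        nextAddress += 4096;
        return b;
    }

    void free(Block block) {
        if (block == NULL) return; // the shared empty block is never freed
        if (block.pageNumber == FREED_IN_ALLOCATOR_PAGE_NUMBER) {
            throw new IllegalStateException("page has already been freed");
        }
        live--;                                            // stands in for Platform.freeMemory
        block.address = 0;                                 // cf. resetObjAndOffset()
        block.pageNumber = FREED_IN_ALLOCATOR_PAGE_NUMBER; // enables double-free detection
    }

    public static void main(String[] args) {
        FreeProtocolDemo allocator = new FreeProtocolDemo();
        Block b = allocator.allocate();
        allocator.free(b);
        allocator.free(NULL);
        try {
            allocator.free(b);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // page has already been freed
        }
        System.out.println(allocator.live); // 0
    }
}
```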
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 5d468ae..e9b3d9b 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -30,9 +30,12 @@ import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.google.common.primitives.Ints;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import static org.apache.spark.unsafe.Platform.*;
@@ -50,12 +53,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
// These are only updated by readExternal() or read()
@Nonnull
- private Object base;
- private long offset;
+ private MemoryBlock base;
+ // numBytes always equals base.size(); keeping it as an int avoids casting from long to int
private int numBytes;
- public Object getBaseObject() { return base; }
- public long getBaseOffset() { return offset; }
+ public MemoryBlock getMemoryBlock() { return base; }
+ public Object getBaseObject() { return base.getBaseObject(); }
+ public long getBaseOffset() { return base.getBaseOffset(); }
/**
* A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which
@@ -108,7 +112,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public static UTF8String fromBytes(byte[] bytes) {
if (bytes != null) {
- return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length);
+ return new UTF8String(
+ new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length));
} else {
return null;
}
@@ -121,20 +126,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) {
if (bytes != null) {
- return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes);
+ return new UTF8String(
+ new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET + offset, numBytes));
} else {
return null;
}
}
/**
- * Creates an UTF8String from given address (base and offset) and length.
- */
- public static UTF8String fromAddress(Object base, long offset, int numBytes) {
- return new UTF8String(base, offset, numBytes);
- }
-
- /**
* Creates an UTF8String from String.
*/
public static UTF8String fromString(String str) {
@@ -150,16 +149,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return fromBytes(spaces);
}
- protected UTF8String(Object base, long offset, int numBytes) {
+ public UTF8String(MemoryBlock base) {
this.base = base;
- this.offset = offset;
- this.numBytes = numBytes;
+ this.numBytes = Ints.checkedCast(base.size());
}
// for serialization
- public UTF8String() {
- this(null, 0, 0);
- }
+ public UTF8String() {}
/**
* Writes the content of this string into a memory address, identified by an object and an offset.
@@ -167,7 +163,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* bytes in this string.
*/
public void writeToMemory(Object target, long targetOffset) {
- Platform.copyMemory(base, offset, target, targetOffset, numBytes);
+ base.writeTo(0, target, targetOffset, numBytes);
}
public void writeTo(ByteBuffer buffer) {
@@ -187,8 +183,9 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
@Nonnull
public ByteBuffer getByteBuffer() {
- if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
- final byte[] bytes = (byte[]) base;
+ long offset = base.getBaseOffset();
+ if (base instanceof ByteArrayMemoryBlock && offset >= BYTE_ARRAY_OFFSET) {
+ final byte[] bytes = ((ByteArrayMemoryBlock) base).getByteArray();
// the offset includes an object header... this is only needed for unsafe copies
final long arrayOffset = offset - BYTE_ARRAY_OFFSET;
@@ -255,12 +252,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
long mask = 0;
if (IS_LITTLE_ENDIAN) {
if (numBytes >= 8) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
} else if (numBytes > 4) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
mask = (1L << (8 - numBytes) * 8) - 1;
} else if (numBytes > 0) {
- p = (long) Platform.getInt(base, offset);
+ p = (long) base.getInt(0);
mask = (1L << (8 - numBytes) * 8) - 1;
} else {
p = 0;
@@ -269,12 +266,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
} else {
// byteOrder == ByteOrder.BIG_ENDIAN
if (numBytes >= 8) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
} else if (numBytes > 4) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
mask = (1L << (8 - numBytes) * 8) - 1;
} else if (numBytes > 0) {
- p = ((long) Platform.getInt(base, offset)) << 32;
+ p = ((long) base.getInt(0)) << 32;
mask = (1L << (8 - numBytes) * 8) - 1;
} else {
p = 0;
@@ -289,12 +286,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public byte[] getBytes() {
// avoid copy if `base` is `byte[]`
- if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
- && ((byte[]) base).length == numBytes) {
- return (byte[]) base;
+ long offset = base.getBaseOffset();
+ if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
+ && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) {
+ return ((ByteArrayMemoryBlock) base).getByteArray();
} else {
byte[] bytes = new byte[numBytes];
- copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
+ base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
return bytes;
}
}
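`getBytes()` returns the backing array without copying only when the string is a `ByteArrayMemoryBlock` that starts at `BYTE_ARRAY_OFFSET` and spans the whole array. In every other case it copies the viewed range. The sketch below shows the same rule on a hypothetical `ByteView` class. A plain start index replaces Spark's header-adjusted offset, and `Arrays.copyOfRange` replaces `MemoryBlock.writeTo`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the zero-copy rule in UTF8String.getBytes().
public class ByteView {
    private final byte[] array;
    private final int start;    // first byte index (Spark stores BYTE_ARRAY_OFFSET + start)
    private final int numBytes;

    public ByteView(byte[] array, int start, int numBytes) {
        this.array = array;
        this.start = start;
        this.numBytes = numBytes;
    }

    public byte[] getBytes() {
        if (start == 0 && array.length == numBytes) {
            return array;                                        // view covers the array: no copy
        }
        return Arrays.copyOfRange(array, start, start + numBytes); // otherwise copy the view
    }

    public static void main(String[] args) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(new ByteView(data, 0, 5).getBytes() == data); // true
        System.out.println(
            new String(new ByteView(data, 1, 3).getBytes(), StandardCharsets.UTF_8)); // ell
    }
}
```

The shortcut saves an allocation, but the caller then shares the array with the string, so the returned bytes should be treated as read-only.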
@@ -324,7 +322,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (i > j) {
byte[] bytes = new byte[i - j];
- copyMemory(base, offset + j, bytes, BYTE_ARRAY_OFFSET, i - j);
+ base.writeTo(j, bytes, BYTE_ARRAY_OFFSET, i - j);
return fromBytes(bytes);
} else {
return EMPTY_UTF8;
@@ -365,14 +363,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* Returns the byte at position `i`.
*/
private byte getByte(int i) {
- return Platform.getByte(base, offset + i);
+ return base.getByte(i);
}
private boolean matchAt(final UTF8String s, int pos) {
if (s.numBytes + pos > numBytes || pos < 0) {
return false;
}
- return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, s.numBytes);
+ return ByteArrayMethods.arrayEqualsBlock(base, pos, s.base, 0, s.numBytes);
}
public boolean startsWith(final UTF8String prefix) {
@@ -499,8 +497,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
for (int i = 0; i < numBytes; i++) {
if (getByte(i) == (byte) ',') {
if (i - (lastComma + 1) == match.numBytes &&
- ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
- match.numBytes)) {
+ ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
return n;
}
lastComma = i;
@@ -508,8 +505,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
}
if (numBytes - (lastComma + 1) == match.numBytes &&
- ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
- match.numBytes)) {
+ ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
return n;
}
return 0;
@@ -524,7 +520,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private UTF8String copyUTF8String(int start, int end) {
int len = end - start + 1;
byte[] newBytes = new byte[len];
- copyMemory(base, offset + start, newBytes, BYTE_ARRAY_OFFSET, len);
+ base.writeTo(start, newBytes, BYTE_ARRAY_OFFSET, len);
return UTF8String.fromBytes(newBytes);
}
@@ -671,8 +667,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int i = 0; // position in byte
while (i < numBytes) {
int len = numBytesForFirstByte(getByte(i));
- copyMemory(this.base, this.offset + i, result,
- BYTE_ARRAY_OFFSET + result.length - i - len, len);
+ base.writeTo(i, result, BYTE_ARRAY_OFFSET + result.length - i - len, len);
i += len;
}
@@ -686,7 +681,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
byte[] newBytes = new byte[numBytes * times];
- copyMemory(this.base, this.offset, newBytes, BYTE_ARRAY_OFFSET, numBytes);
+ base.writeTo(0, newBytes, BYTE_ARRAY_OFFSET, numBytes);
int copied = 1;
while (copied < times) {
@@ -723,7 +718,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (i + v.numBytes > numBytes) {
return -1;
}
- if (ByteArrayMethods.arrayEquals(base, offset + i, v.base, v.offset, v.numBytes)) {
+ if (ByteArrayMethods.arrayEqualsBlock(base, i, v.base, 0, v.numBytes)) {
return c;
}
i += numBytesForFirstByte(getByte(i));
@@ -739,7 +734,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private int find(UTF8String str, int start) {
assert (str.numBytes > 0);
while (start <= numBytes - str.numBytes) {
- if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
+ if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
return start;
}
start += 1;
@@ -753,7 +748,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private int rfind(UTF8String str, int start) {
assert (str.numBytes > 0);
while (start >= 0) {
- if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
+ if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
return start;
}
start -= 1;
@@ -786,7 +781,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return EMPTY_UTF8;
}
byte[] bytes = new byte[idx];
- copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, idx);
+ base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, idx);
return fromBytes(bytes);
} else {
@@ -806,7 +801,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
int size = numBytes - delim.numBytes - idx;
byte[] bytes = new byte[size];
- copyMemory(base, offset + idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
+ base.writeTo(idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
return fromBytes(bytes);
}
}
@@ -829,15 +824,15 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
UTF8String remain = pad.substring(0, spaces - padChars * count);
byte[] data = new byte[this.numBytes + pad.numBytes * count + remain.numBytes];
- copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET, this.numBytes);
+ base.writeTo(0, data, BYTE_ARRAY_OFFSET, this.numBytes);
int offset = this.numBytes;
int idx = 0;
while (idx < count) {
- copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+ pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
++ idx;
offset += pad.numBytes;
}
- copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+ remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
return UTF8String.fromBytes(data);
}
@@ -865,13 +860,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int offset = 0;
int idx = 0;
while (idx < count) {
- copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+ pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
++ idx;
offset += pad.numBytes;
}
- copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+ remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
offset += remain.numBytes;
- copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET + offset, numBytes());
+ base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, numBytes());
return UTF8String.fromBytes(data);
}
@@ -896,8 +891,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int offset = 0;
for (int i = 0; i < inputs.length; i++) {
int len = inputs[i].numBytes;
- copyMemory(
- inputs[i].base, inputs[i].offset,
+ inputs[i].base.writeTo(
+ 0,
result, BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
@@ -936,8 +931,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
for (int i = 0, j = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
int len = inputs[i].numBytes;
- copyMemory(
- inputs[i].base, inputs[i].offset,
+ inputs[i].base.writeTo(
+ 0,
result, BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
@@ -945,8 +940,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
j++;
// Add separator if this is not the last input.
if (j < numInputs) {
- copyMemory(
- separator.base, separator.offset,
+ separator.base.writeTo(
+ 0,
result, BYTE_ARRAY_OFFSET + offset,
separator.numBytes);
offset += separator.numBytes;
@@ -1220,7 +1215,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
public UTF8String copy() {
byte[] bytes = new byte[numBytes];
- copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
+ base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
return fromBytes(bytes);
}
@@ -1228,11 +1223,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
public int compareTo(@Nonnull final UTF8String other) {
int len = Math.min(numBytes, other.numBytes);
int wordMax = (len / 8) * 8;
- long roffset = other.offset;
- Object rbase = other.base;
+ MemoryBlock rbase = other.base;
for (int i = 0; i < wordMax; i += 8) {
- long left = getLong(base, offset + i);
- long right = getLong(rbase, roffset + i);
+ long left = base.getLong(i);
+ long right = rbase.getLong(i);
if (left != right) {
if (IS_LITTLE_ENDIAN) {
return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
@@ -1243,7 +1237,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
for (int i = wordMax; i < len; i++) {
// In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
- int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
+ int res = (getByte(i) & 0xFF) - (rbase.getByte(i) & 0xFF);
if (res != 0) {
return res;
}
@@ -1262,7 +1256,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (numBytes != o.numBytes) {
return false;
}
- return ByteArrayMethods.arrayEquals(base, offset, o.base, o.offset, numBytes);
+ return ByteArrayMethods.arrayEqualsBlock(base, 0, o.base, 0, numBytes);
} else {
return false;
}
@@ -1318,8 +1312,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
num_bytes_j != numBytesForFirstByte(s.getByte(i_bytes))) {
cost = 1;
} else {
- cost = (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes, s.base,
- s.offset + i_bytes, num_bytes_j)) ? 0 : 1;
+ cost = (ByteArrayMethods.arrayEqualsBlock(t.base, j_bytes, s.base,
+ i_bytes, num_bytes_j)) ? 0 : 1;
}
d[i + 1] = Math.min(Math.min(d[i] + 1, p[i + 1] + 1), p[i] + cost);
}
@@ -1334,7 +1328,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public int hashCode() {
- return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42);
+ return Murmur3_x86_32.hashUnsafeBytesBlock(base,42);
}
/**
@@ -1397,10 +1391,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- offset = BYTE_ARRAY_OFFSET;
numBytes = in.readInt();
- base = new byte[numBytes];
- in.readFully((byte[]) base);
+ byte[] bytes = new byte[numBytes];
+ in.readFully(bytes);
+ base = ByteArrayMemoryBlock.fromArray(bytes);
}
@Override
@@ -1412,10 +1406,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public void read(Kryo kryo, Input in) {
- this.offset = BYTE_ARRAY_OFFSET;
- this.numBytes = in.readInt();
- this.base = new byte[numBytes];
- in.read((byte[]) base);
+ numBytes = in.readInt();
+ byte[] bytes = new byte[numBytes];
+ in.read(bytes);
+ base = ByteArrayMemoryBlock.fromArray(bytes);
}
}
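The rewritten `compareTo` above still compares eight bytes per iteration, now through `MemoryBlock.getLong` instead of `Platform.getLong(base, offset + i)`. The stand-alone sketch below isolates that word-at-a-time technique on plain `byte[]` inputs. `WordCompareSketch` and `compareBytes` are hypothetical names. The sketch always loads words in little-endian order through `java.nio.ByteBuffer`, whereas Spark's code branches on `IS_LITTLE_ENDIAN`. The final `a.length - b.length` tie-break for a shared prefix is this sketch's own choice.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-alone helper illustrating the technique; not part of Spark's API.
final class WordCompareSketch {
  private WordCompareSketch() {}

  /** Lexicographic comparison of a and b as unsigned bytes; a shorter prefix sorts first. */
  static int compareBytes(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    int wordMax = (len / 8) * 8;
    // Load 8-byte words in little-endian order, as a little-endian CPU would.
    ByteBuffer left = ByteBuffer.wrap(a).order(ByteOrder.LITTLE_ENDIAN);
    ByteBuffer right = ByteBuffer.wrap(b).order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < wordMax; i += 8) {
      long l = left.getLong(i);
      long r = right.getLong(i);
      if (l != r) {
        // reverseBytes moves the first byte in memory to the most significant
        // position, so an unsigned word comparison agrees with byte-by-byte order.
        return Long.compareUnsigned(Long.reverseBytes(l), Long.reverseBytes(r));
      }
    }
    // Compare the remaining (< 8) bytes one at a time, each treated as unsigned.
    for (int i = wordMax; i < len; i++) {
      int res = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (res != 0) {
        return res;
      }
    }
    return a.length - b.length;
  }
}
```

Without the byte reversal, `{1, (byte) 0xFF, ...}` would compare greater than `{2, 0, ...}` on a little-endian machine, because the second byte lands in a more significant position of the loaded word than the first.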
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 3ad9ac7..583a148 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -81,7 +81,7 @@ public class PlatformUtilSuite {
MemoryAllocator.HEAP.free(block);
Assert.assertNull(block.getBaseObject());
Assert.assertEquals(0, block.getBaseOffset());
- Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
+ Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
}
@Test
@@ -92,7 +92,7 @@ public class PlatformUtilSuite {
MemoryAllocator.UNSAFE.free(block);
Assert.assertNull(block.getBaseObject());
Assert.assertEquals(0, block.getBaseOffset());
- Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
+ Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
}
@Test(expected = AssertionError.class)
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
index fb8e53b..8c2e98c 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
@@ -20,14 +20,13 @@ package org.apache.spark.unsafe.array;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
public class LongArraySuite {
@Test
public void basicTest() {
- long[] bytes = new long[2];
- LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
+ LongArray arr = new LongArray(new OnHeapMemoryBlock(16));
arr.set(0, 1L);
arr.set(1, 2L);
arr.set(1, 3L);
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
index 6348a73..d7ed005 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
@@ -71,6 +71,24 @@ public class Murmur3_x86_32Suite {
}
@Test
+ public void testKnownWordsInputs() {
+ byte[] bytes = new byte[16];
+ long offset = Platform.BYTE_ARRAY_OFFSET;
+ for (int i = 0; i < 16; i++) {
+ bytes[i] = 0;
+ }
+ Assert.assertEquals(-300363099, hasher.hashUnsafeWords(bytes, offset, 16, 42));
+ for (int i = 0; i < 16; i++) {
+ bytes[i] = -1;
+ }
+ Assert.assertEquals(-1210324667, hasher.hashUnsafeWords(bytes, offset, 16, 42));
+ for (int i = 0; i < 16; i++) {
+ bytes[i] = (byte)i;
+ }
+ Assert.assertEquals(-634919701, hasher.hashUnsafeWords(bytes, offset, 16, 42));
+ }
+
+ @Test
public void randomizedStressTest() {
int size = 65536;
Random rand = new Random();
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
new file mode 100644
index 0000000..47f05c9
--- /dev/null
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteOrder;
+
+import static org.hamcrest.core.StringContains.containsString;
+
+public class MemoryBlockSuite {
+ private static final boolean bigEndianPlatform =
+ ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+
+ private void check(MemoryBlock memory, Object obj, long offset, int length) {
+ memory.setPageNumber(1);
+ memory.fill((byte)-1);
+ memory.putBoolean(0, true);
+ memory.putByte(1, (byte)127);
+ memory.putShort(2, (short)257);
+ memory.putInt(4, 0x20000002);
+ memory.putLong(8, 0x1234567089ABCDEFL);
+ memory.putFloat(16, 1.0F);
+ memory.putLong(20, 0x1234567089ABCDEFL);
+ memory.putDouble(28, 2.0);
+ MemoryBlock.copyMemory(memory, 0L, memory, 36, 4);
+ int[] a = new int[2];
+ a[0] = 0x12345678;
+ a[1] = 0x13579BDF;
+ memory.copyFrom(a, Platform.INT_ARRAY_OFFSET, 40, 8);
+ byte[] b = new byte[8];
+ memory.writeTo(40, b, Platform.BYTE_ARRAY_OFFSET, 8);
+
+ Assert.assertEquals(obj, memory.getBaseObject());
+ Assert.assertEquals(offset, memory.getBaseOffset());
+ Assert.assertEquals(length, memory.size());
+ Assert.assertEquals(1, memory.getPageNumber());
+ Assert.assertEquals(true, memory.getBoolean(0));
+ Assert.assertEquals((byte)127, memory.getByte(1 ));
+ Assert.assertEquals((short)257, memory.getShort(2));
+ Assert.assertEquals(0x20000002, memory.getInt(4));
+ Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(8));
+ Assert.assertEquals(1.0F, memory.getFloat(16), 0);
+ Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(20));
+ Assert.assertEquals(2.0, memory.getDouble(28), 0);
+ Assert.assertEquals(true, memory.getBoolean(36));
+ Assert.assertEquals((byte)127, memory.getByte(37 ));
+ Assert.assertEquals((short)257, memory.getShort(38));
+ Assert.assertEquals(a[0], memory.getInt(40));
+ Assert.assertEquals(a[1], memory.getInt(44));
+ if (bigEndianPlatform) {
+ Assert.assertEquals(a[0],
+ ((int)b[0] & 0xff) << 24 | ((int)b[1] & 0xff) << 16 |
+ ((int)b[2] & 0xff) << 8 | ((int)b[3] & 0xff));
+ Assert.assertEquals(a[1],
+ ((int)b[4] & 0xff) << 24 | ((int)b[5] & 0xff) << 16 |
+ ((int)b[6] & 0xff) << 8 | ((int)b[7] & 0xff));
+ } else {
+ Assert.assertEquals(a[0],
+ ((int)b[3] & 0xff) << 24 | ((int)b[2] & 0xff) << 16 |
+ ((int)b[1] & 0xff) << 8 | ((int)b[0] & 0xff));
+ Assert.assertEquals(a[1],
+ ((int)b[7] & 0xff) << 24 | ((int)b[6] & 0xff) << 16 |
+ ((int)b[5] & 0xff) << 8 | ((int)b[4] & 0xff));
+ }
+ for (int i = 48; i < memory.size(); i++) {
+ Assert.assertEquals((byte) -1, memory.getByte(i));
+ }
+
+ assert(memory.subBlock(0, memory.size()) == memory);
+
+ try {
+ memory.subBlock(-8, 8);
+ Assert.fail();
+ } catch (Exception expected) {
+ Assert.assertThat(expected.getMessage(), containsString("non-negative"));
+ }
+
+ try {
+ memory.subBlock(0, -8);
+ Assert.fail();
+ } catch (Exception expected) {
+ Assert.assertThat(expected.getMessage(), containsString("non-negative"));
+ }
+
+ try {
+ memory.subBlock(0, length + 8);
+ Assert.fail();
+ } catch (Exception expected) {
+ Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
+ }
+
+ try {
+ memory.subBlock(8, length - 4);
+ Assert.fail();
+ } catch (Exception expected) {
+ Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
+ }
+
+ try {
+ memory.subBlock(length + 8, 4);
+ Assert.fail();
+ } catch (Exception expected) {
+ Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
+ }
+ }
+
+ @Test
+ public void ByteArrayMemoryBlockTest() {
+ byte[] obj = new byte[56];
+ long offset = Platform.BYTE_ARRAY_OFFSET;
+ int length = obj.length;
+
+ MemoryBlock memory = new ByteArrayMemoryBlock(obj, offset, length);
+ check(memory, obj, offset, length);
+
+ memory = ByteArrayMemoryBlock.fromArray(obj);
+ check(memory, obj, offset, length);
+
+ obj = new byte[112];
+ memory = new ByteArrayMemoryBlock(obj, offset, length);
+ check(memory, obj, offset, length);
+ }
+
+ @Test
+ public void OnHeapMemoryBlockTest() {
+ long[] obj = new long[7];
+ long offset = Platform.LONG_ARRAY_OFFSET;
+ int length = obj.length * 8;
+
+ MemoryBlock memory = new OnHeapMemoryBlock(obj, offset, length);
+ check(memory, obj, offset, length);
+
+ memory = OnHeapMemoryBlock.fromArray(obj);
+ check(memory, obj, offset, length);
+
+ obj = new long[14];
+ memory = new OnHeapMemoryBlock(obj, offset, length);
+ check(memory, obj, offset, length);
+ }
+
+ @Test
+ public void OffHeapArrayMemoryBlockTest() {
+ MemoryAllocator memoryAllocator = new UnsafeMemoryAllocator();
+ MemoryBlock memory = memoryAllocator.allocate(56);
+ Object obj = memory.getBaseObject();
+ long offset = memory.getBaseOffset();
+ int length = 56;
+
+ check(memory, obj, offset, length);
+
+ long address = Platform.allocateMemory(112);
+ memory = new OffHeapMemoryBlock(address, length);
+ obj = memory.getBaseObject();
+ offset = memory.getBaseOffset();
+ check(memory, obj, offset, length);
+ }
+}
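The `check` helper in `MemoryBlockSuite` pins down the `subBlock` contract. Negative offsets or sizes are rejected with a message containing "non-negative". Ranges that run past the end of the block are rejected with a message containing "should not be larger than". Requesting the whole block returns the same instance. The sketch below models that contract on a heap-only class. `SimpleBlock`, its fields, the exception type, and the exact message wording are all assumptions chosen to satisfy those assertions; this is not Spark's implementation.

```java
// Hypothetical heap-only stand-in for a memory block; not Spark's MemoryBlock class.
final class SimpleBlock {
  private final byte[] data;
  private final int start;   // index of this block's first byte within data
  private final int length;  // number of bytes visible through this block

  SimpleBlock(byte[] data, int start, int length) {
    this.data = data;
    this.start = start;
    this.length = length;
  }

  static SimpleBlock fromArray(byte[] data) {
    return new SimpleBlock(data, 0, data.length);
  }

  long size() {
    return length;
  }

  byte getByte(long index) {
    return data[start + (int) index];
  }

  /** Returns a view of [offset, offset + size) relative to this block, sharing the same array. */
  SimpleBlock subBlock(long offset, long size) {
    if (offset < 0 || size < 0) {
      throw new IllegalArgumentException(
          "Size " + size + " and offset " + offset + " must be non-negative");
    }
    if (offset + size > length) {
      throw new IllegalArgumentException("The sum of size " + size + " and offset " + offset
          + " should not be larger than the block size " + length);
    }
    if (offset == 0 && size == length) {
      return this;  // the whole block was requested: reuse this instance
    }
    return new SimpleBlock(data, start + (int) offset, (int) size);
  }
}
```

Because offsets are interpreted relative to the current view, a sub-block of a sub-block still addresses the right bytes of the underlying array.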
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index 7c34d41..bad908f 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -26,6 +26,9 @@ import java.util.*;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -519,7 +522,8 @@ public class UTF8StringSuite {
final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
- UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
+ new UTF8String(
+ new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
.writeTo(outputStream);
final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
@@ -534,7 +538,7 @@ public class UTF8StringSuite {
for (int i = 0; i < test.length; ++i) {
for (int j = 0; j < test.length - i; ++j) {
- UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j)
+ new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(i, j))
.writeTo(outputStream);
assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray());
@@ -565,7 +569,7 @@ public class UTF8StringSuite {
for (final long offset : offsets) {
try {
- fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length)
+ new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(offset, test.length))
.writeTo(outputStream);
throw new IllegalStateException(Long.toString(offset));
@@ -592,26 +596,25 @@ public class UTF8StringSuite {
}
@Test
- public void writeToOutputStreamIntArray() throws IOException {
+ public void writeToOutputStreamLongArray() throws IOException {
// verify that writes work on objects that are not byte arrays
- final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界");
+ final ByteBuffer buffer = StandardCharsets.UTF_8.encode("3千大千世界");
buffer.position(0);
buffer.order(ByteOrder.nativeOrder());
final int length = buffer.limit();
- assertEquals(12, length);
+ assertEquals(16, length);
- final int ints = length / 4;
- final int[] array = new int[ints];
+ final int longs = length / 8;
+ final long[] array = new long[longs];
- for (int i = 0; i < ints; ++i) {
- array[i] = buffer.getInt();
+ for (int i = 0; i < longs; ++i) {
+ array[i] = buffer.getLong();
}
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
- .writeTo(outputStream);
- assertEquals("大千世界", outputStream.toString("UTF-8"));
+ new UTF8String(OnHeapMemoryBlock.fromArray(array)).writeTo(outputStream);
+ assertEquals("3千大千世界", outputStream.toString("UTF-8"));
}
@Test
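`writeToOutputStreamLongArray` now packs the UTF-8 bytes into a `long[]`, which only works when the byte count is a multiple of 8. "大千世界" encodes to 12 bytes (four 3-byte characters), so the test string becomes "3千大千世界": 1 + 5 × 3 = 16 bytes, or exactly two longs. Reading the words with `ByteOrder.nativeOrder()` means each long's in-memory layout matches the original byte sequence, which is what lets the `long[]`-backed `UTF8String` write the bytes back out unchanged. A minimal round-trip sketch using only `java.nio`; `LongPackingSketch`, `pack`, and `unpack` are hypothetical names:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper illustrating native-order packing; not part of Spark's API.
final class LongPackingSketch {
  private LongPackingSketch() {}

  /** Packs bytes into longs using the platform's native byte order. */
  static long[] pack(byte[] bytes) {
    if (bytes.length % 8 != 0) {
      throw new IllegalArgumentException("length must be a multiple of 8: " + bytes.length);
    }
    ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
    long[] words = new long[bytes.length / 8];
    for (int i = 0; i < words.length; i++) {
      words[i] = buf.getLong();  // relative read advances the position by 8
    }
    return words;
  }

  /** Inverse of pack: writes each long back out in native order. */
  static byte[] unpack(long[] words) {
    ByteBuffer buf = ByteBuffer.allocate(words.length * 8).order(ByteOrder.nativeOrder());
    for (long w : words) {
      buf.putLong(w);
    }
    return buf.array();
  }
}
```

Packing "3千大千世界" yields two longs, and `new String(LongPackingSketch.unpack(words), StandardCharsets.UTF_8)` restores the original string. The original 12-byte "大千世界" would be rejected by `pack`.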