You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@spark.apache.org by we...@apache.org on 2018/04/06 02:14:05 UTC
[1/2] spark git commit: [SPARK-10399][CORE][SQL] Introduce multiple
MemoryBlocks to choose several types of memory block
Repository: spark
Updated Branches:
refs/heads/master d9ca1c906 -> 4807d381b
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index d07faf1..8651a63 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -311,7 +311,7 @@ public class TaskMemoryManager {
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
- page.pageNumber = pageNumber;
+ page.setPageNumber(pageNumber);
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
@@ -323,25 +323,25 @@ public class TaskMemoryManager {
* Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
*/
public void freePage(MemoryBlock page, MemoryConsumer consumer) {
- assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
+ assert (page.getPageNumber() != MemoryBlock.NO_PAGE_NUMBER) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
- assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+ assert (page.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
- assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
+ assert (page.getPageNumber() != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
"Called freePage() on a memory block that has already been freed";
- assert(allocatedPages.get(page.pageNumber));
- pageTable[page.pageNumber] = null;
+ assert(allocatedPages.get(page.getPageNumber()));
+ pageTable[page.getPageNumber()] = null;
synchronized (this) {
- allocatedPages.clear(page.pageNumber);
+ allocatedPages.clear(page.getPageNumber());
}
if (logger.isTraceEnabled()) {
- logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
+ logger.trace("Freed page number {} ({} bytes)", page.getPageNumber(), page.size());
}
long pageSize = page.size();
// Clear the page number before passing the block to the MemoryAllocator's free().
// Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
// page has been inappropriately directly freed without calling TMM.freePage().
- page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
+ page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
memoryManager.tungstenMemoryAllocator().free(page);
releaseExecutionMemory(pageSize, consumer);
}
@@ -363,7 +363,7 @@ public class TaskMemoryManager {
// relative to the page's base offset; this relative offset will fit in 51 bits.
offsetInPage -= page.getBaseOffset();
}
- return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
+ return encodePageNumberAndOffset(page.getPageNumber(), offsetInPage);
}
@VisibleForTesting
@@ -434,7 +434,7 @@ public class TaskMemoryManager {
for (MemoryBlock page : pageTable) {
if (page != null) {
logger.debug("unreleased page: " + page + " in task " + taskAttemptId);
- page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
+ page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
memoryManager.tungstenMemoryAllocator().free(page);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index dc36809..8f49859 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort;
import java.util.Comparator;
import org.apache.spark.memory.MemoryConsumer;
-import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.Sorter;
@@ -105,13 +104,7 @@ final class ShuffleInMemorySorter {
public void expandPointerArray(LongArray newArray) {
assert(newArray.size() > array.size());
- Platform.copyMemory(
- array.getBaseObject(),
- array.getBaseOffset(),
- newArray.getBaseObject(),
- newArray.getBaseOffset(),
- pos * 8L
- );
+ MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
consumer.freeArray(array);
array = newArray;
usableCapacity = getUsableCapacity();
@@ -180,10 +173,7 @@ final class ShuffleInMemorySorter {
PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
} else {
- MemoryBlock unused = new MemoryBlock(
- array.getBaseObject(),
- array.getBaseOffset() + pos * 8L,
- (array.size() - pos) * 8L);
+ MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
LongArray buffer = new LongArray(unused);
Sorter<PackedRecordPointer, LongArray> sorter =
new Sorter<>(new ShuffleSortDataFormat(buffer));
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
index 717bdd7..254449e 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -17,8 +17,8 @@
package org.apache.spark.shuffle.sort;
-import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.SortDataFormat;
final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> {
@@ -60,13 +60,8 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo
@Override
public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
- Platform.copyMemory(
- src.getBaseObject(),
- src.getBaseOffset() + srcPos * 8L,
- dst.getBaseObject(),
- dst.getBaseOffset() + dstPos * 8L,
- length * 8L
- );
+ MemoryBlock.copyMemory(src.memoryBlock(), srcPos * 8L,
+ dst.memoryBlock(),dstPos * 8L,length * 8L);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 66118f4..4fc19b1 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -544,7 +544,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// is accessing the current record. We free this page in that caller's next loadNext()
// call.
for (MemoryBlock page : allocatedPages) {
- if (!loaded || page.pageNumber !=
+ if (!loaded || page.getPageNumber() !=
((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
released += page.size();
freePage(page);
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index b3c27d8..20a7a8b 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -26,7 +26,6 @@ import org.apache.spark.TaskContext;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -216,12 +215,7 @@ public final class UnsafeInMemorySorter {
if (newArray.size() < array.size()) {
throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
}
- Platform.copyMemory(
- array.getBaseObject(),
- array.getBaseOffset(),
- newArray.getBaseObject(),
- newArray.getBaseOffset(),
- pos * 8L);
+ MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
consumer.freeArray(array);
array = newArray;
usableCapacity = getUsableCapacity();
@@ -348,10 +342,7 @@ public final class UnsafeInMemorySorter {
array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
radixSortSupport.sortDescending(), radixSortSupport.sortSigned());
} else {
- MemoryBlock unused = new MemoryBlock(
- array.getBaseObject(),
- array.getBaseOffset() + pos * 8L,
- (array.size() - pos) * 8L);
+ MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
LongArray buffer = new LongArray(unused);
Sorter<RecordPointerAndKeyPrefix, LongArray> sorter =
new Sorter<>(new UnsafeSortDataFormat(buffer));
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index a0664b3..d7d2d0b 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -76,7 +76,7 @@ public class TaskMemoryManagerSuite {
final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
final MemoryBlock dataPage = manager.allocatePage(256, c);
c.freePage(dataPage);
- Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
+ Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.getPageNumber());
}
@Test(expected = AssertionError.class)
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 47173b8..3e56db5 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark._
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}
class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
@@ -105,9 +105,8 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
// the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
// that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
- val buf = new LongArray(MemoryBlock.fromLongArray(ref))
- val tmp = new Array[Long](size/2)
- val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp))
+ val buf = new LongArray(OnHeapMemoryBlock.fromArray(ref))
+ val tmpBuf = new LongArray(new OnHeapMemoryBlock((size/2) * 8L))
new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort(
buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index d5956ea..ddf3740 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -27,7 +27,7 @@ import com.google.common.primitives.Ints
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.util.collection.Sorter
import org.apache.spark.util.random.XORShiftRandom
@@ -78,14 +78,14 @@ class RadixSortSuite extends SparkFunSuite with Logging {
private def generateTestData(size: Long, rand: => Long): (Array[JLong], LongArray) = {
val ref = Array.tabulate[Long](Ints.checkedCast(size)) { i => rand }
val extended = ref ++ Array.fill[Long](Ints.checkedCast(size))(0)
- (ref.map(i => new JLong(i)), new LongArray(MemoryBlock.fromLongArray(extended)))
+ (ref.map(i => new JLong(i)), new LongArray(OnHeapMemoryBlock.fromArray(extended)))
}
private def generateKeyPrefixTestData(size: Long, rand: => Long): (LongArray, LongArray) = {
val ref = Array.tabulate[Long](Ints.checkedCast(size * 2)) { i => rand }
val extended = ref ++ Array.fill[Long](Ints.checkedCast(size * 2))(0)
- (new LongArray(MemoryBlock.fromLongArray(ref)),
- new LongArray(MemoryBlock.fromLongArray(extended)))
+ (new LongArray(OnHeapMemoryBlock.fromArray(ref)),
+ new LongArray(OnHeapMemoryBlock.fromArray(extended)))
}
private def collectToArray(array: LongArray, offset: Int, length: Long): Array[Long] = {
@@ -110,7 +110,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
}
private def referenceKeyPrefixSort(buf: LongArray, lo: Long, hi: Long, refCmp: PrefixComparator) {
- val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
+ val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
buf, Ints.checkedCast(lo), Ints.checkedCast(hi), new Comparator[RecordPointerAndKeyPrefix] {
override def compare(
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
index c78f61a..d67e481 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
@@ -29,7 +29,7 @@ import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2}
+import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2Block}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.OpenHashMap
@@ -243,8 +243,7 @@ object FeatureHasher extends DefaultParamsReadable[FeatureHasher] {
case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
case s: String =>
- val utf8 = UTF8String.fromString(s)
- hashUnsafeBytes2(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
+ hashUnsafeBytes2Block(UTF8String.fromString(s).getMemoryBlock, seed)
case _ => throw new SparkException("FeatureHasher with murmur3 algorithm does not " +
s"support type ${term.getClass.getCanonicalName} of input data.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
index 8935c84..7b73b28 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
@@ -160,7 +160,7 @@ object HashingTF {
case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
case s: String =>
val utf8 = UTF8String.fromString(s)
- hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
+ hashUnsafeBytesBlock(utf8.getMemoryBlock(), seed)
case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " +
s"support type ${term.getClass.getCanonicalName} of input data.")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index d18542b..8546c28 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -27,6 +27,7 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@@ -230,7 +231,8 @@ public final class UnsafeArrayData extends ArrayData {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
- return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
+ MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
+ return new UTF8String(mb);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 71c0860..29a1411 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -37,6 +37,7 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@@ -414,7 +415,8 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
- return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
+ MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
+ return new UTF8String(mb);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
index f37ef83..8837489 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
@@ -16,7 +16,10 @@
*/
package org.apache.spark.sql.catalyst.expressions;
+import com.google.common.primitives.Ints;
+
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
// scalastyle: off
/**
@@ -71,13 +74,13 @@ public final class XXH64 {
return fmix(hash);
}
- public long hashUnsafeWords(Object base, long offset, int length) {
- return hashUnsafeWords(base, offset, length, seed);
+ public long hashUnsafeWordsBlock(MemoryBlock mb) {
+ return hashUnsafeWordsBlock(mb, seed);
}
- public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
- assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
- long hash = hashBytesByWords(base, offset, length, seed);
+ public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) {
+ assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
+ long hash = hashBytesByWordsBlock(mb, seed);
return fmix(hash);
}
@@ -85,26 +88,32 @@ public final class XXH64 {
return hashUnsafeBytes(base, offset, length, seed);
}
- public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
+ public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) {
+ long offset = 0;
+ long length = mb.size();
assert (length >= 0) : "lengthInBytes cannot be negative";
- long hash = hashBytesByWords(base, offset, length, seed);
+ long hash = hashBytesByWordsBlock(mb, seed);
long end = offset + length;
offset += length & -8;
if (offset + 4L <= end) {
- hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
+ hash ^= (mb.getInt(offset) & 0xFFFFFFFFL) * PRIME64_1;
hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
offset += 4L;
}
while (offset < end) {
- hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
+ hash ^= (mb.getByte(offset) & 0xFFL) * PRIME64_5;
hash = Long.rotateLeft(hash, 11) * PRIME64_1;
offset++;
}
return fmix(hash);
}
+ public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
+ return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, length), seed);
+ }
+
private static long fmix(long hash) {
hash ^= hash >>> 33;
hash *= PRIME64_2;
@@ -114,30 +123,31 @@ public final class XXH64 {
return hash;
}
- private static long hashBytesByWords(Object base, long offset, int length, long seed) {
- long end = offset + length;
+ private static long hashBytesByWordsBlock(MemoryBlock mb, long seed) {
+ long offset = 0;
+ long length = mb.size();
long hash;
if (length >= 32) {
- long limit = end - 32;
+ long limit = length - 32;
long v1 = seed + PRIME64_1 + PRIME64_2;
long v2 = seed + PRIME64_2;
long v3 = seed;
long v4 = seed - PRIME64_1;
do {
- v1 += Platform.getLong(base, offset) * PRIME64_2;
+ v1 += mb.getLong(offset) * PRIME64_2;
v1 = Long.rotateLeft(v1, 31);
v1 *= PRIME64_1;
- v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
+ v2 += mb.getLong(offset + 8) * PRIME64_2;
v2 = Long.rotateLeft(v2, 31);
v2 *= PRIME64_1;
- v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
+ v3 += mb.getLong(offset + 16) * PRIME64_2;
v3 = Long.rotateLeft(v3, 31);
v3 *= PRIME64_1;
- v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
+ v4 += mb.getLong(offset + 24) * PRIME64_2;
v4 = Long.rotateLeft(v4, 31);
v4 *= PRIME64_1;
@@ -178,9 +188,9 @@ public final class XXH64 {
hash += length;
- long limit = end - 8;
+ long limit = length - 8;
while (offset <= limit) {
- long k1 = Platform.getLong(base, offset);
+ long k1 = mb.getLong(offset);
hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
offset += 8L;
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index b702422..b76b64a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
+import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -360,10 +361,8 @@ abstract class HashExpression[E] extends Expression {
}
protected def genHashString(input: String, result: String): String = {
- val baseObject = s"$input.getBaseObject()"
- val baseOffset = s"$input.getBaseOffset()"
- val numBytes = s"$input.numBytes()"
- s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
+ val mb = s"$input.getMemoryBlock()"
+ s"$result = $hasherClassName.hashUnsafeBytesBlock($mb, $result);"
}
protected def genHashForMap(
@@ -465,6 +464,8 @@ abstract class InterpretedHashFunction {
protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
+ protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long
+
/**
* Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
* of input `value`.
@@ -490,8 +491,7 @@ abstract class InterpretedHashFunction {
case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
case a: Array[Byte] =>
hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
- case s: UTF8String =>
- hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
+ case s: UTF8String => hashUnsafeBytesBlock(s.getMemoryBlock(), seed)
case array: ArrayData =>
val elementType = dataType match {
@@ -578,9 +578,15 @@ object Murmur3HashFunction extends InterpretedHashFunction {
Murmur3_x86_32.hashLong(l, seed.toInt)
}
- override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ override protected def hashUnsafeBytes(
+ base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt)
}
+
+ override protected def hashUnsafeBytesBlock(
+ base: MemoryBlock, seed: Long): Long = {
+ Murmur3_x86_32.hashUnsafeBytesBlock(base, seed.toInt)
+ }
}
/**
@@ -605,9 +611,14 @@ object XxHash64Function extends InterpretedHashFunction {
override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed)
- override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ override protected def hashUnsafeBytes(
+ base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
XXH64.hashUnsafeBytes(base, offset, len, seed)
}
+
+ override protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long = {
+ XXH64.hashUnsafeBytesBlock(base, seed)
+ }
}
/**
@@ -714,10 +725,8 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
"""
override protected def genHashString(input: String, result: String): String = {
- val baseObject = s"$input.getBaseObject()"
- val baseOffset = s"$input.getBaseOffset()"
- val numBytes = s"$input.numBytes()"
- s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);"
+ val mb = s"$input.getMemoryBlock()"
+ s"$result = $hasherClassName.hashUnsafeBytesBlock($mb);"
}
override protected def genHashForArray(
@@ -805,10 +814,14 @@ object HiveHashFunction extends InterpretedHashFunction {
HiveHasher.hashLong(l)
}
- override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+ override protected def hashUnsafeBytes(
+ base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
HiveHasher.hashUnsafeBytes(base, offset, len)
}
+ override protected def hashUnsafeBytesBlock(
+ base: MemoryBlock, seed: Long): Long = HiveHasher.hashUnsafeBytesBlock(base)
+
private val HIVE_DECIMAL_MAX_PRECISION = 38
private val HIVE_DECIMAL_MAX_SCALE = 38
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
index b67c6f3..8ffc1d7 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
import org.junit.Assert;
import org.junit.Test;
@@ -53,7 +55,7 @@ public class HiveHasherSuite {
for (int i = 0; i < inputs.length; i++) {
UTF8String s = UTF8String.fromString("val_" + inputs[i]);
- int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
+ int hash = HiveHasher.hashUnsafeBytesBlock(s.getMemoryBlock());
Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash));
}
}
@@ -89,13 +91,13 @@ public class HiveHasherSuite {
int byteArrSize = rand.nextInt(100) * 8;
byte[] bytes = new byte[byteArrSize];
rand.nextBytes(bytes);
+ MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
Assert.assertEquals(
- HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
- HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ HiveHasher.hashUnsafeBytesBlock(mb),
+ HiveHasher.hashUnsafeBytesBlock(mb));
- hashcodes.add(HiveHasher.hashUnsafeBytes(
- bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
}
// A very loose bound.
@@ -112,13 +114,13 @@ public class HiveHasherSuite {
byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
byte[] paddedBytes = new byte[byteArrSize];
System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
+ MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
Assert.assertEquals(
- HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
- HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ HiveHasher.hashUnsafeBytesBlock(mb),
+ HiveHasher.hashUnsafeBytesBlock(mb));
- hashcodes.add(HiveHasher.hashUnsafeBytes(
- paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
}
// A very loose bound.
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
index 1baee91..cd8bce6 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
@@ -24,6 +24,8 @@ import java.util.Random;
import java.util.Set;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import org.junit.Assert;
import org.junit.Test;
@@ -142,13 +144,13 @@ public class XXH64Suite {
int byteArrSize = rand.nextInt(100) * 8;
byte[] bytes = new byte[byteArrSize];
rand.nextBytes(bytes);
+ MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
Assert.assertEquals(
- hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
- hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ hasher.hashUnsafeWordsBlock(mb),
+ hasher.hashUnsafeWordsBlock(mb));
- hashcodes.add(hasher.hashUnsafeWords(
- bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
}
// A very loose bound.
@@ -165,13 +167,13 @@ public class XXH64Suite {
byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
byte[] paddedBytes = new byte[byteArrSize];
System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
+ MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
Assert.assertEquals(
- hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
- hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ hasher.hashUnsafeWordsBlock(mb),
+ hasher.hashUnsafeWordsBlock(mb));
- hashcodes.add(hasher.hashUnsafeWords(
- paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
+ hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
}
// A very loose bound.
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 754c265..4733f36 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
/**
@@ -206,7 +207,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
@Override
protected UTF8String getBytesAsUTF8String(int rowId, int count) {
- return UTF8String.fromAddress(null, data + rowId, count);
+ return new UTF8String(new OffHeapMemoryBlock(data + rowId, count));
}
//
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index f8e37e9..227a16f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -25,6 +25,7 @@ import org.apache.arrow.vector.holders.NullableVarCharHolder;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.arrow.ArrowUtils;
import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
import org.apache.spark.unsafe.types.UTF8String;
/**
@@ -377,9 +378,10 @@ public final class ArrowColumnVector extends ColumnVector {
if (stringResult.isSet == 0) {
return null;
} else {
- return UTF8String.fromAddress(null,
+ return new UTF8String(new OffHeapMemoryBlock(
stringResult.buffer.memoryAddress() + stringResult.start,
- stringResult.end - stringResult.start);
+ stringResult.end - stringResult.start
+ ));
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
index 50ae26a..470b93e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
import java.util.{Arrays, Comparator}
import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.util.Benchmark
import org.apache.spark.util.collection.Sorter
import org.apache.spark.util.collection.unsafe.sort._
@@ -36,7 +36,7 @@ import org.apache.spark.util.random.XORShiftRandom
class SortBenchmark extends BenchmarkBase {
private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) {
- val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
+ val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] {
override def compare(
@@ -50,8 +50,8 @@ class SortBenchmark extends BenchmarkBase {
private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = {
val ref = Array.tabulate[Long](size * 2) { i => rand }
val extended = ref ++ Array.fill[Long](size * 2)(0)
- (new LongArray(MemoryBlock.fromLongArray(ref)),
- new LongArray(MemoryBlock.fromLongArray(extended)))
+ (new LongArray(OnHeapMemoryBlock.fromArray(ref)),
+ new LongArray(OnHeapMemoryBlock.fromArray(extended)))
}
ignore("sort") {
@@ -60,7 +60,7 @@ class SortBenchmark extends BenchmarkBase {
val benchmark = new Benchmark("radix sort " + size, size)
benchmark.addTimerCase("reference TimSort key prefix array") { timer =>
val array = Array.tabulate[Long](size * 2) { i => rand.nextLong }
- val buf = new LongArray(MemoryBlock.fromLongArray(array))
+ val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
timer.startTiming()
referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY)
timer.stopTiming()
@@ -78,7 +78,7 @@ class SortBenchmark extends BenchmarkBase {
array(i) = rand.nextLong & 0xff
i += 1
}
- val buf = new LongArray(MemoryBlock.fromLongArray(array))
+ val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
timer.startTiming()
RadixSort.sort(buf, size, 0, 7, false, false)
timer.stopTiming()
@@ -90,7 +90,7 @@ class SortBenchmark extends BenchmarkBase {
array(i) = rand.nextLong & 0xffff
i += 1
}
- val buf = new LongArray(MemoryBlock.fromLongArray(array))
+ val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
timer.startTiming()
RadixSort.sort(buf, size, 0, 7, false, false)
timer.stopTiming()
@@ -102,7 +102,7 @@ class SortBenchmark extends BenchmarkBase {
array(i) = rand.nextLong
i += 1
}
- val buf = new LongArray(MemoryBlock.fromLongArray(array))
+ val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
timer.startTiming()
RadixSort.sort(buf, size, 0, 7, false, false)
timer.stopTiming()
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
index ffda33c..25ee95d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
@@ -22,13 +22,13 @@ import java.io.File
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
import org.apache.spark.util.Utils
class RowQueueSuite extends SparkFunSuite {
test("in-memory queue") {
- val page = MemoryBlock.fromLongArray(new Array[Long](1<<10))
+ val page = new OnHeapMemoryBlock((1<<10) * 8L)
val queue = new InMemoryRowQueue(page, 1) {
override def close() {}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[2/2] spark git commit: [SPARK-10399][CORE][SQL] Introduce multiple
MemoryBlocks to choose several types of memory block
Posted by we...@apache.org.
[SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block
## What changes were proposed in this pull request?
This PR allows us to use one of several types of `MemoryBlock`, such as byte array, int array, long array, or `java.nio.DirectByteBuffer`. Using `java.nio.DirectByteBuffer` allows us to have off-heap memory that is automatically deallocated by the JVM. The `MemoryBlock` class has primitive accessors like `Platform.getInt()`, `Platform.putInt()`, or `Platform.copyMemory()`.
This PR uses `MemoryBlock` for `OffHeapColumnVector`, `UTF8String`, and other places. This PR can improve performance of operations involving memory accesses (e.g. `UTF8String.trim`) by 1.8x.
For now, this PR does not use `MemoryBlock` for `BufferHolder` based on cloud-fan's [suggestion](https://github.com/apache/spark/pull/11494#issuecomment-309694290).
Since this PR is a successor of #11494, close #11494. Much of the code was ported from #11494, and a lot of effort went into it. **I think this PR should credit yzotov.**
This PR can achieve **1.1-1.4x performance improvements** for operations in `UTF8String` or `Murmur3_x86_32`. Other operations have almost comparable performance.
Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Hash byte arrays with length 268435487: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32 526 / 536 0.0 131399881.5 1.0X
UTF8String benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
hashCode 525 / 552 1022.6 1.0 1.0X
substring 414 / 423 1298.0 0.8 1.3X
```
With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Hash byte arrays with length 268435487: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32 474 / 488 0.0 118552232.0 1.0X
UTF8String benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
hashCode 476 / 480 1127.3 0.9 1.0X
substring 287 / 291 1869.9 0.5 1.7X
```
Benchmark program
```
test("benchmark Murmur3_x86_32") {
val length = 8192 * 32768 + 31
val seed = 42L
val iters = 1 << 2
val random = new Random(seed)
val arrays = Array.fill[MemoryBlock](numArrays) {
val bytes = new Array[Byte](length)
random.nextBytes(bytes)
new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
}
val benchmark = new Benchmark("Hash byte arrays with length " + length,
iters * numArrays, minNumIters = 20)
benchmark.addCase("HiveHasher") { _: Int =>
var sum = 0L
for (_ <- 0L until iters) {
sum += HiveHasher.hashUnsafeBytesBlock(
arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
}
}
benchmark.run()
}
test("benchmark UTF8String") {
val N = 512 * 1024 * 1024
val iters = 2
val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
val str0 = new java.io.StringWriter() { { for (i <- 0 until N) { write(" ") } } }.toString
val s0 = UTF8String.fromString(str0)
benchmark.addCase("hashCode") { _: Int =>
var h: Int = 0
for (_ <- 0L until iters) { h += s0.hashCode }
}
benchmark.addCase("substring") { _: Int =>
var s: UTF8String = null
for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
}
benchmark.run()
}
```
I ran [this benchmark program](https://gist.github.com/kiszk/94f75b506c93a663bbbc372ffe8f05de) using [the commit](https://github.com/apache/spark/pull/19222/commits/ee5a79861c18725fb1cd9b518cdfd2489c05b81d6). I got the following results:
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Memory access benchmarks: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt() 220 / 221 609.3 1.6 1.0X
Platform get/putInt(byte[]) 220 / 236 610.9 1.6 1.0X
Platform get/putInt(Object) 492 / 494 272.8 3.7 0.4X
OnHeapMemoryBlock get/putLong() 322 / 323 416.5 2.4 0.7X
long[] 221 / 221 608.0 1.6 1.0X
Platform get/putLong(long[]) 321 / 321 418.7 2.4 0.7X
Platform get/putLong(Object) 561 / 563 239.2 4.2 0.4X
```
I also ran [this benchmark program](https://gist.github.com/kiszk/5fdb4e03733a5d110421177e289d1fb5) to compare the performance of `Platform.copyMemory()`.
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz
Platform copyMemory: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Object to Object 1961 / 1967 8.6 116.9 1.0X
System.arraycopy Object to Object 1917 / 1921 8.8 114.3 1.0X
byte array to byte array 1961 / 1968 8.6 116.9 1.0X
System.arraycopy byte array to byte array 1909 / 1937 8.8 113.8 1.0X
int array to int array 1921 / 1990 8.7 114.5 1.0X
double array to double array 1918 / 1923 8.7 114.3 1.0X
Object to byte array 1961 / 1967 8.6 116.9 1.0X
Object to short array 1965 / 1972 8.5 117.1 1.0X
Object to int array 1910 / 1915 8.8 113.9 1.0X
Object to float array 1971 / 1978 8.5 117.5 1.0X
Object to double array 1919 / 1944 8.7 114.4 1.0X
byte array to Object 1959 / 1967 8.6 116.8 1.0X
int array to Object 1961 / 1970 8.6 116.9 1.0X
double array to Object 1917 / 1924 8.8 114.3 1.0X
```
These results show five facts:
1. According to the second/third or sixth/seventh results in the first experiment, using `Platform.get/putInt(Object)` gives more than 2x worse performance than `Platform.get/putInt(byte[])` with a concrete type (i.e. `byte[]`).
2. According to the second/third or fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on the Java heap is `array[]`. **The downside of `array[]` is that it cannot support unaligned 8-byte access.**
3. According to the first/second/third or fourth/sixth/seventh results in the first experiment, `getInt()/putInt()` or `getLong()/putLong()` in subclasses of `MemoryBlock` can achieve performance comparable to `Platform.get/putInt()` or `Platform.get/putLong()` with a concrete type (second or sixth result). There is no virtual-call overhead.
4. According to the results of the second experiment, passing `Object` to `Platform.copyMemory()` achieves the same performance as passing any type of primitive array as the source or destination.
5. According to the second/fourth results in the second experiment, `Platform.copyMemory()` can achieve the same performance as `System.arraycopy`. **It would be good to use `Platform.copyMemory()`, since it can take any type for src and dst.**
We are incrementally replacing `Platform.get/putXXX` with `MemoryBlock.get/putXXX`, because doing so has two advantages:
1) Better performance, due to having a concrete type for an array.
2) A simple OO design instead of passing `Object`.
It is easy to use `MemoryBlock` in `InternalRow`, `BufferHolder`, `TaskMemoryManager`, and other classes that are already abstracted. It is not easy to use `MemoryBlock` in utility classes, such as those related to hashing.
Other candidates are
- UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
- UTF8StringBuffer
- BufferHolder
- TaskMemoryManager
- OnHeapColumnVector
- BytesToBytesMap
- CachedBatch
- classes for hash
- others.
## How was this patch tested?
Added `UnsafeMemoryAllocator`
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Closes #19222 from kiszk/SPARK-10399.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4807d381
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4807d381
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4807d381
Branch: refs/heads/master
Commit: 4807d381bb113a5c61e6dad88202f23a8b6dd141
Parents: d9ca1c9
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Fri Apr 6 10:13:59 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 6 10:13:59 2018 +0800
----------------------------------------------------------------------
.../sql/catalyst/expressions/HiveHasher.java | 12 +-
.../java/org/apache/spark/unsafe/Platform.java | 2 +-
.../spark/unsafe/array/ByteArrayMethods.java | 13 +-
.../apache/spark/unsafe/array/LongArray.java | 17 +-
.../spark/unsafe/hash/Murmur3_x86_32.java | 45 +++--
.../unsafe/memory/ByteArrayMemoryBlock.java | 128 ++++++++++++++
.../unsafe/memory/HeapMemoryAllocator.java | 19 +-
.../spark/unsafe/memory/MemoryAllocator.java | 4 +-
.../apache/spark/unsafe/memory/MemoryBlock.java | 157 +++++++++++++++--
.../spark/unsafe/memory/MemoryLocation.java | 54 ------
.../spark/unsafe/memory/OffHeapMemoryBlock.java | 105 +++++++++++
.../spark/unsafe/memory/OnHeapMemoryBlock.java | 132 ++++++++++++++
.../unsafe/memory/UnsafeMemoryAllocator.java | 21 ++-
.../apache/spark/unsafe/types/UTF8String.java | 148 ++++++++--------
.../apache/spark/unsafe/PlatformUtilSuite.java | 4 +-
.../spark/unsafe/array/LongArraySuite.java | 5 +-
.../spark/unsafe/hash/Murmur3_x86_32Suite.java | 18 ++
.../spark/unsafe/memory/MemoryBlockSuite.java | 175 +++++++++++++++++++
.../spark/unsafe/types/UTF8StringSuite.java | 29 +--
.../apache/spark/memory/TaskMemoryManager.java | 22 +--
.../shuffle/sort/ShuffleInMemorySorter.java | 14 +-
.../shuffle/sort/ShuffleSortDataFormat.java | 11 +-
.../unsafe/sort/UnsafeExternalSorter.java | 2 +-
.../unsafe/sort/UnsafeInMemorySorter.java | 13 +-
.../spark/memory/TaskMemoryManagerSuite.java | 2 +-
.../util/collection/ExternalSorterSuite.scala | 7 +-
.../collection/unsafe/sort/RadixSortSuite.scala | 10 +-
.../apache/spark/ml/feature/FeatureHasher.scala | 5 +-
.../apache/spark/mllib/feature/HashingTF.scala | 2 +-
.../catalyst/expressions/UnsafeArrayData.java | 4 +-
.../sql/catalyst/expressions/UnsafeRow.java | 4 +-
.../spark/sql/catalyst/expressions/XXH64.java | 46 +++--
.../spark/sql/catalyst/expressions/hash.scala | 39 +++--
.../catalyst/expressions/HiveHasherSuite.java | 20 ++-
.../sql/catalyst/expressions/XXH64Suite.java | 18 +-
.../vectorized/OffHeapColumnVector.java | 3 +-
.../spark/sql/vectorized/ArrowColumnVector.java | 6 +-
.../sql/execution/benchmark/SortBenchmark.scala | 16 +-
.../sql/execution/python/RowQueueSuite.scala | 4 +-
39 files changed, 1002 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
index 7357743..5d90594 100644
--- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
/**
* Simulates Hive's hashing function from Hive v1.2.1
@@ -38,12 +39,17 @@ public class HiveHasher {
return (int) ((input >>> 32) ^ input);
}
- public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+ public static int hashUnsafeBytesBlock(MemoryBlock mb) {
+ long lengthInBytes = mb.size();
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int result = 0;
- for (int i = 0; i < lengthInBytes; i++) {
- result = (result * 31) + (int) Platform.getByte(base, offset + i);
+ for (long i = 0; i < lengthInBytes; i++) {
+ result = (result * 31) + (int) mb.getByte(i);
}
return result;
}
+
+ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
+ return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index aca6fca..54dcadf 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -187,7 +187,7 @@ public final class Platform {
}
public static void copyMemory(
- Object src, long srcOffset, Object dst, long dstOffset, long length) {
+ Object src, long srcOffset, Object dst, long dstOffset, long length) {
// Check if dstOffset is before or after srcOffset to determine if we should copy
// forward or backwards. This is necessary in case src and dst overlap.
if (dstOffset < srcOffset) {
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index a6b1f7a..c334c96 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.array;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryBlock;
public class ByteArrayMethods {
@@ -49,6 +50,16 @@ public class ByteArrayMethods {
private static final boolean unaligned = Platform.unaligned();
/**
+ * MemoryBlock equality check for MemoryBlocks.
+ * @return true if the arrays are equal, false otherwise
+ */
+ public static boolean arrayEqualsBlock(
+ MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, final long length) {
+ return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
+ rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
+ }
+
+ /**
* Optimized byte array equality check for byte arrays.
* @return true if the arrays are equal, false otherwise
*/
@@ -56,7 +67,7 @@ public class ByteArrayMethods {
Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
int i = 0;
- // check if stars align and we can get both offsets to be aligned
+ // check if starts align and we can get both offsets to be aligned
if ((leftOffset % 8) == (rightOffset % 8)) {
while ((leftOffset + i) % 8 != 0 && i < length) {
if (Platform.getByte(leftBase, leftOffset + i) !=
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index 2cd39bd..b74d2de 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -17,7 +17,6 @@
package org.apache.spark.unsafe.array;
-import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
/**
@@ -33,16 +32,12 @@ public final class LongArray {
private static final long WIDTH = 8;
private final MemoryBlock memory;
- private final Object baseObj;
- private final long baseOffset;
private final long length;
public LongArray(MemoryBlock memory) {
assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
this.memory = memory;
- this.baseObj = memory.getBaseObject();
- this.baseOffset = memory.getBaseOffset();
this.length = memory.size() / WIDTH;
}
@@ -51,11 +46,11 @@ public final class LongArray {
}
public Object getBaseObject() {
- return baseObj;
+ return memory.getBaseObject();
}
public long getBaseOffset() {
- return baseOffset;
+ return memory.getBaseOffset();
}
/**
@@ -69,8 +64,8 @@ public final class LongArray {
* Fill this all with 0L.
*/
public void zeroOut() {
- for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
- Platform.putLong(baseObj, off, 0);
+ for (long off = 0; off < length * WIDTH; off += WIDTH) {
+ memory.putLong(off, 0);
}
}
@@ -80,7 +75,7 @@ public final class LongArray {
public void set(int index, long value) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
- Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
+ memory.putLong(index * WIDTH, value);
}
/**
@@ -89,6 +84,6 @@ public final class LongArray {
public long get(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < length : "index (" + index + ") should < length (" + length + ")";
- return Platform.getLong(baseObj, baseOffset + index * WIDTH);
+ return memory.getLong(index * WIDTH);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
index d239de6..f372b19 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
@@ -17,7 +17,9 @@
package org.apache.spark.unsafe.hash;
-import org.apache.spark.unsafe.Platform;
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.memory.MemoryBlock;
/**
* 32-bit Murmur3 hasher. This is based on Guava's Murmur3_32HashFunction.
@@ -49,49 +51,66 @@ public final class Murmur3_x86_32 {
}
public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
- return hashUnsafeWords(base, offset, lengthInBytes, seed);
+ return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
}
- public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
+ public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
// This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+ int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
- int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
+ int h1 = hashBytesByIntBlock(base, seed);
return fmix(h1, lengthInBytes);
}
- public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
+ // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
+ return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+ }
+
+ public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
// This is not compatible with original and another implementations.
// But remain it for backward compatibility for the components existing before 2.3.
+ int lengthInBytes = Ints.checkedCast(base.size());
assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
- int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+ int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
for (int i = lengthAligned; i < lengthInBytes; i++) {
- int halfWord = Platform.getByte(base, offset + i);
+ int halfWord = base.getByte(i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
return fmix(h1, lengthInBytes);
}
+ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+ return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+ }
+
public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
+ return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+ }
+
+ public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
// This is compatible with original and another implementations.
// Use this method for new components after Spark 2.3.
- assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+ int lengthInBytes = Ints.checkedCast(base.size());
+ assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
int lengthAligned = lengthInBytes - lengthInBytes % 4;
- int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+ int h1 = hashBytesByIntBlock(base.subBlock(0, lengthAligned), seed);
int k1 = 0;
for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
- k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+ k1 ^= (base.getByte(i) & 0xFF) << shift;
}
h1 ^= mixK1(k1);
return fmix(h1, lengthInBytes);
}
- private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
+ private static int hashBytesByIntBlock(MemoryBlock base, int seed) {
+ long lengthInBytes = base.size();
assert (lengthInBytes % 4 == 0);
int h1 = seed;
- for (int i = 0; i < lengthInBytes; i += 4) {
- int halfWord = Platform.getInt(base, offset + i);
+ for (long i = 0; i < lengthInBytes; i += 4) {
+ int halfWord = base.getInt(i);
int k1 = mixK1(halfWord);
h1 = mixH1(h1, k1);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
new file mode 100644
index 0000000..99a9868
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+ // Same object as the base object handed to MemoryBlock, kept with its concrete type.
+ private final byte[] array;
+
+ /**
+ * Wraps {@code size} bytes of {@code obj} starting at the raw offset {@code offset}; the raw
+ * offset includes Platform.BYTE_ARRAY_OFFSET. No data is copied.
+ * NOTE(review): the assertion below only bounds the end of the range; an offset smaller than
+ * Platform.BYTE_ARRAY_OFFSET is not rejected here — confirm callers never pass one.
+ */
+ public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
+ super(obj, offset, size);
+ this.array = obj;
+ assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
+ "The sum of size " + size + " and offset " + offset + " should not be larger than " +
+ "the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
+ }
+
+ /** Creates a new byte array of {@code length} bytes and wraps all of it. */
+ public ByteArrayMemoryBlock(long length) {
+ this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
+ }
+
+ /**
+ * Returns a view of {@code size} bytes starting {@code offset} bytes into this block, sharing
+ * the same array. Returns this block itself when the range covers it entirely.
+ */
+ @Override
+ public MemoryBlock subBlock(long offset, long size) {
+ checkSubBlockRange(offset, size);
+ if (offset == 0 && size == this.size()) return this;
+ return new ByteArrayMemoryBlock(array, this.offset + offset, size);
+ }
+
+ /** Returns the backing array itself (not a copy); it may extend beyond this block's range. */
+ public byte[] getByteArray() { return array; }
+
+ /**
+ * Creates a memory block pointing to the memory used by the byte array.
+ */
+ public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+ return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
+ }
+
+ // Typed accessors: the argument is a 0-based logical offset added to this.offset.
+ @Override
+ public final int getInt(long offset) {
+ return Platform.getInt(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putInt(long offset, int value) {
+ Platform.putInt(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final boolean getBoolean(long offset) {
+ return Platform.getBoolean(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putBoolean(long offset, boolean value) {
+ Platform.putBoolean(array, this.offset + offset, value);
+ }
+
+ // Single bytes are read straight from the array: the raw offset is converted back to an
+ // array index by subtracting the array header size.
+ @Override
+ public final byte getByte(long offset) {
+ return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
+ }
+
+ @Override
+ public final void putByte(long offset, byte value) {
+ array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
+ }
+
+ @Override
+ public final short getShort(long offset) {
+ return Platform.getShort(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putShort(long offset, short value) {
+ Platform.putShort(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final long getLong(long offset) {
+ return Platform.getLong(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putLong(long offset, long value) {
+ Platform.putLong(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final float getFloat(long offset) {
+ return Platform.getFloat(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putFloat(long offset, float value) {
+ Platform.putFloat(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final double getDouble(long offset) {
+ return Platform.getDouble(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putDouble(long offset, double value) {
+ Platform.putDouble(array, this.offset + offset, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 2733760..acf28fd 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -58,7 +58,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
- MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+ MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -70,7 +70,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
}
}
long[] array = new long[numWords];
- MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+ MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -79,12 +79,13 @@ public class HeapMemoryAllocator implements MemoryAllocator {
@Override
public void free(MemoryBlock memory) {
- assert (memory.obj != null) :
+ assert(memory instanceof OnHeapMemoryBlock);
+ assert (memory.getBaseObject() != null) :
"baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
- assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+ assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
- assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
- || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+ assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
+ || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
"free()";
@@ -94,12 +95,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
}
// Mark the page as freed (so we can detect double-frees).
- memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+ memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to null out its reference to the long[] array.
- long[] array = (long[]) memory.obj;
- memory.setObjAndOffset(null, 0);
+ long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
+ memory.resetObjAndOffset();
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
index 7b58868..38315fb 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
@@ -38,7 +38,7 @@ public interface MemoryAllocator {
void free(MemoryBlock memory);
- MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
+ UnsafeMemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
- MemoryAllocator HEAP = new HeapMemoryAllocator();
+ HeapMemoryAllocator HEAP = new HeapMemoryAllocator();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index c333857..b086941 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -22,10 +22,10 @@ import javax.annotation.Nullable;
import org.apache.spark.unsafe.Platform;
/**
- * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
+ * A representation of a consecutive memory block in Spark. It defines the common interfaces
+ * for memory accessing and mutating.
*/
-public class MemoryBlock extends MemoryLocation {
-
+public abstract class MemoryBlock {
/** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */
public static final int NO_PAGE_NUMBER = -1;
@@ -45,38 +45,163 @@ public class MemoryBlock extends MemoryLocation {
*/
public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
- private final long length;
+ @Nullable
+ protected Object obj;
+
+ protected long offset;
+
+ protected long length;
/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
- * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
- * which lives in a different package.
+ * TaskMemoryManager. This field can be updated using setPageNumber method so that
+ * this can be modified by the TaskMemoryManager, which lives in a different package.
*/
- public int pageNumber = NO_PAGE_NUMBER;
+ private int pageNumber = NO_PAGE_NUMBER;
- public MemoryBlock(@Nullable Object obj, long offset, long length) {
- super(obj, offset);
+ /**
+ * Creates a block of {@code length} bytes starting at {@code offset} within {@code obj}.
+ * For off-heap memory {@code obj} is {@code null} and {@code offset} is the raw address.
+ *
+ * @throws IllegalArgumentException if {@code offset} or {@code length} is negative
+ */
+ protected MemoryBlock(@Nullable Object obj, long offset, long length) {
+ if (offset < 0 || length < 0) {
+ throw new IllegalArgumentException(
+ "Length " + length + " and offset " + offset + " must be non-negative");
+ }
+ this.obj = obj;
+ this.offset = offset;
this.length = length;
}
+ protected MemoryBlock() {
+ this(null, 0, 0);
+ }
+
+ public final Object getBaseObject() {
+ return obj;
+ }
+
+ public final long getBaseOffset() {
+ return offset;
+ }
+
+ public void resetObjAndOffset() {
+ this.obj = null;
+ this.offset = 0;
+ }
+
/**
* Returns the size of the memory block.
*/
- public long size() {
+ public final long size() {
return length;
}
- /**
- * Creates a memory block pointing to the memory used by the long array.
- */
- public static MemoryBlock fromLongArray(final long[] array) {
- return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
+ public final void setPageNumber(int pageNum) {
+ pageNumber = pageNum;
+ }
+
+ public final int getPageNumber() {
+ return pageNumber;
}
/**
* Fills the memory block with the specified byte value.
*/
- public void fill(byte value) {
+ public final void fill(byte value) {
Platform.setMemory(obj, offset, length, value);
}
+
+ /**
+ * Wraps an existing memory region in the MemoryBlock subclass matching the type of
+ * {@code obj}: ByteArrayMemoryBlock for byte[], OnHeapMemoryBlock for long[], and
+ * OffHeapMemoryBlock for {@code null}. {@code offset} is the raw offset (or, for
+ * {@code null}, the address) handed to the new block. Despite the name, nothing is
+ * allocated or copied.
+ *
+ * @throws UnsupportedOperationException if {@code obj} has any other type
+ */
+ public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) {
+ MemoryBlock mb = null;
+ if (obj instanceof byte[]) {
+ byte[] array = (byte[])obj;
+ mb = new ByteArrayMemoryBlock(array, offset, length);
+ } else if (obj instanceof long[]) {
+ long[] array = (long[])obj;
+ mb = new OnHeapMemoryBlock(array, offset, length);
+ } else if (obj == null) {
+ // A null base object is taken to mean off-heap memory, with offset used as the address.
+ mb = new OffHeapMemoryBlock(offset, length);
+ } else {
+ throw new UnsupportedOperationException(
+ "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
+ }
+ return mb;
+ }
+
+ /**
+ * Just instantiate the sub-block with the same type of MemoryBlock with the new size and relative
+ * offset from the original offset. The data is not copied.
+ * If parameters are invalid, an exception is thrown.
+ */
+ public abstract MemoryBlock subBlock(long offset, long size);
+
+ protected void checkSubBlockRange(long offset, long size) {
+ if (offset < 0 || size < 0) {
+ throw new ArrayIndexOutOfBoundsException(
+ "Size " + size + " and offset " + offset + " must be non-negative");
+ }
+ if (offset + size > length) {
+ throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
+ offset + " should not be larger than the length " + length + " in the MemoryBlock");
+ }
+ }
+
+ /**
+ * Typed accessors for the contents of this memory block.
+ * The {@code offset} passed to getXXX/putXXX is a 0-based logical offset within the block;
+ * it is added to this.offset, which already includes the size of any metadata such as a JVM
+ * object header. Behavior is not guaranteed if the offset is invalid: the call may access
+ * illegal memory, throw an exception, etc.
+ */
+ public abstract int getInt(long offset);
+
+ public abstract void putInt(long offset, int value);
+
+ public abstract boolean getBoolean(long offset);
+
+ public abstract void putBoolean(long offset, boolean value);
+
+ public abstract byte getByte(long offset);
+
+ public abstract void putByte(long offset, byte value);
+
+ public abstract short getShort(long offset);
+
+ public abstract void putShort(long offset, short value);
+
+ public abstract long getLong(long offset);
+
+ public abstract void putLong(long offset, long value);
+
+ public abstract float getFloat(long offset);
+
+ public abstract void putFloat(long offset, float value);
+
+ public abstract double getDouble(long offset);
+
+ public abstract void putDouble(long offset, double value);
+
+ public static final void copyMemory(
+ MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
+ assert(srcOffset + length <= src.length && dstOffset + length <= dst.length);
+ Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
+ dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
+ }
+
+ public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) {
+ assert(length <= src.length && length <= dst.length);
+ Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
+ dst.getBaseObject(), dst.getBaseOffset(), length);
+ }
+
+ /**
+ * Copies {@code length} bytes from {@code src} at the raw offset {@code srcOffset} into this
+ * block at the 0-based logical offset {@code dstOffset}.
+ */
+ public final void copyFrom(Object src, long srcOffset, long dstOffset, long length) {
+ // The bound applies to the destination (this block), so it must use dstOffset. srcOffset is
+ // a raw offset into the unrelated src object and says nothing about this block's capacity.
+ assert(dstOffset >= 0 && length <= this.length - dstOffset);
+ Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
+ }
+
+ public final void writeTo(long srcOffset, Object dst, long dstOffset, long length) {
+ assert(length <= this.length - srcOffset);
+ Platform.copyMemory(obj, offset + srcOffset, dst, dstOffset, length);
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
deleted file mode 100644
index 74ebc87..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import javax.annotation.Nullable;
-
-/**
- * A memory location. Tracked either by a memory address (with off-heap allocation),
- * or by an offset from a JVM object (in-heap allocation).
- */
-public class MemoryLocation {
-
- @Nullable
- Object obj;
-
- long offset;
-
- public MemoryLocation(@Nullable Object obj, long offset) {
- this.obj = obj;
- this.offset = offset;
- }
-
- public MemoryLocation() {
- this(null, 0);
- }
-
- public void setObjAndOffset(Object newObj, long newOffset) {
- this.obj = newObj;
- this.offset = newOffset;
- }
-
- public final Object getBaseObject() {
- return obj;
- }
-
- public final long getBaseOffset() {
- return offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
new file mode 100644
index 0000000..f90f62b
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory outside the Java heap. The base object is always null, so
+ * this.offset holds the start address of the block.
+ */
+public class OffHeapMemoryBlock extends MemoryBlock {
+ // Shared zero-length block at address 0; it is not obtained from an allocator.
+ static public final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
+
+ /**
+ * @param address start address of the memory (e.g. as returned by Platform.allocateMemory)
+ * @param size number of bytes in the block
+ */
+ public OffHeapMemoryBlock(long address, long size) {
+ super(null, address, size);
+ }
+
+ /**
+ * Returns a view of {@code size} bytes starting {@code offset} bytes into this block; no
+ * memory is copied. Returns this block itself when the range covers it entirely.
+ */
+ @Override
+ public MemoryBlock subBlock(long offset, long size) {
+ checkSubBlockRange(offset, size);
+ if (offset == 0 && size == this.size()) return this;
+ return new OffHeapMemoryBlock(this.offset + offset, size);
+ }
+
+ // Accessors pass a null base object, so this.offset + offset is (presumably) interpreted by
+ // Platform as an absolute address.
+ @Override
+ public final int getInt(long offset) {
+ return Platform.getInt(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putInt(long offset, int value) {
+ Platform.putInt(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final boolean getBoolean(long offset) {
+ return Platform.getBoolean(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putBoolean(long offset, boolean value) {
+ Platform.putBoolean(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final byte getByte(long offset) {
+ return Platform.getByte(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putByte(long offset, byte value) {
+ Platform.putByte(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final short getShort(long offset) {
+ return Platform.getShort(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putShort(long offset, short value) {
+ Platform.putShort(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final long getLong(long offset) {
+ return Platform.getLong(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putLong(long offset, long value) {
+ Platform.putLong(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final float getFloat(long offset) {
+ return Platform.getFloat(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putFloat(long offset, float value) {
+ Platform.putFloat(null, this.offset + offset, value);
+ }
+
+ @Override
+ public final double getDouble(long offset) {
+ return Platform.getDouble(null, this.offset + offset);
+ }
+
+ @Override
+ public final void putDouble(long offset, double value) {
+ Platform.putDouble(null, this.offset + offset, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
new file mode 100644
index 0000000..12f67c7
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a long array on Java heap.
+ */
+public final class OnHeapMemoryBlock extends MemoryBlock {
+
+ // Same object as the base object handed to MemoryBlock, kept with its concrete type.
+ private final long[] array;
+
+ /**
+ * Wraps {@code size} bytes of {@code obj} starting at the raw offset {@code offset}; the raw
+ * offset includes Platform.LONG_ARRAY_OFFSET. No data is copied.
+ * NOTE(review): the assertion below only bounds the end of the range; an offset smaller than
+ * Platform.LONG_ARRAY_OFFSET is not rejected here — confirm callers never pass one.
+ */
+ public OnHeapMemoryBlock(long[] obj, long offset, long size) {
+ super(obj, offset, size);
+ this.array = obj;
+ assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
+ "The sum of size " + size + " and offset " + offset + " should not be larger than " +
+ "the size of the given memory space " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
+ }
+
+ /**
+ * Creates a new long array large enough for {@code size} bytes (rounded up to whole longs)
+ * and wraps the first {@code size} bytes of it.
+ */
+ public OnHeapMemoryBlock(long size) {
+ this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size);
+ }
+
+ /**
+ * Returns a view of {@code size} bytes starting {@code offset} bytes into this block, sharing
+ * the same array. Returns this block itself when the range covers it entirely.
+ */
+ @Override
+ public MemoryBlock subBlock(long offset, long size) {
+ checkSubBlockRange(offset, size);
+ if (offset == 0 && size == this.size()) return this;
+ return new OnHeapMemoryBlock(array, this.offset + offset, size);
+ }
+
+ /** Returns the backing array itself (not a copy); it may extend beyond this block's range. */
+ public long[] getLongArray() { return array; }
+
+ /**
+ * Creates a memory block pointing to the memory used by the long array.
+ */
+ public static OnHeapMemoryBlock fromArray(final long[] array) {
+ return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
+ }
+
+ /** Creates a memory block over the first {@code size} bytes of the long array. */
+ public static OnHeapMemoryBlock fromArray(final long[] array, long size) {
+ return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+ }
+
+ // Typed accessors: the argument is a 0-based logical offset added to this.offset.
+ @Override
+ public final int getInt(long offset) {
+ return Platform.getInt(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putInt(long offset, int value) {
+ Platform.putInt(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final boolean getBoolean(long offset) {
+ return Platform.getBoolean(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putBoolean(long offset, boolean value) {
+ Platform.putBoolean(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final byte getByte(long offset) {
+ return Platform.getByte(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putByte(long offset, byte value) {
+ Platform.putByte(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final short getShort(long offset) {
+ return Platform.getShort(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putShort(long offset, short value) {
+ Platform.putShort(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final long getLong(long offset) {
+ return Platform.getLong(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putLong(long offset, long value) {
+ Platform.putLong(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final float getFloat(long offset) {
+ return Platform.getFloat(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putFloat(long offset, float value) {
+ Platform.putFloat(array, this.offset + offset, value);
+ }
+
+ @Override
+ public final double getDouble(long offset) {
+ return Platform.getDouble(array, this.offset + offset);
+ }
+
+ @Override
+ public final void putDouble(long offset, double value) {
+ Platform.putDouble(array, this.offset + offset, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
index 4368fb6..5310bdf 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
@@ -25,9 +25,9 @@ import org.apache.spark.unsafe.Platform;
public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
- public MemoryBlock allocate(long size) throws OutOfMemoryError {
+ public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError {
long address = Platform.allocateMemory(size);
- MemoryBlock memory = new MemoryBlock(null, address, size);
+ OffHeapMemoryBlock memory = new OffHeapMemoryBlock(address, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
@@ -36,22 +36,25 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
+ /**
+ * Releases the native memory behind an OffHeapMemoryBlock and marks the block as freed so
+ * that double-frees and use-after-free can be detected.
+ */
@Override
public void free(MemoryBlock memory) {
- assert (memory.obj == null) :
- "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
- assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+ assert(memory instanceof OffHeapMemoryBlock) :
+ "UnsafeMemoryAllocator can only free OffHeapMemoryBlock.";
+ // OffHeapMemoryBlock.NULL is a shared zero-length sentinel that was never obtained from
+ // Platform.allocateMemory, so there is nothing to release or mark.
+ if (memory == OffHeapMemoryBlock.NULL) return;
+ assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
"page has already been freed";
- assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
- || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+ assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
+ || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
"TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
}
+
Platform.freeMemory(memory.offset);
+
// As an additional layer of defense against use-after-free bugs, we mutate the
// MemoryBlock to reset its pointer.
- memory.offset = 0;
+ memory.resetObjAndOffset();
// Mark the page as freed (so we can detect double-frees).
- memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
+ memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 5d468ae..e9b3d9b 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -30,9 +30,12 @@ import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.google.common.primitives.Ints;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
import static org.apache.spark.unsafe.Platform.*;
@@ -50,12 +53,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
// These are only updated by readExternal() or read()
@Nonnull
- private Object base;
- private long offset;
+ private MemoryBlock base;
+ // While numBytes has the same value as base.size(), to keep as int avoids cast from long to int
private int numBytes;
- public Object getBaseObject() { return base; }
- public long getBaseOffset() { return offset; }
+ public MemoryBlock getMemoryBlock() { return base; }
+ public Object getBaseObject() { return base.getBaseObject(); }
+ public long getBaseOffset() { return base.getBaseOffset(); }
/**
* A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which
@@ -108,7 +112,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public static UTF8String fromBytes(byte[] bytes) {
if (bytes != null) {
- return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length);
+ return new UTF8String(
+ new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length));
} else {
return null;
}
@@ -121,20 +126,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) {
if (bytes != null) {
- return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes);
+ return new UTF8String(
+ new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET + offset, numBytes));
} else {
return null;
}
}
/**
- * Creates an UTF8String from given address (base and offset) and length.
- */
- public static UTF8String fromAddress(Object base, long offset, int numBytes) {
- return new UTF8String(base, offset, numBytes);
- }
-
- /**
* Creates an UTF8String from String.
*/
public static UTF8String fromString(String str) {
@@ -150,16 +149,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return fromBytes(spaces);
}
- protected UTF8String(Object base, long offset, int numBytes) {
+ public UTF8String(MemoryBlock base) {
this.base = base;
- this.offset = offset;
- this.numBytes = numBytes;
+ this.numBytes = Ints.checkedCast(base.size());
}
// for serialization
- public UTF8String() {
- this(null, 0, 0);
- }
+ public UTF8String() {}
/**
* Writes the content of this string into a memory address, identified by an object and an offset.
@@ -167,7 +163,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* bytes in this string.
*/
public void writeToMemory(Object target, long targetOffset) {
- Platform.copyMemory(base, offset, target, targetOffset, numBytes);
+ base.writeTo(0, target, targetOffset, numBytes);
}
public void writeTo(ByteBuffer buffer) {
@@ -187,8 +183,9 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
@Nonnull
public ByteBuffer getByteBuffer() {
- if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
- final byte[] bytes = (byte[]) base;
+ long offset = base.getBaseOffset();
+ if (base instanceof ByteArrayMemoryBlock && offset >= BYTE_ARRAY_OFFSET) {
+ final byte[] bytes = ((ByteArrayMemoryBlock) base).getByteArray();
// the offset includes an object header... this is only needed for unsafe copies
final long arrayOffset = offset - BYTE_ARRAY_OFFSET;
@@ -255,12 +252,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
long mask = 0;
if (IS_LITTLE_ENDIAN) {
if (numBytes >= 8) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
} else if (numBytes > 4) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
mask = (1L << (8 - numBytes) * 8) - 1;
} else if (numBytes > 0) {
- p = (long) Platform.getInt(base, offset);
+ p = (long) base.getInt(0);
mask = (1L << (8 - numBytes) * 8) - 1;
} else {
p = 0;
@@ -269,12 +266,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
} else {
// byteOrder == ByteOrder.BIG_ENDIAN
if (numBytes >= 8) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
} else if (numBytes > 4) {
- p = Platform.getLong(base, offset);
+ p = base.getLong(0);
mask = (1L << (8 - numBytes) * 8) - 1;
} else if (numBytes > 0) {
- p = ((long) Platform.getInt(base, offset)) << 32;
+ p = ((long) base.getInt(0)) << 32;
mask = (1L << (8 - numBytes) * 8) - 1;
} else {
p = 0;
@@ -289,12 +286,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*/
public byte[] getBytes() {
// avoid copy if `base` is `byte[]`
- if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
- && ((byte[]) base).length == numBytes) {
- return (byte[]) base;
+ long offset = base.getBaseOffset();
+ if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
+ && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) {
+ return ((ByteArrayMemoryBlock) base).getByteArray();
} else {
byte[] bytes = new byte[numBytes];
- copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
+ base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
return bytes;
}
}
@@ -324,7 +322,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (i > j) {
byte[] bytes = new byte[i - j];
- copyMemory(base, offset + j, bytes, BYTE_ARRAY_OFFSET, i - j);
+ base.writeTo(j, bytes, BYTE_ARRAY_OFFSET, i - j);
return fromBytes(bytes);
} else {
return EMPTY_UTF8;
@@ -365,14 +363,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
* Returns the byte at position `i`.
*/
private byte getByte(int i) {
- return Platform.getByte(base, offset + i);
+ return base.getByte(i);
}
private boolean matchAt(final UTF8String s, int pos) {
if (s.numBytes + pos > numBytes || pos < 0) {
return false;
}
- return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, s.numBytes);
+ return ByteArrayMethods.arrayEqualsBlock(base, pos, s.base, 0, s.numBytes);
}
public boolean startsWith(final UTF8String prefix) {
@@ -499,8 +497,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
for (int i = 0; i < numBytes; i++) {
if (getByte(i) == (byte) ',') {
if (i - (lastComma + 1) == match.numBytes &&
- ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
- match.numBytes)) {
+ ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
return n;
}
lastComma = i;
@@ -508,8 +505,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
}
if (numBytes - (lastComma + 1) == match.numBytes &&
- ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
- match.numBytes)) {
+ ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
return n;
}
return 0;
@@ -524,7 +520,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private UTF8String copyUTF8String(int start, int end) {
int len = end - start + 1;
byte[] newBytes = new byte[len];
- copyMemory(base, offset + start, newBytes, BYTE_ARRAY_OFFSET, len);
+ base.writeTo(start, newBytes, BYTE_ARRAY_OFFSET, len);
return UTF8String.fromBytes(newBytes);
}
@@ -671,8 +667,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int i = 0; // position in byte
while (i < numBytes) {
int len = numBytesForFirstByte(getByte(i));
- copyMemory(this.base, this.offset + i, result,
- BYTE_ARRAY_OFFSET + result.length - i - len, len);
+ base.writeTo(i, result, BYTE_ARRAY_OFFSET + result.length - i - len, len);
i += len;
}
@@ -686,7 +681,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
byte[] newBytes = new byte[numBytes * times];
- copyMemory(this.base, this.offset, newBytes, BYTE_ARRAY_OFFSET, numBytes);
+ base.writeTo(0, newBytes, BYTE_ARRAY_OFFSET, numBytes);
int copied = 1;
while (copied < times) {
@@ -723,7 +718,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (i + v.numBytes > numBytes) {
return -1;
}
- if (ByteArrayMethods.arrayEquals(base, offset + i, v.base, v.offset, v.numBytes)) {
+ if (ByteArrayMethods.arrayEqualsBlock(base, i, v.base, 0, v.numBytes)) {
return c;
}
i += numBytesForFirstByte(getByte(i));
@@ -739,7 +734,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private int find(UTF8String str, int start) {
assert (str.numBytes > 0);
while (start <= numBytes - str.numBytes) {
- if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
+ if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
return start;
}
start += 1;
@@ -753,7 +748,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
private int rfind(UTF8String str, int start) {
assert (str.numBytes > 0);
while (start >= 0) {
- if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
+ if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
return start;
}
start -= 1;
@@ -786,7 +781,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
return EMPTY_UTF8;
}
byte[] bytes = new byte[idx];
- copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, idx);
+ base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, idx);
return fromBytes(bytes);
} else {
@@ -806,7 +801,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
int size = numBytes - delim.numBytes - idx;
byte[] bytes = new byte[size];
- copyMemory(base, offset + idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
+ base.writeTo(idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
return fromBytes(bytes);
}
}
@@ -829,15 +824,15 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
UTF8String remain = pad.substring(0, spaces - padChars * count);
byte[] data = new byte[this.numBytes + pad.numBytes * count + remain.numBytes];
- copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET, this.numBytes);
+ base.writeTo(0, data, BYTE_ARRAY_OFFSET, this.numBytes);
int offset = this.numBytes;
int idx = 0;
while (idx < count) {
- copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+ pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
++ idx;
offset += pad.numBytes;
}
- copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+ remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
return UTF8String.fromBytes(data);
}
@@ -865,13 +860,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int offset = 0;
int idx = 0;
while (idx < count) {
- copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+ pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
++ idx;
offset += pad.numBytes;
}
- copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+ remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
offset += remain.numBytes;
- copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET + offset, numBytes());
+ base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, numBytes());
return UTF8String.fromBytes(data);
}
@@ -896,8 +891,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
int offset = 0;
for (int i = 0; i < inputs.length; i++) {
int len = inputs[i].numBytes;
- copyMemory(
- inputs[i].base, inputs[i].offset,
+ inputs[i].base.writeTo(
+ 0,
result, BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
@@ -936,8 +931,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
for (int i = 0, j = 0; i < inputs.length; i++) {
if (inputs[i] != null) {
int len = inputs[i].numBytes;
- copyMemory(
- inputs[i].base, inputs[i].offset,
+ inputs[i].base.writeTo(
+ 0,
result, BYTE_ARRAY_OFFSET + offset,
len);
offset += len;
@@ -945,8 +940,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
j++;
// Add separator if this is not the last input.
if (j < numInputs) {
- copyMemory(
- separator.base, separator.offset,
+ separator.base.writeTo(
+ 0,
result, BYTE_ARRAY_OFFSET + offset,
separator.numBytes);
offset += separator.numBytes;
@@ -1220,7 +1215,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
public UTF8String copy() {
byte[] bytes = new byte[numBytes];
- copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
+ base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
return fromBytes(bytes);
}
@@ -1228,11 +1223,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
public int compareTo(@Nonnull final UTF8String other) {
int len = Math.min(numBytes, other.numBytes);
int wordMax = (len / 8) * 8;
- long roffset = other.offset;
- Object rbase = other.base;
+ MemoryBlock rbase = other.base;
for (int i = 0; i < wordMax; i += 8) {
- long left = getLong(base, offset + i);
- long right = getLong(rbase, roffset + i);
+ long left = base.getLong(i);
+ long right = rbase.getLong(i);
if (left != right) {
if (IS_LITTLE_ENDIAN) {
return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
@@ -1243,7 +1237,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
for (int i = wordMax; i < len; i++) {
// In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
- int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
+ int res = (getByte(i) & 0xFF) - (rbase.getByte(i) & 0xFF);
if (res != 0) {
return res;
}
@@ -1262,7 +1256,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
if (numBytes != o.numBytes) {
return false;
}
- return ByteArrayMethods.arrayEquals(base, offset, o.base, o.offset, numBytes);
+ return ByteArrayMethods.arrayEqualsBlock(base, 0, o.base, 0, numBytes);
} else {
return false;
}
@@ -1318,8 +1312,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
num_bytes_j != numBytesForFirstByte(s.getByte(i_bytes))) {
cost = 1;
} else {
- cost = (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes, s.base,
- s.offset + i_bytes, num_bytes_j)) ? 0 : 1;
+ cost = (ByteArrayMethods.arrayEqualsBlock(t.base, j_bytes, s.base,
+ i_bytes, num_bytes_j)) ? 0 : 1;
}
d[i + 1] = Math.min(Math.min(d[i] + 1, p[i + 1] + 1), p[i] + cost);
}
@@ -1334,7 +1328,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public int hashCode() {
- return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42);
+ return Murmur3_x86_32.hashUnsafeBytesBlock(base,42);
}
/**
@@ -1397,10 +1391,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- offset = BYTE_ARRAY_OFFSET;
numBytes = in.readInt();
- base = new byte[numBytes];
- in.readFully((byte[]) base);
+ byte[] bytes = new byte[numBytes];
+ in.readFully(bytes);
+ base = ByteArrayMemoryBlock.fromArray(bytes);
}
@Override
@@ -1412,10 +1406,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
@Override
public void read(Kryo kryo, Input in) {
- this.offset = BYTE_ARRAY_OFFSET;
- this.numBytes = in.readInt();
- this.base = new byte[numBytes];
- in.read((byte[]) base);
+ numBytes = in.readInt();
+ byte[] bytes = new byte[numBytes];
+ in.read(bytes);
+ base = ByteArrayMemoryBlock.fromArray(bytes);
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 3ad9ac7..583a148 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -81,7 +81,7 @@ public class PlatformUtilSuite {
MemoryAllocator.HEAP.free(block);
Assert.assertNull(block.getBaseObject());
Assert.assertEquals(0, block.getBaseOffset());
- Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
+ Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
}
@Test
@@ -92,7 +92,7 @@ public class PlatformUtilSuite {
MemoryAllocator.UNSAFE.free(block);
Assert.assertNull(block.getBaseObject());
Assert.assertEquals(0, block.getBaseOffset());
- Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
+ Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
}
@Test(expected = AssertionError.class)
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
index fb8e53b..8c2e98c 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
@@ -20,14 +20,13 @@ package org.apache.spark.unsafe.array;
import org.junit.Assert;
import org.junit.Test;
-import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
public class LongArraySuite {
@Test
public void basicTest() {
- long[] bytes = new long[2];
- LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
+ LongArray arr = new LongArray(new OnHeapMemoryBlock(16));
arr.set(0, 1L);
arr.set(1, 2L);
arr.set(1, 3L);
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
index 6348a73..d7ed005 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
@@ -71,6 +71,24 @@ public class Murmur3_x86_32Suite {
}
@Test
+  public void testKnownWordsInputs() {
+    long offset = Platform.BYTE_ARRAY_OFFSET;
+    // A freshly allocated array is already all zeros.
+    byte[] bytes = new byte[16];
+    Assert.assertEquals(-300363099, hasher.hashUnsafeWords(bytes, offset, 16, 42));
+    // Every byte set to 0xFF.
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = (byte) -1;
+    }
+    Assert.assertEquals(-1210324667, hasher.hashUnsafeWords(bytes, offset, 16, 42));
+    // Ascending pattern 0x00, 0x01, ..., 0x0F.
+    for (int i = 0; i < bytes.length; i++) {
+      bytes[i] = (byte) i;
+    }
+    Assert.assertEquals(-634919701, hasher.hashUnsafeWords(bytes, offset, 16, 42));
+  }
+
+ @Test
public void randomizedStressTest() {
int size = 65536;
Random rand = new Random();
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
new file mode 100644
index 0000000..47f05c9
--- /dev/null
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteOrder;
+
+import static org.hamcrest.core.StringContains.containsString;
+
+public class MemoryBlockSuite {
+  private static final boolean bigEndianPlatform =
+    ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+
+  /**
+   * Writes a fixed pattern of values into {@code memory}, reads them back, and checks the
+   * block's metadata and {@code subBlock} argument validation.
+   *
+   * @param memory block under test (must span at least 48 bytes); its contents and page
+   *               number are overwritten
+   * @param obj expected base object
+   * @param offset expected base offset
+   * @param length expected size in bytes
+   */
+  private void check(MemoryBlock memory, Object obj, long offset, int length) {
+    memory.setPageNumber(1);
+    memory.fill((byte) -1);
+    memory.putBoolean(0, true);
+    memory.putByte(1, (byte) 127);
+    memory.putShort(2, (short) 257);
+    memory.putInt(4, 0x20000002);
+    memory.putLong(8, 0x1234567089ABCDEFL);
+    memory.putFloat(16, 1.0F);
+    memory.putLong(20, 0x1234567089ABCDEFL);
+    memory.putDouble(28, 2.0);
+    // Duplicate the boolean, byte and short stored in bytes [0, 4) at offset 36.
+    MemoryBlock.copyMemory(memory, 0L, memory, 36, 4);
+    int[] a = {0x12345678, 0x13579BDF};
+    memory.copyFrom(a, Platform.INT_ARRAY_OFFSET, 40, 8);
+    byte[] b = new byte[8];
+    memory.writeTo(40, b, Platform.BYTE_ARRAY_OFFSET, 8);
+
+    Assert.assertEquals(obj, memory.getBaseObject());
+    Assert.assertEquals(offset, memory.getBaseOffset());
+    Assert.assertEquals(length, memory.size());
+    Assert.assertEquals(1, memory.getPageNumber());
+    Assert.assertTrue(memory.getBoolean(0));
+    Assert.assertEquals((byte) 127, memory.getByte(1));
+    Assert.assertEquals((short) 257, memory.getShort(2));
+    Assert.assertEquals(0x20000002, memory.getInt(4));
+    Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(8));
+    Assert.assertEquals(1.0F, memory.getFloat(16), 0);
+    Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(20));
+    Assert.assertEquals(2.0, memory.getDouble(28), 0);
+    Assert.assertTrue(memory.getBoolean(36));
+    Assert.assertEquals((byte) 127, memory.getByte(37));
+    Assert.assertEquals((short) 257, memory.getShort(38));
+    Assert.assertEquals(a[0], memory.getInt(40));
+    Assert.assertEquals(a[1], memory.getInt(44));
+    // writeTo() copies raw bytes, so their order follows the platform's native byte order.
+    if (bigEndianPlatform) {
+      Assert.assertEquals(a[0],
+        ((int) b[0] & 0xff) << 24 | ((int) b[1] & 0xff) << 16 |
+        ((int) b[2] & 0xff) << 8 | ((int) b[3] & 0xff));
+      Assert.assertEquals(a[1],
+        ((int) b[4] & 0xff) << 24 | ((int) b[5] & 0xff) << 16 |
+        ((int) b[6] & 0xff) << 8 | ((int) b[7] & 0xff));
+    } else {
+      Assert.assertEquals(a[0],
+        ((int) b[3] & 0xff) << 24 | ((int) b[2] & 0xff) << 16 |
+        ((int) b[1] & 0xff) << 8 | ((int) b[0] & 0xff));
+      Assert.assertEquals(a[1],
+        ((int) b[7] & 0xff) << 24 | ((int) b[6] & 0xff) << 16 |
+        ((int) b[5] & 0xff) << 8 | ((int) b[4] & 0xff));
+    }
+    // Everything past the explicitly written region keeps the fill value.
+    for (int i = 48; i < memory.size(); i++) {
+      Assert.assertEquals((byte) -1, memory.getByte(i));
+    }
+
+    // A sub-block spanning the whole block is the block itself.
+    Assert.assertSame(memory, memory.subBlock(0, memory.size()));
+
+    assertSubBlockFails(memory, -8, 8, "non-negative");
+    assertSubBlockFails(memory, 0, -8, "non-negative");
+    assertSubBlockFails(memory, 0, length + 8, "should not be larger than");
+    assertSubBlockFails(memory, 8, length - 4, "should not be larger than");
+    assertSubBlockFails(memory, length + 8, 4, "should not be larger than");
+  }
+
+  /**
+   * Asserts that {@code memory.subBlock(offset, size)} throws an exception whose message
+   * contains {@code expectedMessage}.
+   */
+  private static void assertSubBlockFails(
+      MemoryBlock memory, long offset, long size, String expectedMessage) {
+    try {
+      memory.subBlock(offset, size);
+      Assert.fail("subBlock(" + offset + ", " + size + ") should have thrown");
+    } catch (Exception expected) {
+      Assert.assertThat(expected.getMessage(), containsString(expectedMessage));
+    }
+  }
+
+  @Test
+  public void byteArrayMemoryBlockTest() {
+    byte[] obj = new byte[56];
+    long offset = Platform.BYTE_ARRAY_OFFSET;
+    int length = obj.length;
+
+    MemoryBlock memory = new ByteArrayMemoryBlock(obj, offset, length);
+    check(memory, obj, offset, length);
+
+    memory = ByteArrayMemoryBlock.fromArray(obj);
+    check(memory, obj, offset, length);
+
+    // A block may cover only a prefix of a larger backing array.
+    obj = new byte[112];
+    memory = new ByteArrayMemoryBlock(obj, offset, length);
+    check(memory, obj, offset, length);
+  }
+
+  @Test
+  public void onHeapMemoryBlockTest() {
+    long[] obj = new long[7];
+    long offset = Platform.LONG_ARRAY_OFFSET;
+    int length = obj.length * 8;
+
+    MemoryBlock memory = new OnHeapMemoryBlock(obj, offset, length);
+    check(memory, obj, offset, length);
+
+    memory = OnHeapMemoryBlock.fromArray(obj);
+    check(memory, obj, offset, length);
+
+    // A block may cover only a prefix of a larger backing array.
+    obj = new long[14];
+    memory = new OnHeapMemoryBlock(obj, offset, length);
+    check(memory, obj, offset, length);
+  }
+
+  @Test
+  public void offHeapArrayMemoryBlockTest() {
+    MemoryAllocator memoryAllocator = new UnsafeMemoryAllocator();
+    MemoryBlock memory = memoryAllocator.allocate(56);
+    try {
+      check(memory, memory.getBaseObject(), memory.getBaseOffset(), 56);
+    } finally {
+      // check() set page number 1; reset it so the block is returned to the allocator as a
+      // plain allocation rather than looking like a TaskMemoryManager page.
+      memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER);
+      memoryAllocator.free(memory);
+    }
+
+    // The block exposes only the first 56 of the 112 allocated bytes.
+    long address = Platform.allocateMemory(112);
+    try {
+      MemoryBlock block = new OffHeapMemoryBlock(address, 56);
+      check(block, block.getBaseObject(), block.getBaseOffset(), 56);
+    } finally {
+      Platform.freeMemory(address);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/4807d381/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index 7c34d41..bad908f 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -26,6 +26,9 @@ import java.util.*;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -519,7 +522,8 @@ public class UTF8StringSuite {
final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
- UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
+ new UTF8String(
+ new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
.writeTo(outputStream);
final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
@@ -534,7 +538,7 @@ public class UTF8StringSuite {
for (int i = 0; i < test.length; ++i) {
for (int j = 0; j < test.length - i; ++j) {
- UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j)
+ new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(i, j))
.writeTo(outputStream);
assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray());
@@ -565,7 +569,7 @@ public class UTF8StringSuite {
for (final long offset : offsets) {
try {
- fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length)
+ new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(offset, test.length))
.writeTo(outputStream);
throw new IllegalStateException(Long.toString(offset));
@@ -592,26 +596,25 @@ public class UTF8StringSuite {
}
@Test
- public void writeToOutputStreamIntArray() throws IOException {
+ public void writeToOutputStreamLongArray() throws IOException {
// verify that writes work on objects that are not byte arrays
- final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界");
+ final ByteBuffer buffer = StandardCharsets.UTF_8.encode("3千大千世界");
buffer.position(0);
buffer.order(ByteOrder.nativeOrder());
final int length = buffer.limit();
- assertEquals(12, length);
+ assertEquals(16, length);
- final int ints = length / 4;
- final int[] array = new int[ints];
+ final int longs = length / 8;
+ final long[] array = new long[longs];
- for (int i = 0; i < ints; ++i) {
- array[i] = buffer.getInt();
+ for (int i = 0; i < longs; ++i) {
+ array[i] = buffer.getLong();
}
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
- .writeTo(outputStream);
- assertEquals("大千世界", outputStream.toString("UTF-8"));
+ new UTF8String(OnHeapMemoryBlock.fromArray(array)).writeTo(outputStream);
+ assertEquals("3千大千世界", outputStream.toString("UTF-8"));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org