Posted to commits@spark.apache.org by we...@apache.org on 2018/09/09 13:25:36 UTC

[1/2] spark git commit: Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]

Repository: spark
Updated Branches:
  refs/heads/master 1cfda4482 -> 0b9ccd55c


http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 717823e..75690ae 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -26,6 +26,7 @@ import org.apache.spark.TaskContext;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.UnsafeAlignedOffset;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
@@ -215,7 +216,12 @@ public final class UnsafeInMemorySorter {
     if (newArray.size() < array.size()) {
       throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
     }
-    MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
+    Platform.copyMemory(
+      array.getBaseObject(),
+      array.getBaseOffset(),
+      newArray.getBaseObject(),
+      newArray.getBaseOffset(),
+      pos * 8L);
     consumer.freeArray(array);
     array = newArray;
     usableCapacity = getUsableCapacity();
@@ -342,7 +348,10 @@ public final class UnsafeInMemorySorter {
           array, nullBoundaryPos, (pos - nullBoundaryPos) / 2L, 0, 7,
           radixSortSupport.sortDescending(), radixSortSupport.sortSigned());
       } else {
-        MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
+        MemoryBlock unused = new MemoryBlock(
+          array.getBaseObject(),
+          array.getBaseOffset() + pos * 8L,
+          (array.size() - pos) * 8L);
         LongArray buffer = new LongArray(unused);
         Sorter<RecordPointerAndKeyPrefix, LongArray> sorter =
           new Sorter<>(new UnsafeSortDataFormat(buffer));
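
For context: the restored code above copies the pointer array through Platform.copyMemory, which takes an explicit base object plus offset for both the source and the destination. A minimal sketch of the same kind of copy over plain on-heap long[] arrays (identifiers are hypothetical, not from this commit):

    import org.apache.spark.unsafe.Platform;

    long[] oldArr = new long[16];
    long[] newArr = new long[32];
    int pos = 16;  // number of 8-byte entries currently in use
    // Copy pos entries; the length argument is in bytes, not elements.
    Platform.copyMemory(
      oldArr, Platform.LONG_ARRAY_OFFSET,   // source base object + offset
      newArr, Platform.LONG_ARRAY_OFFSET,   // destination base object + offset
      pos * 8L);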

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index d7d2d0b..a0664b3 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -76,7 +76,7 @@ public class TaskMemoryManagerSuite {
     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
     final MemoryBlock dataPage = manager.allocatePage(256, c);
     c.freePage(dataPage);
-    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.getPageNumber());
+    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
   }
 
   @Test(expected = AssertionError.class)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 3e56db5..47173b8 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark._
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordPointerAndKeyPrefix, UnsafeSortDataFormat}
 
 class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
@@ -105,8 +105,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
     // the form [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999]
     // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi()
     val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i }
-    val buf = new LongArray(OnHeapMemoryBlock.fromArray(ref))
-    val tmpBuf = new LongArray(new OnHeapMemoryBlock((size/2) * 8L))
+    val buf = new LongArray(MemoryBlock.fromLongArray(ref))
+    val tmp = new Array[Long](size/2)
+    val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp))
 
     new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort(
       buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index ddf3740..d5956ea 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -27,7 +27,7 @@ import com.google.common.primitives.Ints
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.collection.Sorter
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -78,14 +78,14 @@ class RadixSortSuite extends SparkFunSuite with Logging {
   private def generateTestData(size: Long, rand: => Long): (Array[JLong], LongArray) = {
     val ref = Array.tabulate[Long](Ints.checkedCast(size)) { i => rand }
     val extended = ref ++ Array.fill[Long](Ints.checkedCast(size))(0)
-    (ref.map(i => new JLong(i)), new LongArray(OnHeapMemoryBlock.fromArray(extended)))
+    (ref.map(i => new JLong(i)), new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
   private def generateKeyPrefixTestData(size: Long, rand: => Long): (LongArray, LongArray) = {
     val ref = Array.tabulate[Long](Ints.checkedCast(size * 2)) { i => rand }
     val extended = ref ++ Array.fill[Long](Ints.checkedCast(size * 2))(0)
-    (new LongArray(OnHeapMemoryBlock.fromArray(ref)),
-     new LongArray(OnHeapMemoryBlock.fromArray(extended)))
+    (new LongArray(MemoryBlock.fromLongArray(ref)),
+     new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
   private def collectToArray(array: LongArray, offset: Int, length: Long): Array[Long] = {
@@ -110,7 +110,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
   }
 
   private def referenceKeyPrefixSort(buf: LongArray, lo: Long, hi: Long, refCmp: PrefixComparator) {
-    val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
+    val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
     new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
       buf, Ints.checkedCast(lo), Ints.checkedCast(hi), new Comparator[RecordPointerAndKeyPrefix] {
         override def compare(

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
index dc38ee3..dc18e1d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
@@ -29,7 +29,7 @@ import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF}
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2Block}
+import org.apache.spark.unsafe.hash.Murmur3_x86_32.{hashInt, hashLong, hashUnsafeBytes2}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.OpenHashMap
@@ -244,7 +244,8 @@ object FeatureHasher extends DefaultParamsReadable[FeatureHasher] {
       case f: Float => hashInt(java.lang.Float.floatToIntBits(f), seed)
       case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
       case s: String =>
-        hashUnsafeBytes2Block(UTF8String.fromString(s).getMemoryBlock, seed)
+        val utf8 = UTF8String.fromString(s)
+        hashUnsafeBytes2(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
       case _ => throw new SparkException("FeatureHasher with murmur3 algorithm does not " +
         s"support type ${term.getClass.getCanonicalName} of input data.")
     }
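
For reference, the restored call hashes the UTF8String's backing bytes in place via (base, offset, length, seed). A minimal sketch (the string and seed are made up for illustration):

    import org.apache.spark.unsafe.hash.Murmur3_x86_32;
    import org.apache.spark.unsafe.types.UTF8String;

    UTF8String utf8 = UTF8String.fromString("some feature");
    // hashUnsafeBytes2 is the variant intended for components added after Spark 2.3.
    int hash = Murmur3_x86_32.hashUnsafeBytes2(
      utf8.getBaseObject(), utf8.getBaseOffset(), utf8.numBytes(), 42);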

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
index 7b73b28..8935c84 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala
@@ -160,7 +160,7 @@ object HashingTF {
       case d: Double => hashLong(java.lang.Double.doubleToLongBits(d), seed)
       case s: String =>
         val utf8 = UTF8String.fromString(s)
-        hashUnsafeBytesBlock(utf8.getMemoryBlock(), seed)
+        hashUnsafeBytes(utf8.getBaseObject, utf8.getBaseOffset, utf8.numBytes(), seed)
       case _ => throw new SparkException("HashingTF with murmur3 algorithm does not " +
         s"support type ${term.getClass.getCanonicalName} of input data.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index 9e7b15d..9002abd 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -27,7 +27,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -241,8 +240,7 @@ public final class UnsafeArrayData extends ArrayData {
     final long offsetAndSize = getLong(ordinal);
     final int offset = (int) (offsetAndSize >> 32);
     final int size = (int) offsetAndSize;
-    MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
-    return new UTF8String(mb);
+    return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 469b0e6..a76e6ef 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -37,7 +37,6 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -417,8 +416,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
     final long offsetAndSize = getLong(ordinal);
     final int offset = (int) (offsetAndSize >> 32);
     final int size = (int) offsetAndSize;
-    MemoryBlock mb = MemoryBlock.allocateFromObject(baseObject, baseOffset + offset, size);
-    return new UTF8String(mb);
+    return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
index 8e9c0a2..eb5051b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.UTF8String;
 
 // scalastyle: off
@@ -72,13 +72,13 @@ public final class XXH64 {
     return fmix(hash);
   }
 
-  public long hashUnsafeWordsBlock(MemoryBlock mb) {
-    return hashUnsafeWordsBlock(mb, seed);
+  public long hashUnsafeWords(Object base, long offset, int length) {
+    return hashUnsafeWords(base, offset, length, seed);
   }
 
-  public static long hashUnsafeWordsBlock(MemoryBlock mb, long seed) {
-    assert (mb.size() % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
-    long hash = hashBytesByWordsBlock(mb, seed);
+  public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
+    assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
+    long hash = hashBytesByWords(base, offset, length, seed);
     return fmix(hash);
   }
 
@@ -86,22 +86,20 @@ public final class XXH64 {
     return hashUnsafeBytes(base, offset, length, seed);
   }
 
-  public static long hashUnsafeBytesBlock(MemoryBlock mb, long seed) {
-    long offset = 0;
-    long length = mb.size();
+  public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
     assert (length >= 0) : "lengthInBytes cannot be negative";
-    long hash = hashBytesByWordsBlock(mb, seed);
+    long hash = hashBytesByWords(base, offset, length, seed);
     long end = offset + length;
     offset += length & -8;
 
     if (offset + 4L <= end) {
-      hash ^= (mb.getInt(offset) & 0xFFFFFFFFL) * PRIME64_1;
+      hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
       hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
       offset += 4L;
     }
 
     while (offset < end) {
-      hash ^= (mb.getByte(offset) & 0xFFL) * PRIME64_5;
+      hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
       hash = Long.rotateLeft(hash, 11) * PRIME64_1;
       offset++;
     }
@@ -109,11 +107,7 @@ public final class XXH64 {
   }
 
   public static long hashUTF8String(UTF8String str, long seed) {
-    return hashUnsafeBytesBlock(str.getMemoryBlock(), seed);
-  }
-
-  public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
-    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, length), seed);
+    return hashUnsafeBytes(str.getBaseObject(), str.getBaseOffset(), str.numBytes(), seed);
   }
 
   private static long fmix(long hash) {
@@ -125,31 +119,30 @@ public final class XXH64 {
     return hash;
   }
 
-  private static long hashBytesByWordsBlock(MemoryBlock mb, long seed) {
-    long offset = 0;
-    long length = mb.size();
+  private static long hashBytesByWords(Object base, long offset, int length, long seed) {
+    long end = offset + length;
     long hash;
     if (length >= 32) {
-      long limit = length - 32;
+      long limit = end - 32;
       long v1 = seed + PRIME64_1 + PRIME64_2;
       long v2 = seed + PRIME64_2;
       long v3 = seed;
       long v4 = seed - PRIME64_1;
 
       do {
-        v1 += mb.getLong(offset) * PRIME64_2;
+        v1 += Platform.getLong(base, offset) * PRIME64_2;
         v1 = Long.rotateLeft(v1, 31);
         v1 *= PRIME64_1;
 
-        v2 += mb.getLong(offset + 8) * PRIME64_2;
+        v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
         v2 = Long.rotateLeft(v2, 31);
         v2 *= PRIME64_1;
 
-        v3 += mb.getLong(offset + 16) * PRIME64_2;
+        v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
         v3 = Long.rotateLeft(v3, 31);
         v3 *= PRIME64_1;
 
-        v4 += mb.getLong(offset + 24) * PRIME64_2;
+        v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
         v4 = Long.rotateLeft(v4, 31);
         v4 *= PRIME64_1;
 
@@ -190,9 +183,9 @@ public final class XXH64 {
 
     hash += length;
 
-    long limit = length - 8;
+    long limit = end - 8;
     while (offset <= limit) {
-      long k1 = mb.getLong(offset);
+      long k1 = Platform.getLong(base, offset);
       hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
       hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
       offset += 8L;
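
The restored XXH64 entry points take (base, offset, length, seed) instead of a MemoryBlock. A minimal usage sketch over an on-heap byte array (input and seed are made up):

    import java.nio.charset.StandardCharsets;

    import org.apache.spark.sql.catalyst.expressions.XXH64;
    import org.apache.spark.unsafe.Platform;

    byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
    // Hash all bytes of the array, starting at its on-heap base offset.
    long hash = XXH64.hashUnsafeBytes(data, Platform.BYTE_ARRAY_OFFSET, data.length, 0L);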

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
index f8000d7..f0f66ba 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java
@@ -19,8 +19,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
 
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -31,34 +29,43 @@ public class UTF8StringBuilder {
 
   private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
 
-  private ByteArrayMemoryBlock buffer;
-  private int length = 0;
+  private byte[] buffer;
+  private int cursor = Platform.BYTE_ARRAY_OFFSET;
 
   public UTF8StringBuilder() {
     // Since initial buffer size is 16 in `StringBuilder`, we set the same size here
-    this.buffer = new ByteArrayMemoryBlock(16);
+    this.buffer = new byte[16];
   }
 
   // Grows the buffer by at least `neededSize`
   private void grow(int neededSize) {
-    if (neededSize > ARRAY_MAX - length) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
       throw new UnsupportedOperationException(
         "Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
           "exceeds size limitation " + ARRAY_MAX);
     }
-    final int requestedSize = length + neededSize;
-    if (buffer.size() < requestedSize) {
-      int newLength = requestedSize < ARRAY_MAX / 2 ? requestedSize * 2 : ARRAY_MAX;
-      final ByteArrayMemoryBlock tmp = new ByteArrayMemoryBlock(newLength);
-      MemoryBlock.copyMemory(buffer, tmp, length);
+    final int length = totalSize() + neededSize;
+    if (buffer.length < length) {
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
+      final byte[] tmp = new byte[newLength];
+      Platform.copyMemory(
+        buffer,
+        Platform.BYTE_ARRAY_OFFSET,
+        tmp,
+        Platform.BYTE_ARRAY_OFFSET,
+        totalSize());
       buffer = tmp;
     }
   }
 
+  private int totalSize() {
+    return cursor - Platform.BYTE_ARRAY_OFFSET;
+  }
+
   public void append(UTF8String value) {
     grow(value.numBytes());
-    value.writeToMemory(buffer.getByteArray(), length + Platform.BYTE_ARRAY_OFFSET);
-    length += value.numBytes();
+    value.writeToMemory(buffer, cursor);
+    cursor += value.numBytes();
   }
 
   public void append(String value) {
@@ -66,6 +73,6 @@ public class UTF8StringBuilder {
   }
 
   public UTF8String build() {
-    return UTF8String.fromBytes(buffer.getByteArray(), 0, length);
+    return UTF8String.fromBytes(buffer, 0, totalSize());
   }
 }
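
From a caller's perspective the restored byte[]-plus-cursor layout behaves exactly like the block-backed builder it replaces; a minimal usage sketch (mirroring the UTF8StringBuilderSuite test deleted further down):

    UTF8StringBuilder sb = new UTF8StringBuilder();
    sb.append("abcd");
    sb.append(UTF8String.fromString("1234"));
    UTF8String result = sb.build();  // equals UTF8String.fromString("abcd1234")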

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index a754e87..742a4f8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
-import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -362,7 +361,10 @@ abstract class HashExpression[E] extends Expression {
   }
 
   protected def genHashString(input: String, result: String): String = {
-    s"$result = $hasherClassName.hashUTF8String($input, $result);"
+    val baseObject = s"$input.getBaseObject()"
+    val baseOffset = s"$input.getBaseOffset()"
+    val numBytes = s"$input.numBytes()"
+    s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes, $result);"
   }
 
   protected def genHashForMap(
@@ -469,8 +471,6 @@ abstract class InterpretedHashFunction {
 
   protected def hashUnsafeBytes(base: AnyRef, offset: Long, length: Int, seed: Long): Long
 
-  protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long
-
   /**
    * Computes hash of a given `value` of type `dataType`. The caller needs to check the validity
    * of input `value`.
@@ -496,7 +496,8 @@ abstract class InterpretedHashFunction {
       case c: CalendarInterval => hashInt(c.months, hashLong(c.microseconds, seed))
       case a: Array[Byte] =>
         hashUnsafeBytes(a, Platform.BYTE_ARRAY_OFFSET, a.length, seed)
-      case s: UTF8String => hashUnsafeBytesBlock(s.getMemoryBlock(), seed)
+      case s: UTF8String =>
+        hashUnsafeBytes(s.getBaseObject, s.getBaseOffset, s.numBytes(), seed)
 
       case array: ArrayData =>
         val elementType = dataType match {
@@ -583,15 +584,9 @@ object Murmur3HashFunction extends InterpretedHashFunction {
     Murmur3_x86_32.hashLong(l, seed.toInt)
   }
 
-  override protected def hashUnsafeBytes(
-      base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
     Murmur3_x86_32.hashUnsafeBytes(base, offset, len, seed.toInt)
   }
-
-  override protected def hashUnsafeBytesBlock(
-      base: MemoryBlock, seed: Long): Long = {
-    Murmur3_x86_32.hashUnsafeBytesBlock(base, seed.toInt)
-  }
 }
 
 /**
@@ -616,14 +611,9 @@ object XxHash64Function extends InterpretedHashFunction {
 
   override protected def hashLong(l: Long, seed: Long): Long = XXH64.hashLong(l, seed)
 
-  override protected def hashUnsafeBytes(
-      base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
     XXH64.hashUnsafeBytes(base, offset, len, seed)
   }
-
-  override protected def hashUnsafeBytesBlock(base: MemoryBlock, seed: Long): Long = {
-    XXH64.hashUnsafeBytesBlock(base, seed)
-  }
 }
 
 /**
@@ -730,7 +720,10 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
      """
 
   override protected def genHashString(input: String, result: String): String = {
-    s"$result = $hasherClassName.hashUTF8String($input);"
+    val baseObject = s"$input.getBaseObject()"
+    val baseOffset = s"$input.getBaseOffset()"
+    val numBytes = s"$input.numBytes()"
+    s"$result = $hasherClassName.hashUnsafeBytes($baseObject, $baseOffset, $numBytes);"
   }
 
   override protected def genHashForArray(
@@ -824,14 +817,10 @@ object HiveHashFunction extends InterpretedHashFunction {
     HiveHasher.hashLong(l)
   }
 
-  override protected def hashUnsafeBytes(
-      base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
+  override protected def hashUnsafeBytes(base: AnyRef, offset: Long, len: Int, seed: Long): Long = {
     HiveHasher.hashUnsafeBytes(base, offset, len)
   }
 
-  override protected def hashUnsafeBytesBlock(
-    base: MemoryBlock, seed: Long): Long = HiveHasher.hashUnsafeBytesBlock(base)
-
   private val HIVE_DECIMAL_MAX_PRECISION = 38
   private val HIVE_DECIMAL_MAX_SCALE = 38
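
For illustration, once codegen fills in the placeholders, the restored genHashString templates above emit Java of roughly this shape (the value1/value2 names are hypothetical codegen variables):

    value2 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(
      value1.getBaseObject(), value1.getBaseOffset(), value1.numBytes(), value2);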
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
index 76930f9..b67c6f3 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,7 +53,7 @@ public class HiveHasherSuite {
 
     for (int i = 0; i < inputs.length; i++) {
       UTF8String s = UTF8String.fromString("val_" + inputs[i]);
-      int hash = HiveHasher.hashUnsafeBytesBlock(s.getMemoryBlock());
+      int hash = HiveHasher.hashUnsafeBytes(s.getBaseObject(), s.getBaseOffset(), s.numBytes());
       Assert.assertEquals(expected[i], ((31 * inputs[i]) + hash));
     }
   }
@@ -90,13 +89,13 @@ public class HiveHasherSuite {
       int byteArrSize = rand.nextInt(100) * 8;
       byte[] bytes = new byte[byteArrSize];
       rand.nextBytes(bytes);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
 
       Assert.assertEquals(
-          HiveHasher.hashUnsafeBytesBlock(mb),
-          HiveHasher.hashUnsafeBytesBlock(mb));
+          HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+          HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
+      hashcodes.add(HiveHasher.hashUnsafeBytes(
+          bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.
@@ -113,13 +112,13 @@ public class HiveHasherSuite {
       byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
       byte[] paddedBytes = new byte[byteArrSize];
       System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
 
       Assert.assertEquals(
-          HiveHasher.hashUnsafeBytesBlock(mb),
-          HiveHasher.hashUnsafeBytesBlock(mb));
+          HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+          HiveHasher.hashUnsafeBytes(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(HiveHasher.hashUnsafeBytesBlock(mb));
+      hashcodes.add(HiveHasher.hashUnsafeBytes(
+          paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
index cd8bce6..1baee91 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/XXH64Suite.java
@@ -24,8 +24,6 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -144,13 +142,13 @@ public class XXH64Suite {
       int byteArrSize = rand.nextInt(100) * 8;
       byte[] bytes = new byte[byteArrSize];
       rand.nextBytes(bytes);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(bytes);
 
       Assert.assertEquals(
-              hasher.hashUnsafeWordsBlock(mb),
-              hasher.hashUnsafeWordsBlock(mb));
+              hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+              hasher.hashUnsafeWords(bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
+      hashcodes.add(hasher.hashUnsafeWords(
+              bytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.
@@ -167,13 +165,13 @@ public class XXH64Suite {
       byte[] strBytes = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
       byte[] paddedBytes = new byte[byteArrSize];
       System.arraycopy(strBytes, 0, paddedBytes, 0, strBytes.length);
-      MemoryBlock mb = ByteArrayMemoryBlock.fromArray(paddedBytes);
 
       Assert.assertEquals(
-              hasher.hashUnsafeWordsBlock(mb),
-              hasher.hashUnsafeWordsBlock(mb));
+              hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize),
+              hasher.hashUnsafeWords(paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
 
-      hashcodes.add(hasher.hashUnsafeWordsBlock(mb));
+      hashcodes.add(hasher.hashUnsafeWords(
+              paddedBytes, Platform.BYTE_ARRAY_OFFSET, byteArrSize));
     }
 
     // A very loose bound.

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala
deleted file mode 100644
index 1b25a4b..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilderSuite.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions.codegen
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.unsafe.types.UTF8String
-
-class UTF8StringBuilderSuite extends SparkFunSuite {
-
-  test("basic test") {
-    val sb = new UTF8StringBuilder()
-    assert(sb.build() === UTF8String.EMPTY_UTF8)
-
-    sb.append("")
-    assert(sb.build() === UTF8String.EMPTY_UTF8)
-
-    sb.append("abcd")
-    assert(sb.build() === UTF8String.fromString("abcd"))
-
-    sb.append(UTF8String.fromString("1234"))
-    assert(sb.build() === UTF8String.fromString("abcd1234"))
-
-    // expect to grow an internal buffer
-    sb.append(UTF8String.fromString("efgijk567890"))
-    assert(sb.build() === UTF8String.fromString("abcd1234efgijk567890"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6fdadde..5e0cf7d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -207,7 +206,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
 
   @Override
   protected UTF8String getBytesAsUTF8String(int rowId, int count) {
-    return new UTF8String(new OffHeapMemoryBlock(data + rowId, count));
+    return UTF8String.fromAddress(null, data + rowId, count);
   }
 
   //

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 1c9beda..5f58b03 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -25,7 +25,6 @@ import org.apache.arrow.vector.holders.NullableVarCharHolder;
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.arrow.ArrowUtils;
 import org.apache.spark.sql.types.*;
-import org.apache.spark.unsafe.memory.OffHeapMemoryBlock;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
@@ -378,10 +377,9 @@ public final class ArrowColumnVector extends ColumnVector {
       if (stringResult.isSet == 0) {
         return null;
       } else {
-        return new UTF8String(new OffHeapMemoryBlock(
+        return UTF8String.fromAddress(null,
           stringResult.buffer.memoryAddress() + stringResult.start,
-          stringResult.end - stringResult.start
-        ));
+          stringResult.end - stringResult.start);
       }
     }
   }
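
Both column-vector changes above rely on UTF8String.fromAddress, which wraps existing memory without copying: the base is null for a raw off-heap address, or an on-heap object plus offset. A minimal on-heap sketch (input made up):

    import java.nio.charset.StandardCharsets;

    import org.apache.spark.unsafe.Platform;
    import org.apache.spark.unsafe.types.UTF8String;

    byte[] bytes = "abc".getBytes(StandardCharsets.UTF_8);
    // Zero-copy view over the byte array's contents.
    UTF8String s = UTF8String.fromAddress(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);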

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
index 470b93e..50ae26a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import java.util.{Arrays, Comparator}
 
 import org.apache.spark.unsafe.array.LongArray
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.collection.Sorter
 import org.apache.spark.util.collection.unsafe.sort._
@@ -36,7 +36,7 @@ import org.apache.spark.util.random.XORShiftRandom
 class SortBenchmark extends BenchmarkBase {
 
   private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) {
-    val sortBuffer = new LongArray(new OnHeapMemoryBlock(buf.size() * 8L))
+    val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
     new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
       buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] {
         override def compare(
@@ -50,8 +50,8 @@ class SortBenchmark extends BenchmarkBase {
   private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = {
     val ref = Array.tabulate[Long](size * 2) { i => rand }
     val extended = ref ++ Array.fill[Long](size * 2)(0)
-    (new LongArray(OnHeapMemoryBlock.fromArray(ref)),
-      new LongArray(OnHeapMemoryBlock.fromArray(extended)))
+    (new LongArray(MemoryBlock.fromLongArray(ref)),
+      new LongArray(MemoryBlock.fromLongArray(extended)))
   }
 
   ignore("sort") {
@@ -60,7 +60,7 @@ class SortBenchmark extends BenchmarkBase {
     val benchmark = new Benchmark("radix sort " + size, size)
     benchmark.addTimerCase("reference TimSort key prefix array") { timer =>
       val array = Array.tabulate[Long](size * 2) { i => rand.nextLong }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY)
       timer.stopTiming()
@@ -78,7 +78,7 @@ class SortBenchmark extends BenchmarkBase {
         array(i) = rand.nextLong & 0xff
         i += 1
       }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       RadixSort.sort(buf, size, 0, 7, false, false)
       timer.stopTiming()
@@ -90,7 +90,7 @@ class SortBenchmark extends BenchmarkBase {
         array(i) = rand.nextLong & 0xffff
         i += 1
       }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       RadixSort.sort(buf, size, 0, 7, false, false)
       timer.stopTiming()
@@ -102,7 +102,7 @@ class SortBenchmark extends BenchmarkBase {
         array(i) = rand.nextLong
         i += 1
       }
-      val buf = new LongArray(OnHeapMemoryBlock.fromArray(array))
+      val buf = new LongArray(MemoryBlock.fromLongArray(array))
       timer.startTiming()
       RadixSort.sort(buf, size, 0, 7, false, false)
       timer.stopTiming()

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
index 25ee95d..ffda33c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/RowQueueSuite.scala
@@ -22,13 +22,13 @@ import java.io.File
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock
+import org.apache.spark.unsafe.memory.MemoryBlock
 import org.apache.spark.util.Utils
 
 class RowQueueSuite extends SparkFunSuite {
 
   test("in-memory queue") {
-    val page = new OnHeapMemoryBlock((1<<10) * 8L)
+    val page = MemoryBlock.fromLongArray(new Array[Long](1<<10))
     val queue = new InMemoryRowQueue(page, 1) {
       override def close() {}
     }




[2/2] spark git commit: Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]

Posted by we...@apache.org.
Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]

## What changes were proposed in this pull request?

When running TPC-DS benchmarks on the 2.4 release, npoggi and winglungngai saw a performance regression of more than 10% on the following queries: q67, q24a, and q24b. After applying PR https://github.com/apache/spark/pull/22338, the performance regression still existed. When they reverted the changes in https://github.com/apache/spark/pull/19222, npoggi and winglungngai found that the performance regression was resolved. Thus, this PR reverts the related changes to unblock the 2.4 release.

In a future release, we can still continue the investigation and find the root cause of the regression.

## How was this patch tested?

The existing test cases

Closes #22361 from gatorsmile/revertMemoryBlock.

Authored-by: gatorsmile <ga...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b9ccd55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b9ccd55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b9ccd55

Branch: refs/heads/master
Commit: 0b9ccd55c2986957863dcad3b44ce80403eecfa1
Parents: 1cfda44
Author: gatorsmile <ga...@gmail.com>
Authored: Sun Sep 9 21:25:19 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Sep 9 21:25:19 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/HiveHasher.java    |  18 +-
 .../java/org/apache/spark/unsafe/Platform.java  |   2 +-
 .../spark/unsafe/array/ByteArrayMethods.java    |  15 +-
 .../apache/spark/unsafe/array/LongArray.java    |  17 +-
 .../spark/unsafe/hash/Murmur3_x86_32.java       |  53 ++----
 .../unsafe/memory/ByteArrayMemoryBlock.java     | 128 -------------
 .../unsafe/memory/HeapMemoryAllocator.java      |  21 +--
 .../spark/unsafe/memory/MemoryAllocator.java    |   4 +-
 .../apache/spark/unsafe/memory/MemoryBlock.java | 157 ++--------------
 .../spark/unsafe/memory/MemoryLocation.java     |  54 ++++++
 .../spark/unsafe/memory/OffHeapMemoryBlock.java | 105 -----------
 .../spark/unsafe/memory/OnHeapMemoryBlock.java  | 132 --------------
 .../unsafe/memory/UnsafeMemoryAllocator.java    |  21 +--
 .../apache/spark/unsafe/types/UTF8String.java   | 147 +++++++--------
 .../apache/spark/unsafe/PlatformUtilSuite.java  |   4 +-
 .../spark/unsafe/array/LongArraySuite.java      |   5 +-
 .../spark/unsafe/hash/Murmur3_x86_32Suite.java  |  18 --
 .../spark/unsafe/memory/MemoryBlockSuite.java   | 179 -------------------
 .../spark/unsafe/types/UTF8StringSuite.java     |  41 +++--
 .../apache/spark/memory/TaskMemoryManager.java  |  22 +--
 .../shuffle/sort/ShuffleInMemorySorter.java     |  14 +-
 .../shuffle/sort/ShuffleSortDataFormat.java     |  11 +-
 .../unsafe/sort/UnsafeExternalSorter.java       |   2 +-
 .../unsafe/sort/UnsafeInMemorySorter.java       |  13 +-
 .../spark/memory/TaskMemoryManagerSuite.java    |   2 +-
 .../util/collection/ExternalSorterSuite.scala   |   7 +-
 .../collection/unsafe/sort/RadixSortSuite.scala |  10 +-
 .../apache/spark/ml/feature/FeatureHasher.scala |   5 +-
 .../apache/spark/mllib/feature/HashingTF.scala  |   2 +-
 .../catalyst/expressions/UnsafeArrayData.java   |   4 +-
 .../sql/catalyst/expressions/UnsafeRow.java     |   4 +-
 .../spark/sql/catalyst/expressions/XXH64.java   |  47 +++--
 .../expressions/codegen/UTF8StringBuilder.java  |  35 ++--
 .../spark/sql/catalyst/expressions/hash.scala   |  37 ++--
 .../catalyst/expressions/HiveHasherSuite.java   |  21 ++-
 .../sql/catalyst/expressions/XXH64Suite.java    |  18 +-
 .../codegen/UTF8StringBuilderSuite.scala        |  42 -----
 .../vectorized/OffHeapColumnVector.java         |   3 +-
 .../spark/sql/vectorized/ArrowColumnVector.java |   6 +-
 .../sql/execution/benchmark/SortBenchmark.scala |  16 +-
 .../sql/execution/python/RowQueueSuite.scala    |   4 +-
 41 files changed, 376 insertions(+), 1070 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
index 62b75ae..7357743 100644
--- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.Platform;
 
 /**
  * Simulates Hive's hashing function from Hive v1.2.1
@@ -39,21 +38,12 @@ public class HiveHasher {
     return (int) ((input >>> 32) ^ input);
   }
 
-  public static int hashUnsafeBytesBlock(MemoryBlock mb) {
-    long lengthInBytes = mb.size();
+  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int result = 0;
-    for (long i = 0; i < lengthInBytes; i++) {
-      result = (result * 31) + (int) mb.getByte(i);
+    for (int i = 0; i < lengthInBytes; i++) {
+      result = (result * 31) + (int) Platform.getByte(base, offset + i);
     }
     return result;
   }
-
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
-    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
-  }
-
-  public static int hashUTF8String(UTF8String str) {
-    return hashUnsafeBytesBlock(str.getMemoryBlock());
-  }
 }
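
After the revert, HiveHasher exposes only the (base, offset, length) form of its rolling hash (result = result * 31 + byte). A minimal usage sketch (input made up):

    import java.nio.charset.StandardCharsets;

    import org.apache.spark.sql.catalyst.expressions.HiveHasher;
    import org.apache.spark.unsafe.Platform;

    byte[] bytes = "val_1".getBytes(StandardCharsets.UTF_8);
    int hash = HiveHasher.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);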

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 54dcadf..aca6fca 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -187,7 +187,7 @@ public final class Platform {
   }
 
   public static void copyMemory(
-      Object src, long srcOffset, Object dst, long dstOffset, long length) {
+    Object src, long srcOffset, Object dst, long dstOffset, long length) {
     // Check if dstOffset is before or after srcOffset to determine if we should copy
     // forward or backwards. This is necessary in case src and dst overlap.
     if (dstOffset < srcOffset) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index ef0f78d..cec8c30 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -18,7 +18,6 @@
 package org.apache.spark.unsafe.array;
 
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 
 public class ByteArrayMethods {
 
@@ -54,24 +53,14 @@ public class ByteArrayMethods {
 
   private static final boolean unaligned = Platform.unaligned();
   /**
-   * MemoryBlock equality check for MemoryBlocks.
-   * @return true if the arrays are equal, false otherwise
-   */
-  public static boolean arrayEqualsBlock(
-      MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) {
-    return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
-      rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
-  }
-
-  /**
    * Optimized byte array equality check for byte arrays.
    * @return true if the arrays are equal, false otherwise
    */
   public static boolean arrayEquals(
-      Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) {
+      Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
     int i = 0;
 
-    // check if starts align and we can get both offsets to be aligned
+    // check if stars align and we can get both offsets to be aligned
     if ((leftOffset % 8) == (rightOffset % 8)) {
       while ((leftOffset + i) % 8 != 0 && i < length) {
         if (Platform.getByte(leftBase, leftOffset + i) !=

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index b74d2de..2cd39bd 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.unsafe.array;
 
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
 /**
@@ -32,12 +33,16 @@ public final class LongArray {
   private static final long WIDTH = 8;
 
   private final MemoryBlock memory;
+  private final Object baseObj;
+  private final long baseOffset;
 
   private final long length;
 
   public LongArray(MemoryBlock memory) {
     assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
+    this.baseObj = memory.getBaseObject();
+    this.baseOffset = memory.getBaseOffset();
     this.length = memory.size() / WIDTH;
   }
 
@@ -46,11 +51,11 @@ public final class LongArray {
   }
 
   public Object getBaseObject() {
-    return memory.getBaseObject();
+    return baseObj;
   }
 
   public long getBaseOffset() {
-    return memory.getBaseOffset();
+    return baseOffset;
   }
 
   /**
@@ -64,8 +69,8 @@ public final class LongArray {
    * Fill this all with 0L.
    */
   public void zeroOut() {
-    for (long off = 0; off < length * WIDTH; off += WIDTH) {
-      memory.putLong(off, 0);
+    for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
+      Platform.putLong(baseObj, off, 0);
     }
   }
 
@@ -75,7 +80,7 @@ public final class LongArray {
   public void set(int index, long value) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    memory.putLong(index * WIDTH, value);
+    Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
   }
 
   /**
@@ -84,6 +89,6 @@ public final class LongArray {
   public long get(int index) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    return memory.getLong(index * WIDTH);
+    return Platform.getLong(baseObj, baseOffset + index * WIDTH);
   }
 }
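
The restored LongArray caches the base object and offset once in its constructor and then reads and writes through Platform directly. A minimal usage sketch over an on-heap backing array (sizes and values made up):

    import org.apache.spark.unsafe.array.LongArray;
    import org.apache.spark.unsafe.memory.MemoryBlock;

    LongArray arr = new LongArray(MemoryBlock.fromLongArray(new long[4]));
    arr.zeroOut();
    arr.set(2, 7L);       // i.e. Platform.putLong(baseObj, baseOffset + 2 * WIDTH, 7L)
    long v = arr.get(2);  // 7L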

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
index 566f116..d239de6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
@@ -17,11 +17,7 @@
 
 package org.apache.spark.unsafe.hash;
 
-import com.google.common.primitives.Ints;
-
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * 32-bit Murmur3 hasher.  This is based on Guava's Murmur3_32HashFunction.
@@ -53,74 +49,49 @@ public final class Murmur3_x86_32 {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-    return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+    return hashUnsafeWords(base, offset, lengthInBytes, seed);
   }
 
-  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
     // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
-    int lengthInBytes = Ints.checkedCast(base.size());
     assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
-    int h1 = hashBytesByIntBlock(base, lengthInBytes, seed);
+    int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
     return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
-    // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
-    return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
-  }
-
-  public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
-    return hashUnsafeBytesBlock(base, Ints.checkedCast(base.size()), seed);
-  }
-
-  private static int hashUnsafeBytesBlock(MemoryBlock base, int lengthInBytes, int seed) {
+  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
     // This is not compatible with original and another implementations.
     // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
-    int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
-    long offset = base.getBaseOffset();
-    Object o = base.getBaseObject();
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
     for (int i = lengthAligned; i < lengthInBytes; i++) {
-      int halfWord = Platform.getByte(o, offset + i);
+      int halfWord = Platform.getByte(base, offset + i);
       int k1 = mixK1(halfWord);
       h1 = mixH1(h1, k1);
     }
     return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUTF8String(UTF8String str, int seed) {
-    return hashUnsafeBytesBlock(str.getMemoryBlock(), str.numBytes(), seed);
-  }
-
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
-    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
-  }
-
   public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
-    return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
-  }
-
-  public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
-    // This is compatible with original and other implementations.
+    // This is compatible with the original and other implementations.
     // Use this method for new components after Spark 2.3.
-    int lengthInBytes = Ints.checkedCast(base.size());
-    assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
+    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
-    int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
     int k1 = 0;
     for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
-      k1 ^= (base.getByte(i) & 0xFF) << shift;
+      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
     }
     h1 ^= mixK1(k1);
     return fmix(h1, lengthInBytes);
   }
 
-  private static int hashBytesByIntBlock(MemoryBlock base, int lengthInBytes, int seed) {
+  private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;
     for (int i = 0; i < lengthInBytes; i += 4) {
-      int halfWord = base.getInt(i);
+      int halfWord = Platform.getInt(base, offset + i);
       int k1 = mixK1(halfWord);
       h1 = mixH1(h1, k1);
     }
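
After the revert, callers hash a region as a (base object, offset, length)
triple instead of a MemoryBlock; a small sketch of the two entry points
(the wrapper class is illustrative):

  import java.nio.charset.StandardCharsets;

  import org.apache.spark.unsafe.Platform;
  import org.apache.spark.unsafe.hash.Murmur3_x86_32;

  public class MurmurExample {
    public static void main(String[] args) {
      byte[] data = "spark".getBytes(StandardCharsets.UTF_8);
      // Arbitrary-length input: hashUnsafeBytes handles the unaligned tail.
      int h1 = Murmur3_x86_32.hashUnsafeBytes(
        data, Platform.BYTE_ARRAY_OFFSET, data.length, 42);
      // Word-aligned input (length % 8 == 0) can use hashUnsafeWords.
      byte[] words = new byte[16];
      int h2 = Murmur3_x86_32.hashUnsafeWords(
        words, Platform.BYTE_ARRAY_OFFSET, words.length, 42);
      System.out.println(h1 + " " + h2);
    }
  }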

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
deleted file mode 100644
index 9f23863..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import com.google.common.primitives.Ints;
-
-import org.apache.spark.unsafe.Platform;
-
-/**
- * A consecutive block of memory with a byte array on Java heap.
- */
-public final class ByteArrayMemoryBlock extends MemoryBlock {
-
-  private final byte[] array;
-
-  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
-    super(obj, offset, size);
-    this.array = obj;
-    assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
-      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
-        "the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
-  }
-
-  public ByteArrayMemoryBlock(long length) {
-    this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
-  }
-
-  @Override
-  public MemoryBlock subBlock(long offset, long size) {
-    checkSubBlockRange(offset, size);
-    if (offset == 0 && size == this.size()) return this;
-    return new ByteArrayMemoryBlock(array, this.offset + offset, size);
-  }
-
-  public byte[] getByteArray() { return array; }
-
-  /**
-   * Creates a memory block pointing to the memory used by the byte array.
-   */
-  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
-    return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
-  }
-
-  @Override
-  public int getInt(long offset) {
-    return Platform.getInt(array, this.offset + offset);
-  }
-
-  @Override
-  public void putInt(long offset, int value) {
-    Platform.putInt(array, this.offset + offset, value);
-  }
-
-  @Override
-  public boolean getBoolean(long offset) {
-    return Platform.getBoolean(array, this.offset + offset);
-  }
-
-  @Override
-  public void putBoolean(long offset, boolean value) {
-    Platform.putBoolean(array, this.offset + offset, value);
-  }
-
-  @Override
-  public byte getByte(long offset) {
-    return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
-  }
-
-  @Override
-  public void putByte(long offset, byte value) {
-    array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
-  }
-
-  @Override
-  public short getShort(long offset) {
-    return Platform.getShort(array, this.offset + offset);
-  }
-
-  @Override
-  public void putShort(long offset, short value) {
-    Platform.putShort(array, this.offset + offset, value);
-  }
-
-  @Override
-  public long getLong(long offset) {
-    return Platform.getLong(array, this.offset + offset);
-  }
-
-  @Override
-  public void putLong(long offset, long value) {
-    Platform.putLong(array, this.offset + offset, value);
-  }
-
-  @Override
-  public float getFloat(long offset) {
-    return Platform.getFloat(array, this.offset + offset);
-  }
-
-  @Override
-  public void putFloat(long offset, float value) {
-    Platform.putFloat(array, this.offset + offset, value);
-  }
-
-  @Override
-  public double getDouble(long offset) {
-    return Platform.getDouble(array, this.offset + offset);
-  }
-
-  @Override
-  public void putDouble(long offset, double value) {
-    Platform.putDouble(array, this.offset + offset, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 36caf80..2733760 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
+import org.apache.spark.unsafe.Platform;
+
 /**
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
  */
@@ -56,7 +58,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
             final long[] array = arrayReference.get();
             if (array != null) {
               assert (array.length * 8L >= size);
-              MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
+              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
               }
@@ -68,7 +70,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
       }
     }
     long[] array = new long[numWords];
-    MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
+    MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     }
@@ -77,13 +79,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   @Override
   public void free(MemoryBlock memory) {
-    assert(memory instanceof OnHeapMemoryBlock);
-    assert (memory.getBaseObject() != null) :
+    assert (memory.obj != null) :
       "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
-    assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "page has already been freed";
-    assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
-            || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
       "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
         "free()";
 
@@ -93,12 +94,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
     }
 
     // Mark the page as freed (so we can detect double-frees).
-    memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
+    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
 
     // As an additional layer of defense against use-after-free bugs, we mutate the
     // MemoryBlock to null out its reference to the long[] array.
-    long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
-    memory.resetObjAndOffset();
+    long[] array = (long[]) memory.obj;
+    memory.setObjAndOffset(null, 0);
 
     long alignedSize = ((size + 7) / 8) * 8;
     if (shouldPool(alignedSize)) {
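
The pooling branch above recycles freed long[] buffers through weak
references keyed by word-aligned size; a simplified sketch of that idea
(all names here are invented for illustration, not Spark's internals):

  import java.lang.ref.WeakReference;
  import java.util.HashMap;
  import java.util.LinkedList;
  import java.util.Map;

  public class PoolSketch {
    // Freed buffers are held only weakly, so the GC can still reclaim them.
    private final Map<Long, LinkedList<WeakReference<long[]>>> pools =
      new HashMap<>();

    void recycle(long[] array, long size) {
      long alignedSize = ((size + 7) / 8) * 8;  // round up to a word multiple
      pools.computeIfAbsent(alignedSize, k -> new LinkedList<>())
        .add(new WeakReference<>(array));
    }

    long[] acquire(long size) {
      long alignedSize = ((size + 7) / 8) * 8;
      LinkedList<WeakReference<long[]>> pool = pools.get(alignedSize);
      while (pool != null && !pool.isEmpty()) {
        long[] array = pool.pop().get();
        if (array != null) return array;  // the weak ref may be cleared
      }
      return new long[(int) (alignedSize / 8)];
    }
  }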

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
index 38315fb..7b58868 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
@@ -38,7 +38,7 @@ public interface MemoryAllocator {
 
   void free(MemoryBlock memory);
 
-  UnsafeMemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
+  MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
 
-  HeapMemoryAllocator HEAP = new HeapMemoryAllocator();
+  MemoryAllocator HEAP = new HeapMemoryAllocator();
 }
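
With the two fields typed as the MemoryAllocator interface again, both
allocators are driven uniformly; a minimal sketch (illustrative wrapper):

  import org.apache.spark.unsafe.memory.MemoryAllocator;
  import org.apache.spark.unsafe.memory.MemoryBlock;

  public class AllocatorExample {
    public static void main(String[] args) {
      // On-heap pages are backed by a long[]; off-heap pages by a raw address.
      MemoryBlock onHeap = MemoryAllocator.HEAP.allocate(64);
      MemoryBlock offHeap = MemoryAllocator.UNSAFE.allocate(64);
      MemoryAllocator.HEAP.free(onHeap);
      MemoryAllocator.UNSAFE.free(offHeap);
      // After free(), pageNumber is FREED_IN_ALLOCATOR_PAGE_NUMBER, so a
      // second free() trips the allocator's assertions when -ea is enabled.
    }
  }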

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index ca7213b..c333857 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -22,10 +22,10 @@ import javax.annotation.Nullable;
 import org.apache.spark.unsafe.Platform;
 
 /**
- * A representation of a consecutive memory block in Spark. It defines the common interfaces
- * for memory accessing and mutating.
+ * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
  */
-public abstract class MemoryBlock {
+public class MemoryBlock extends MemoryLocation {
+
   /** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */
   public static final int NO_PAGE_NUMBER = -1;
 
@@ -45,163 +45,38 @@ public abstract class MemoryBlock {
    */
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  @Nullable
-  protected Object obj;
-
-  protected long offset;
-
-  protected long length;
+  private final long length;
 
   /**
    * Optional page number; used when this MemoryBlock represents a page allocated by a
-   * TaskMemoryManager. This field can be updated using setPageNumber method so that
-   * this can be modified by the TaskMemoryManager, which lives in a different package.
+   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
+   * which lives in a different package.
    */
-  private int pageNumber = NO_PAGE_NUMBER;
+  public int pageNumber = NO_PAGE_NUMBER;
 
-  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
-    if (offset < 0 || length < 0) {
-      throw new IllegalArgumentException(
-        "Length " + length + " and offset " + offset + "must be non-negative");
-    }
-    this.obj = obj;
-    this.offset = offset;
+  public MemoryBlock(@Nullable Object obj, long offset, long length) {
+    super(obj, offset);
     this.length = length;
   }
 
-  protected MemoryBlock() {
-    this(null, 0, 0);
-  }
-
-  public final Object getBaseObject() {
-    return obj;
-  }
-
-  public final long getBaseOffset() {
-    return offset;
-  }
-
-  public void resetObjAndOffset() {
-    this.obj = null;
-    this.offset = 0;
-  }
-
   /**
    * Returns the size of the memory block.
    */
-  public final long size() {
+  public long size() {
     return length;
   }
 
-  public final void setPageNumber(int pageNum) {
-    pageNumber = pageNum;
-  }
-
-  public final int getPageNumber() {
-    return pageNumber;
-  }
-
-  /**
-   * Fills the memory block with the specified byte value.
-   */
-  public final void fill(byte value) {
-    Platform.setMemory(obj, offset, length, value);
-  }
-
-  /**
-   * Instantiate MemoryBlock for given object type with new offset
-   */
-  public static final MemoryBlock allocateFromObject(Object obj, long offset, long length) {
-    MemoryBlock mb = null;
-    if (obj instanceof byte[]) {
-      byte[] array = (byte[])obj;
-      mb = new ByteArrayMemoryBlock(array, offset, length);
-    } else if (obj instanceof long[]) {
-      long[] array = (long[])obj;
-      mb = new OnHeapMemoryBlock(array, offset, length);
-    } else if (obj == null) {
-      // we assume that to pass null pointer means off-heap
-      mb = new OffHeapMemoryBlock(offset, length);
-    } else {
-      throw new UnsupportedOperationException(
-        "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
-    }
-    return mb;
-  }
-
   /**
-   * Just instantiate the sub-block with the same type of MemoryBlock with the new size and relative
-   * offset from the original offset. The data is not copied.
-   * If parameters are invalid, an exception is thrown.
+   * Creates a memory block pointing to the memory used by the long array.
    */
-  public abstract MemoryBlock subBlock(long offset, long size);
-
-  protected void checkSubBlockRange(long offset, long size) {
-    if (offset < 0 || size < 0) {
-      throw new ArrayIndexOutOfBoundsException(
-        "Size " + size + " and offset " + offset + " must be non-negative");
-    }
-    if (offset + size > length) {
-      throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
-        offset + " should not be larger than the length " + length + " in the MemoryBlock");
-    }
+  public static MemoryBlock fromLongArray(final long[] array) {
+    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
   }
 
   /**
-   * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g  cause illegal
-   * memory access, throw an exception, or etc.
-   * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as
-   * JVM object header. The offset is 0-based and is expected as an logical offset in the memory
-   * block.
+   * Fills the memory block with the specified byte value.
    */
-  public abstract int getInt(long offset);
-
-  public abstract void putInt(long offset, int value);
-
-  public abstract boolean getBoolean(long offset);
-
-  public abstract void putBoolean(long offset, boolean value);
-
-  public abstract byte getByte(long offset);
-
-  public abstract void putByte(long offset, byte value);
-
-  public abstract short getShort(long offset);
-
-  public abstract void putShort(long offset, short value);
-
-  public abstract long getLong(long offset);
-
-  public abstract void putLong(long offset, long value);
-
-  public abstract float getFloat(long offset);
-
-  public abstract void putFloat(long offset, float value);
-
-  public abstract double getDouble(long offset);
-
-  public abstract void putDouble(long offset, double value);
-
-  public static final void copyMemory(
-      MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
-    assert(srcOffset + length <= src.length && dstOffset + length <= dst.length);
-    Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
-      dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
-  }
-
-  public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) {
-    assert(length <= src.length && length <= dst.length);
-    Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
-      dst.getBaseObject(), dst.getBaseOffset(), length);
-  }
-
-  public final void copyFrom(Object src, long srcOffset, long dstOffset, long length) {
-    assert(length <= this.length - srcOffset);
-    Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
-  }
-
-  public final void writeTo(long srcOffset, Object dst, long dstOffset, long length) {
-    assert(length <= this.length - srcOffset);
-    Platform.copyMemory(obj, offset + srcOffset, dst, dstOffset, length);
+  public void fill(byte value) {
+    Platform.setMemory(obj, offset, length, value);
   }
 }
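
A short sketch of the restored concrete MemoryBlock (the wrapper class is
illustrative):

  import org.apache.spark.unsafe.Platform;
  import org.apache.spark.unsafe.memory.MemoryBlock;

  public class MemoryBlockExample {
    public static void main(String[] args) {
      long[] data = new long[2];
      MemoryBlock block = MemoryBlock.fromLongArray(data);
      block.fill((byte) -1);  // Platform.setMemory over all 16 bytes
      System.out.println(block.size());        // 16
      System.out.println(data[0] == -1L);      // true: every bit is set
      System.out.println(block.getBaseOffset() == Platform.LONG_ARRAY_OFFSET);
    }
  }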

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
new file mode 100644
index 0000000..74ebc87
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A memory location. Tracked either by a memory address (with off-heap allocation),
+ * or by an offset from a JVM object (in-heap allocation).
+ */
+public class MemoryLocation {
+
+  @Nullable
+  Object obj;
+
+  long offset;
+
+  public MemoryLocation(@Nullable Object obj, long offset) {
+    this.obj = obj;
+    this.offset = offset;
+  }
+
+  public MemoryLocation() {
+    this(null, 0);
+  }
+
+  public void setObjAndOffset(Object newObj, long newOffset) {
+    this.obj = newObj;
+    this.offset = newOffset;
+  }
+
+  public final Object getBaseObject() {
+    return obj;
+  }
+
+  public final long getBaseOffset() {
+    return offset;
+  }
+}
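
The (obj, offset) pair covers both addressing modes that MemoryLocation
describes; a minimal sketch using Platform directly (illustrative):

  import org.apache.spark.unsafe.Platform;

  public class MemoryLocationExample {
    public static void main(String[] args) {
      // In-heap: a non-null base object plus an offset past the array header.
      long[] onHeap = new long[1];
      Platform.putLong(onHeap, Platform.LONG_ARRAY_OFFSET, 7L);
      System.out.println(onHeap[0]);  // 7

      // Off-heap: a null base object, with the offset as an absolute address.
      long address = Platform.allocateMemory(8);
      Platform.putLong(null, address, 7L);
      System.out.println(Platform.getLong(null, address));  // 7
      Platform.freeMemory(address);
    }
  }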

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
deleted file mode 100644
index 3431b08..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import org.apache.spark.unsafe.Platform;
-
-public class OffHeapMemoryBlock extends MemoryBlock {
-  public static final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
-
-  public OffHeapMemoryBlock(long address, long size) {
-    super(null, address, size);
-  }
-
-  @Override
-  public MemoryBlock subBlock(long offset, long size) {
-    checkSubBlockRange(offset, size);
-    if (offset == 0 && size == this.size()) return this;
-    return new OffHeapMemoryBlock(this.offset + offset, size);
-  }
-
-  @Override
-  public final int getInt(long offset) {
-    return Platform.getInt(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putInt(long offset, int value) {
-    Platform.putInt(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final boolean getBoolean(long offset) {
-    return Platform.getBoolean(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putBoolean(long offset, boolean value) {
-    Platform.putBoolean(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final byte getByte(long offset) {
-    return Platform.getByte(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putByte(long offset, byte value) {
-    Platform.putByte(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final short getShort(long offset) {
-    return Platform.getShort(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putShort(long offset, short value) {
-    Platform.putShort(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final long getLong(long offset) {
-    return Platform.getLong(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putLong(long offset, long value) {
-    Platform.putLong(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final float getFloat(long offset) {
-    return Platform.getFloat(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putFloat(long offset, float value) {
-    Platform.putFloat(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final double getDouble(long offset) {
-    return Platform.getDouble(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putDouble(long offset, double value) {
-    Platform.putDouble(null, this.offset + offset, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
deleted file mode 100644
index ee42bc2..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import com.google.common.primitives.Ints;
-
-import org.apache.spark.unsafe.Platform;
-
-/**
- * A consecutive block of memory with a long array on Java heap.
- */
-public final class OnHeapMemoryBlock extends MemoryBlock {
-
-  private final long[] array;
-
-  public OnHeapMemoryBlock(long[] obj, long offset, long size) {
-    super(obj, offset, size);
-    this.array = obj;
-    assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
-      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
-        "the size of the given memory space " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
-  }
-
-  public OnHeapMemoryBlock(long size) {
-    this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size);
-  }
-
-  @Override
-  public MemoryBlock subBlock(long offset, long size) {
-    checkSubBlockRange(offset, size);
-    if (offset == 0 && size == this.size()) return this;
-    return new OnHeapMemoryBlock(array, this.offset + offset, size);
-  }
-
-  public long[] getLongArray() { return array; }
-
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static OnHeapMemoryBlock fromArray(final long[] array) {
-    return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
-  }
-
-  public static OnHeapMemoryBlock fromArray(final long[] array, long size) {
-    return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
-  }
-
-  @Override
-  public int getInt(long offset) {
-    return Platform.getInt(array, this.offset + offset);
-  }
-
-  @Override
-  public void putInt(long offset, int value) {
-    Platform.putInt(array, this.offset + offset, value);
-  }
-
-  @Override
-  public boolean getBoolean(long offset) {
-    return Platform.getBoolean(array, this.offset + offset);
-  }
-
-  @Override
-  public void putBoolean(long offset, boolean value) {
-    Platform.putBoolean(array, this.offset + offset, value);
-  }
-
-  @Override
-  public byte getByte(long offset) {
-    return Platform.getByte(array, this.offset + offset);
-  }
-
-  @Override
-  public void putByte(long offset, byte value) {
-    Platform.putByte(array, this.offset + offset, value);
-  }
-
-  @Override
-  public short getShort(long offset) {
-    return Platform.getShort(array, this.offset + offset);
-  }
-
-  @Override
-  public void putShort(long offset, short value) {
-    Platform.putShort(array, this.offset + offset, value);
-  }
-
-  @Override
-  public long getLong(long offset) {
-    return Platform.getLong(array, this.offset + offset);
-  }
-
-  @Override
-  public void putLong(long offset, long value) {
-    Platform.putLong(array, this.offset + offset, value);
-  }
-
-  @Override
-  public float getFloat(long offset) {
-    return Platform.getFloat(array, this.offset + offset);
-  }
-
-  @Override
-  public void putFloat(long offset, float value) {
-    Platform.putFloat(array, this.offset + offset, value);
-  }
-
-  @Override
-  public double getDouble(long offset) {
-    return Platform.getDouble(array, this.offset + offset);
-  }
-
-  @Override
-  public void putDouble(long offset, double value) {
-    Platform.putDouble(array, this.offset + offset, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
index 5310bdf..4368fb6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
@@ -25,9 +25,9 @@ import org.apache.spark.unsafe.Platform;
 public class UnsafeMemoryAllocator implements MemoryAllocator {
 
   @Override
-  public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError {
+  public MemoryBlock allocate(long size) throws OutOfMemoryError {
     long address = Platform.allocateMemory(size);
-    OffHeapMemoryBlock memory = new OffHeapMemoryBlock(address, size);
+    MemoryBlock memory = new MemoryBlock(null, address, size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     }
@@ -36,25 +36,22 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
 
   @Override
   public void free(MemoryBlock memory) {
-    assert(memory instanceof OffHeapMemoryBlock) :
-      "UnsafeMemoryAllocator can only free OffHeapMemoryBlock.";
-    if (memory == OffHeapMemoryBlock.NULL) return;
-    assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (memory.obj == null) :
+      "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "page has already been freed";
-    assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
-            || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
       "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";
 
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
     }
-
     Platform.freeMemory(memory.offset);
-
     // As an additional layer of defense against use-after-free bugs, we mutate the
     // MemoryBlock to reset its pointer.
-    memory.resetObjAndOffset();
+    memory.offset = 0;
     // Mark the page as freed (so we can detect double-frees).
-    memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
+    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
   }
 }
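
The poison-on-free pattern above generalizes: reset the pointer to defeat
use-after-free, then overwrite the page number to catch double frees. A
stripped-down sketch (all names invented for illustration):

  public class SentinelSketch {
    static final int NO_PAGE = -1;
    static final int FREED_IN_ALLOCATOR = -3;

    int pageNumber = NO_PAGE;
    long offset = 0x1000L;  // stand-in for a real address

    void free() {
      assert pageNumber != FREED_IN_ALLOCATOR : "page has already been freed";
      offset = 0;                        // use-after-free defense
      pageNumber = FREED_IN_ALLOCATOR;   // double-free detection
    }
  }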

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index e91fc43..dff4a73 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -34,8 +34,6 @@ import com.google.common.primitives.Ints;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 
 import static org.apache.spark.unsafe.Platform.*;
 
@@ -53,13 +51,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   // These are only updated by readExternal() or read()
   @Nonnull
-  private MemoryBlock base;
-  // While numBytes has the same value as base.size(), to keep as int avoids cast from long to int
+  private Object base;
+  private long offset;
   private int numBytes;
 
-  public MemoryBlock getMemoryBlock() { return base; }
-  public Object getBaseObject() { return base.getBaseObject(); }
-  public long getBaseOffset() { return base.getBaseOffset(); }
+  public Object getBaseObject() { return base; }
+  public long getBaseOffset() { return offset; }
 
   /**
    * A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which
@@ -112,8 +109,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   public static UTF8String fromBytes(byte[] bytes) {
     if (bytes != null) {
-      return new UTF8String(
-        new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length));
+      return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length);
     } else {
       return null;
     }
@@ -126,14 +122,20 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) {
     if (bytes != null) {
-      return new UTF8String(
-        new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET + offset, numBytes));
+      return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes);
     } else {
       return null;
     }
   }
 
   /**
+   * Creates an UTF8String from a given address (base and offset) and length.
+   */
+  public static UTF8String fromAddress(Object base, long offset, int numBytes) {
+    return new UTF8String(base, offset, numBytes);
+  }
+
+  /**
    * Creates an UTF8String from String.
    */
   public static UTF8String fromString(String str) {
@@ -149,13 +151,16 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     return fromBytes(spaces);
   }
 
-  public UTF8String(MemoryBlock base) {
+  protected UTF8String(Object base, long offset, int numBytes) {
     this.base = base;
-    this.numBytes = Ints.checkedCast(base.size());
+    this.offset = offset;
+    this.numBytes = numBytes;
   }
 
   // for serialization
-  public UTF8String() {}
+  public UTF8String() {
+    this(null, 0, 0);
+  }
 
   /**
    * Writes the content of this string into a memory address, identified by an object and an offset.
@@ -163,7 +168,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    * bytes in this string.
    */
   public void writeToMemory(Object target, long targetOffset) {
-    base.writeTo(0, target, targetOffset, numBytes);
+    Platform.copyMemory(base, offset, target, targetOffset, numBytes);
   }
 
   public void writeTo(ByteBuffer buffer) {
@@ -183,9 +188,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   @Nonnull
   public ByteBuffer getByteBuffer() {
-    long offset = base.getBaseOffset();
-    if (base instanceof ByteArrayMemoryBlock && offset >= BYTE_ARRAY_OFFSET) {
-      final byte[] bytes = ((ByteArrayMemoryBlock) base).getByteArray();
+    if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
+      final byte[] bytes = (byte[]) base;
 
       // the offset includes an object header... this is only needed for unsafe copies
       final long arrayOffset = offset - BYTE_ARRAY_OFFSET;
@@ -252,12 +256,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     long mask = 0;
     if (IS_LITTLE_ENDIAN) {
       if (numBytes >= 8) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
       } else if (numBytes > 4) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else if (numBytes > 0) {
-        p = (long) base.getInt(0);
+        p = (long) Platform.getInt(base, offset);
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else {
         p = 0;
@@ -266,12 +270,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     } else {
       // byteOrder == ByteOrder.BIG_ENDIAN
       if (numBytes >= 8) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
       } else if (numBytes > 4) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else if (numBytes > 0) {
-        p = ((long) base.getInt(0)) << 32;
+        p = ((long) Platform.getInt(base, offset)) << 32;
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else {
         p = 0;
@@ -286,13 +290,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   public byte[] getBytes() {
     // avoid copy if `base` is `byte[]`
-    long offset = base.getBaseOffset();
-    if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
-      && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) {
-      return ((ByteArrayMemoryBlock) base).getByteArray();
+    if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
+      && ((byte[]) base).length == numBytes) {
+      return (byte[]) base;
     } else {
       byte[] bytes = new byte[numBytes];
-      base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
+      copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
       return bytes;
     }
   }
@@ -322,7 +325,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
     if (i > j) {
       byte[] bytes = new byte[i - j];
-      base.writeTo(j, bytes, BYTE_ARRAY_OFFSET, i - j);
+      copyMemory(base, offset + j, bytes, BYTE_ARRAY_OFFSET, i - j);
       return fromBytes(bytes);
     } else {
       return EMPTY_UTF8;
@@ -363,14 +366,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    * Returns the byte at position `i`.
    */
   private byte getByte(int i) {
-    return base.getByte(i);
+    return Platform.getByte(base, offset + i);
   }
 
   private boolean matchAt(final UTF8String s, int pos) {
     if (s.numBytes + pos > numBytes || pos < 0) {
       return false;
     }
-    return ByteArrayMethods.arrayEqualsBlock(base, pos, s.base, 0, s.numBytes);
+    return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, s.numBytes);
   }
 
   public boolean startsWith(final UTF8String prefix) {
@@ -497,7 +500,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     for (int i = 0; i < numBytes; i++) {
       if (getByte(i) == (byte) ',') {
         if (i - (lastComma + 1) == match.numBytes &&
-          ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
+          ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
+            match.numBytes)) {
           return n;
         }
         lastComma = i;
@@ -505,7 +509,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       }
     }
     if (numBytes - (lastComma + 1) == match.numBytes &&
-      ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
+      ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
+        match.numBytes)) {
       return n;
     }
     return 0;
@@ -520,7 +525,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   private UTF8String copyUTF8String(int start, int end) {
     int len = end - start + 1;
     byte[] newBytes = new byte[len];
-    base.writeTo(start, newBytes, BYTE_ARRAY_OFFSET, len);
+    copyMemory(base, offset + start, newBytes, BYTE_ARRAY_OFFSET, len);
     return UTF8String.fromBytes(newBytes);
   }
 
@@ -667,7 +672,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     int i = 0; // position in byte
     while (i < numBytes) {
       int len = numBytesForFirstByte(getByte(i));
-      base.writeTo(i, result, BYTE_ARRAY_OFFSET + result.length - i - len, len);
+      copyMemory(this.base, this.offset + i, result,
+        BYTE_ARRAY_OFFSET + result.length - i - len, len);
 
       i += len;
     }
@@ -681,7 +687,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     }
 
     byte[] newBytes = new byte[numBytes * times];
-    base.writeTo(0, newBytes, BYTE_ARRAY_OFFSET, numBytes);
+    copyMemory(this.base, this.offset, newBytes, BYTE_ARRAY_OFFSET, numBytes);
 
     int copied = 1;
     while (copied < times) {
@@ -718,7 +724,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       if (i + v.numBytes > numBytes) {
         return -1;
       }
-      if (ByteArrayMethods.arrayEqualsBlock(base, i, v.base, 0, v.numBytes)) {
+      if (ByteArrayMethods.arrayEquals(base, offset + i, v.base, v.offset, v.numBytes)) {
         return c;
       }
       i += numBytesForFirstByte(getByte(i));
@@ -734,7 +740,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   private int find(UTF8String str, int start) {
     assert (str.numBytes > 0);
     while (start <= numBytes - str.numBytes) {
-      if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
+      if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
         return start;
       }
       start += 1;
@@ -748,7 +754,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   private int rfind(UTF8String str, int start) {
     assert (str.numBytes > 0);
     while (start >= 0) {
-      if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
+      if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
         return start;
       }
       start -= 1;
@@ -781,7 +787,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
         return EMPTY_UTF8;
       }
       byte[] bytes = new byte[idx];
-      base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, idx);
+      copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, idx);
       return fromBytes(bytes);
 
     } else {
@@ -801,7 +807,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       }
       int size = numBytes - delim.numBytes - idx;
       byte[] bytes = new byte[size];
-      base.writeTo(idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
+      copyMemory(base, offset + idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
       return fromBytes(bytes);
     }
   }
@@ -824,15 +830,15 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       UTF8String remain = pad.substring(0, spaces - padChars * count);
 
       byte[] data = new byte[this.numBytes + pad.numBytes * count + remain.numBytes];
-      base.writeTo(0, data, BYTE_ARRAY_OFFSET, this.numBytes);
+      copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET, this.numBytes);
       int offset = this.numBytes;
       int idx = 0;
       while (idx < count) {
-        pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+        copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
         ++ idx;
         offset += pad.numBytes;
       }
-      remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+      copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
 
       return UTF8String.fromBytes(data);
     }
@@ -860,13 +866,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       int offset = 0;
       int idx = 0;
       while (idx < count) {
-        pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+        copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
         ++ idx;
         offset += pad.numBytes;
       }
-      remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+      copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
       offset += remain.numBytes;
-      base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, numBytes());
+      copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET + offset, numBytes());
 
       return UTF8String.fromBytes(data);
     }
@@ -891,8 +897,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     int offset = 0;
     for (int i = 0; i < inputs.length; i++) {
       int len = inputs[i].numBytes;
-      inputs[i].base.writeTo(
-        0,
+      copyMemory(
+        inputs[i].base, inputs[i].offset,
         result, BYTE_ARRAY_OFFSET + offset,
         len);
       offset += len;
@@ -931,8 +937,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     for (int i = 0, j = 0; i < inputs.length; i++) {
       if (inputs[i] != null) {
         int len = inputs[i].numBytes;
-        inputs[i].base.writeTo(
-          0,
+        copyMemory(
+          inputs[i].base, inputs[i].offset,
           result, BYTE_ARRAY_OFFSET + offset,
           len);
         offset += len;
@@ -940,8 +946,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
         j++;
         // Add separator if this is not the last input.
         if (j < numInputs) {
-          separator.base.writeTo(
-            0,
+          copyMemory(
+            separator.base, separator.offset,
             result, BYTE_ARRAY_OFFSET + offset,
             separator.numBytes);
           offset += separator.numBytes;
@@ -1215,7 +1221,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   public UTF8String copy() {
     byte[] bytes = new byte[numBytes];
-    base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
+    copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
     return fromBytes(bytes);
   }
 
@@ -1223,10 +1229,11 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
     int wordMax = (len / 8) * 8;
-    MemoryBlock rbase = other.base;
+    long roffset = other.offset;
+    Object rbase = other.base;
     for (int i = 0; i < wordMax; i += 8) {
-      long left = base.getLong(i);
-      long right = rbase.getLong(i);
+      long left = getLong(base, offset + i);
+      long right = getLong(rbase, roffset + i);
       if (left != right) {
         if (IS_LITTLE_ENDIAN) {
           return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
@@ -1237,7 +1244,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     }
     for (int i = wordMax; i < len; i++) {
       // In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
-      int res = (getByte(i) & 0xFF) - (rbase.getByte(i) & 0xFF);
+      int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
       if (res != 0) {
         return res;
       }
@@ -1256,7 +1263,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       if (numBytes != o.numBytes) {
         return false;
       }
-      return ByteArrayMethods.arrayEqualsBlock(base, 0, o.base, 0, numBytes);
+      return ByteArrayMethods.arrayEquals(base, offset, o.base, o.offset, numBytes);
     } else {
       return false;
     }
@@ -1312,8 +1319,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
               num_bytes_j != numBytesForFirstByte(s.getByte(i_bytes))) {
           cost = 1;
         } else {
-          cost = (ByteArrayMethods.arrayEqualsBlock(t.base, j_bytes, s.base,
-            i_bytes, num_bytes_j)) ? 0 : 1;
+          cost = (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes, s.base,
+              s.offset + i_bytes, num_bytes_j)) ? 0 : 1;
         }
         d[i + 1] = Math.min(Math.min(d[i] + 1, p[i + 1] + 1), p[i] + cost);
       }
@@ -1328,7 +1335,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   @Override
   public int hashCode() {
-    return Murmur3_x86_32.hashUnsafeBytesBlock(base,42);
+    return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42);
   }
 
   /**
@@ -1391,10 +1398,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   }
 
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    offset = BYTE_ARRAY_OFFSET;
     numBytes = in.readInt();
-    byte[] bytes = new byte[numBytes];
-    in.readFully(bytes);
-    base = ByteArrayMemoryBlock.fromArray(bytes);
+    base = new byte[numBytes];
+    in.readFully((byte[]) base);
   }
 
   @Override
@@ -1406,10 +1413,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   @Override
   public void read(Kryo kryo, Input in) {
-    numBytes = in.readInt();
-    byte[] bytes = new byte[numBytes];
-    in.read(bytes);
-    base = ByteArrayMemoryBlock.fromArray(bytes);
+    this.offset = BYTE_ARRAY_OFFSET;
+    this.numBytes = in.readInt();
+    this.base = new byte[numBytes];
+    in.read((byte[]) base);
   }
 
 }
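
With base and offset stored directly on UTF8String again, equality,
comparison, and hashing all run over Platform primitives; a small usage
sketch (the wrapper class is illustrative):

  import org.apache.spark.unsafe.types.UTF8String;

  public class UTF8StringExample {
    public static void main(String[] args) {
      UTF8String s = UTF8String.fromString("Spark");
      UTF8String t = UTF8String.fromBytes(s.getBytes());
      System.out.println(s.equals(t));     // true: byte-wise arrayEquals
      System.out.println(s.compareTo(t));  // 0: unsigned 8-byte-word compare
      System.out.println(s.hashCode());    // Murmur3 over (base, offset, len)
    }
  }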

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 583a148..3ad9ac7 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -81,7 +81,7 @@ public class PlatformUtilSuite {
     MemoryAllocator.HEAP.free(block);
     Assert.assertNull(block.getBaseObject());
     Assert.assertEquals(0, block.getBaseOffset());
-    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
+    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
   }
 
   @Test
@@ -92,7 +92,7 @@ public class PlatformUtilSuite {
     MemoryAllocator.UNSAFE.free(block);
     Assert.assertNull(block.getBaseObject());
     Assert.assertEquals(0, block.getBaseOffset());
-    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
+    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
   }
 
   @Test(expected = AssertionError.class)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
index 8c2e98c..fb8e53b 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
@@ -20,13 +20,14 @@ package org.apache.spark.unsafe.array;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
 
 public class LongArraySuite {
 
   @Test
   public void basicTest() {
-    LongArray arr = new LongArray(new OnHeapMemoryBlock(16));
+    long[] bytes = new long[2];
+    LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
     arr.set(0, 1L);
     arr.set(1, 2L);
     arr.set(1, 3L);

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
index d989877..6348a73 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
@@ -71,24 +71,6 @@ public class Murmur3_x86_32Suite {
   }
 
   @Test
-  public void testKnownWordsInputs() {
-    byte[] bytes = new byte[16];
-    long offset = Platform.BYTE_ARRAY_OFFSET;
-    for (int i = 0; i < 16; i++) {
-      bytes[i] = 0;
-    }
-    Assert.assertEquals(-300363099, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
-    for (int i = 0; i < 16; i++) {
-      bytes[i] = -1;
-    }
-    Assert.assertEquals(-1210324667, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
-    for (int i = 0; i < 16; i++) {
-      bytes[i] = (byte)i;
-    }
-    Assert.assertEquals(-634919701, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
-  }
-
-  @Test
   public void randomizedStressTest() {
     int size = 65536;
     Random rand = new Random();

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
deleted file mode 100644
index ef5ff8e..0000000
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import org.apache.spark.unsafe.Platform;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteOrder;
-
-import static org.hamcrest.core.StringContains.containsString;
-
-public class MemoryBlockSuite {
-  private static final boolean bigEndianPlatform =
-    ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
-
-  private void check(MemoryBlock memory, Object obj, long offset, int length) {
-    memory.setPageNumber(1);
-    memory.fill((byte)-1);
-    memory.putBoolean(0, true);
-    memory.putByte(1, (byte)127);
-    memory.putShort(2, (short)257);
-    memory.putInt(4, 0x20000002);
-    memory.putLong(8, 0x1234567089ABCDEFL);
-    memory.putFloat(16, 1.0F);
-    memory.putLong(20, 0x1234567089ABCDEFL);
-    memory.putDouble(28, 2.0);
-    MemoryBlock.copyMemory(memory, 0L, memory, 36, 4);
-    int[] a = new int[2];
-    a[0] = 0x12345678;
-    a[1] = 0x13579BDF;
-    memory.copyFrom(a, Platform.INT_ARRAY_OFFSET, 40, 8);
-    byte[] b = new byte[8];
-    memory.writeTo(40, b, Platform.BYTE_ARRAY_OFFSET, 8);
-
-    Assert.assertEquals(obj, memory.getBaseObject());
-    Assert.assertEquals(offset, memory.getBaseOffset());
-    Assert.assertEquals(length, memory.size());
-    Assert.assertEquals(1, memory.getPageNumber());
-    Assert.assertEquals(true, memory.getBoolean(0));
-    Assert.assertEquals((byte)127, memory.getByte(1 ));
-    Assert.assertEquals((short)257, memory.getShort(2));
-    Assert.assertEquals(0x20000002, memory.getInt(4));
-    Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(8));
-    Assert.assertEquals(1.0F, memory.getFloat(16), 0);
-    Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(20));
-    Assert.assertEquals(2.0, memory.getDouble(28), 0);
-    Assert.assertEquals(true, memory.getBoolean(36));
-    Assert.assertEquals((byte)127, memory.getByte(37));
-    Assert.assertEquals((short)257, memory.getShort(38));
-    Assert.assertEquals(a[0], memory.getInt(40));
-    Assert.assertEquals(a[1], memory.getInt(44));
-    if (bigEndianPlatform) {
-      Assert.assertEquals(a[0],
-        ((int)b[0] & 0xff) << 24 | ((int)b[1] & 0xff) << 16 |
-        ((int)b[2] & 0xff) << 8 | ((int)b[3] & 0xff));
-      Assert.assertEquals(a[1],
-        ((int)b[4] & 0xff) << 24 | ((int)b[5] & 0xff) << 16 |
-        ((int)b[6] & 0xff) << 8 | ((int)b[7] & 0xff));
-    } else {
-      Assert.assertEquals(a[0],
-        ((int)b[3] & 0xff) << 24 | ((int)b[2] & 0xff) << 16 |
-        ((int)b[1] & 0xff) << 8 | ((int)b[0] & 0xff));
-      Assert.assertEquals(a[1],
-        ((int)b[7] & 0xff) << 24 | ((int)b[6] & 0xff) << 16 |
-        ((int)b[5] & 0xff) << 8 | ((int)b[4] & 0xff));
-    }
-    for (int i = 48; i < memory.size(); i++) {
-      Assert.assertEquals((byte) -1, memory.getByte(i));
-    }
-
-    assert(memory.subBlock(0, memory.size()) == memory);
-
-    try {
-      memory.subBlock(-8, 8);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("non-negative"));
-    }
-
-    try {
-      memory.subBlock(0, -8);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("non-negative"));
-    }
-
-    try {
-      memory.subBlock(0, length + 8);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
-    }
-
-    try {
-      memory.subBlock(8, length - 4);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
-    }
-
-    try {
-      memory.subBlock(length + 8, 4);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
-    }
-
-    memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER);
-  }
-
-  @Test
-  public void testByteArrayMemoryBlock() {
-    byte[] obj = new byte[56];
-    long offset = Platform.BYTE_ARRAY_OFFSET;
-    int length = obj.length;
-
-    MemoryBlock memory = new ByteArrayMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-
-    memory = ByteArrayMemoryBlock.fromArray(obj);
-    check(memory, obj, offset, length);
-
-    obj = new byte[112];
-    memory = new ByteArrayMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-  }
-
-  @Test
-  public void testOnHeapMemoryBlock() {
-    long[] obj = new long[7];
-    long offset = Platform.LONG_ARRAY_OFFSET;
-    int length = obj.length * 8;
-
-    MemoryBlock memory = new OnHeapMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-
-    memory = OnHeapMemoryBlock.fromArray(obj);
-    check(memory, obj, offset, length);
-
-    obj = new long[14];
-    memory = new OnHeapMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-  }
-
-  @Test
-  public void testOffHeapArrayMemoryBlock() {
-    MemoryAllocator memoryAllocator = new UnsafeMemoryAllocator();
-    MemoryBlock memory = memoryAllocator.allocate(56);
-    Object obj = memory.getBaseObject();
-    long offset = memory.getBaseOffset();
-    int length = 56;
-
-    check(memory, obj, offset, length);
-    memoryAllocator.free(memory);
-
-    long address = Platform.allocateMemory(112);
-    memory = new OffHeapMemoryBlock(address, length);
-    obj = memory.getBaseObject();
-    offset = memory.getBaseOffset();
-    check(memory, obj, offset, length);
-    Platform.freeMemory(address);
-  }
-}
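
The deleted suite above covered the typed MemoryBlock hierarchy
(ByteArrayMemoryBlock, OnHeapMemoryBlock, OffHeapMemoryBlock, subBlock,
setPageNumber) that this revert removes. A minimal sketch of the Platform-based
idiom the reverted code falls back to -- a raw (base object, offset) pair
instead of a MemoryBlock subclass; everything except the Platform calls is
illustrative:

import org.apache.spark.unsafe.Platform;

public class PlatformAccessSketch {
  public static void main(String[] args) {
    long[] data = new long[7];
    // Address the array through a (base object, offset) pair, which is
    // how the post-revert callers reach memory via Platform directly.
    Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, 0x1234567089ABCDEFL);
    long value = Platform.getLong(data, Platform.LONG_ARRAY_OFFSET);
    System.out.println(Long.toHexString(value)); // 1234567089abcdef
  }
}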

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index 42dda30..dae13f0 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -25,8 +25,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
+import org.apache.spark.unsafe.Platform;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -514,13 +513,28 @@ public class UTF8StringSuite {
   }
 
   @Test
+  public void writeToOutputStreamUnderflow() throws IOException {
+    // offset underflow is apparently supported?
+    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
+
+    for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
+      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
+          .writeTo(outputStream);
+      final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
+      assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
+      outputStream.reset();
+    }
+  }
+
+  @Test
   public void writeToOutputStreamSlice() throws IOException {
     final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
     for (int i = 0; i < test.length; ++i) {
       for (int j = 0; j < test.length - i; ++j) {
-        new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(i, j))
+        UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j)
             .writeTo(outputStream);
 
         assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray());
@@ -551,7 +565,7 @@ public class UTF8StringSuite {
 
     for (final long offset : offsets) {
       try {
-        new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(offset, test.length))
+        fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length)
             .writeTo(outputStream);
 
         throw new IllegalStateException(Long.toString(offset));
@@ -578,25 +592,26 @@ public class UTF8StringSuite {
   }
 
   @Test
-  public void writeToOutputStreamLongArray() throws IOException {
+  public void writeToOutputStreamIntArray() throws IOException {
     // verify that writes work on objects that are not byte arrays
-    final ByteBuffer buffer = StandardCharsets.UTF_8.encode("3千大千世界");
+    final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界");
     buffer.position(0);
     buffer.order(ByteOrder.nativeOrder());
 
     final int length = buffer.limit();
-    assertEquals(16, length);
+    assertEquals(12, length);
 
-    final int longs = length / 8;
-    final long[] array = new long[longs];
+    final int ints = length / 4;
+    final int[] array = new int[ints];
 
-    for (int i = 0; i < longs; ++i) {
-      array[i] = buffer.getLong();
+    for (int i = 0; i < ints; ++i) {
+      array[i] = buffer.getInt();
     }
 
     final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    new UTF8String(OnHeapMemoryBlock.fromArray(array)).writeTo(outputStream);
-    assertEquals("3千大千世界", outputStream.toString("UTF-8"));
+    fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
+        .writeTo(outputStream);
+    assertEquals("大千世界", outputStream.toString("UTF-8"));
   }
 
   @Test
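
A note on the rewritten writeToOutputStreamIntArray test: it round-trips UTF-8
bytes through an int[], so the intermediate ByteBuffer must use the platform's
native byte order or the ints written back to memory would come out
byte-swapped. A standalone sketch of that round trip, using only JDK types
(no Spark classes; names are illustrative):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class NativeOrderSketch {
  public static void main(String[] args) {
    byte[] utf8 = "大千世界".getBytes(StandardCharsets.UTF_8); // 12 bytes
    // Pack the bytes into ints using native order, as the test does.
    ByteBuffer buf = ByteBuffer.wrap(utf8).order(ByteOrder.nativeOrder());
    int[] ints = new int[utf8.length / 4];
    for (int i = 0; i < ints.length; i++) {
      ints[i] = buf.getInt();
    }
    // Unpack with the same order and the original string survives.
    ByteBuffer out =
        ByteBuffer.allocate(utf8.length).order(ByteOrder.nativeOrder());
    for (int v : ints) {
      out.putInt(v);
    }
    System.out.println(new String(out.array(), StandardCharsets.UTF_8));
  }
}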

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 8651a63..d07faf1 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -311,7 +311,7 @@ public class TaskMemoryManager {
       // this could trigger spilling to free some pages.
       return allocatePage(size, consumer);
     }
-    page.setPageNumber(pageNumber);
+    page.pageNumber = pageNumber;
     pageTable[pageNumber] = page;
     if (logger.isTraceEnabled()) {
       logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
@@ -323,25 +323,25 @@ public class TaskMemoryManager {
    * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
    */
   public void freePage(MemoryBlock page, MemoryConsumer consumer) {
-    assert (page.getPageNumber() != MemoryBlock.NO_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
       "Called freePage() on memory that wasn't allocated with allocatePage()";
-    assert (page.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "Called freePage() on a memory block that has already been freed";
-    assert (page.getPageNumber() != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
             "Called freePage() on a memory block that has already been freed";
-    assert(allocatedPages.get(page.getPageNumber()));
-    pageTable[page.getPageNumber()] = null;
+    assert(allocatedPages.get(page.pageNumber));
+    pageTable[page.pageNumber] = null;
     synchronized (this) {
-      allocatedPages.clear(page.getPageNumber());
+      allocatedPages.clear(page.pageNumber);
     }
     if (logger.isTraceEnabled()) {
-      logger.trace("Freed page number {} ({} bytes)", page.getPageNumber(), page.size());
+      logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
     }
     long pageSize = page.size();
     // Clear the page number before passing the block to the MemoryAllocator's free().
     // Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
     // page has been inappropriately directly freed without calling TMM.freePage().
-    page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
+    page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
     memoryManager.tungstenMemoryAllocator().free(page);
     releaseExecutionMemory(pageSize, consumer);
   }
@@ -363,7 +363,7 @@ public class TaskMemoryManager {
       // relative to the page's base offset; this relative offset will fit in 51 bits.
       offsetInPage -= page.getBaseOffset();
     }
-    return encodePageNumberAndOffset(page.getPageNumber(), offsetInPage);
+    return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
   }
 
   @VisibleForTesting
@@ -434,7 +434,7 @@ public class TaskMemoryManager {
       for (MemoryBlock page : pageTable) {
         if (page != null) {
           logger.debug("unreleased page: " + page + " in task " + taskAttemptId);
-          page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
+          page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
           memoryManager.tungstenMemoryAllocator().free(page);
         }
       }
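
The encodePageNumberAndOffset change above keeps the long-standing address
layout this file documents: a 64-bit encoded address packs the page number
into the high 13 bits and the in-page offset into the low 51 bits. A
self-contained sketch of that packing, with the constants restated here for
illustration (they mirror the layout described in TaskMemoryManager's
comments, not a verbatim copy of its code):

public class PageAddressSketch {
  static final int OFFSET_BITS = 51;                       // low 51 bits: offset
  static final long MASK_OFFSET = (1L << OFFSET_BITS) - 1;

  static long encode(int pageNumber, long offsetInPage) {
    return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_OFFSET);
  }

  public static void main(String[] args) {
    long address = encode(3, 1024L);
    int pageNumber = (int) (address >>> OFFSET_BITS);      // 3
    long offset = address & MASK_OFFSET;                   // 1024
    System.out.println(pageNumber + " / " + offset);
  }
}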

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 4b48599..0d06912 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.sort;
 import java.util.Comparator;
 
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.Sorter;
@@ -112,7 +113,13 @@ final class ShuffleInMemorySorter {
 
   public void expandPointerArray(LongArray newArray) {
     assert(newArray.size() > array.size());
-    MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
+    Platform.copyMemory(
+      array.getBaseObject(),
+      array.getBaseOffset(),
+      newArray.getBaseObject(),
+      newArray.getBaseOffset(),
+      pos * 8L
+    );
     consumer.freeArray(array);
     array = newArray;
     usableCapacity = getUsableCapacity();
@@ -181,7 +188,10 @@ final class ShuffleInMemorySorter {
         PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
         PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
     } else {
-      MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
+      MemoryBlock unused = new MemoryBlock(
+        array.getBaseObject(),
+        array.getBaseOffset() + pos * 8L,
+        (array.size() - pos) * 8L);
       LongArray buffer = new LongArray(unused);
       Sorter<PackedRecordPointer, LongArray> sorter =
         new Sorter<>(new ShuffleSortDataFormat(buffer));
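
The expandPointerArray hunk above restores the direct Platform.copyMemory
form: only the first pos entries of the pointer array are live, so the copy
length is pos * 8 bytes regardless of either array's capacity. A minimal
sketch of that grow-and-copy pattern on plain long[] backing, without the
MemoryConsumer bookkeeping (identifiers are illustrative):

import org.apache.spark.unsafe.Platform;

public class GrowCopySketch {
  public static void main(String[] args) {
    long[] oldArray = {11L, 22L, 33L, 0L};  // capacity 4, pos = 3 live entries
    long[] newArray = new long[8];          // grown replacement
    int pos = 3;
    // Copy only the live prefix: exactly pos * 8 bytes.
    Platform.copyMemory(
        oldArray, Platform.LONG_ARRAY_OFFSET,
        newArray, Platform.LONG_ARRAY_OFFSET,
        pos * 8L);
    System.out.println(newArray[0] + "," + newArray[1] + "," + newArray[2]);
    // prints 11,22,33
  }
}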

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
index 254449e..717bdd7 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -17,8 +17,8 @@
 
 package org.apache.spark.shuffle.sort;
 
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.SortDataFormat;
 
 final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> {
@@ -60,8 +60,13 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo
 
   @Override
   public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
-    MemoryBlock.copyMemory(src.memoryBlock(), srcPos * 8L,
-      dst.memoryBlock(), dstPos * 8L, length * 8L);
+    Platform.copyMemory(
+      src.getBaseObject(),
+      src.getBaseOffset() + srcPos * 8L,
+      dst.getBaseObject(),
+      dst.getBaseOffset() + dstPos * 8L,
+      length * 8L
+    );
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 399251b..5056652 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -544,7 +544,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           // is accessing the current record. We free this page in that caller's next loadNext()
           // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.getPageNumber() !=
+            if (!loaded || page.pageNumber !=
                     ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
               released += page.size();
               freePage(page);
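
The guard above frees every allocated page during a spill except the one the
in-memory iterator is currently positioned on; that page is released later, on
the caller's next loadNext(). A stripped-down sketch of the same
skip-the-current-page pattern using plain collections (all names illustrative):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SpillSkipSketch {
  public static void main(String[] args) {
    List<Integer> allocatedPageNumbers = new ArrayList<>(List.of(1, 2, 3));
    int currentPageNumber = 2;  // page the live iterator is reading
    boolean loaded = true;      // a record from that page is still in use
    Iterator<Integer> it = allocatedPageNumbers.iterator();
    while (it.hasNext()) {
      int page = it.next();
      // Free now unless the caller still needs this page; the skipped
      // page is freed on the caller's next loadNext().
      if (!loaded || page != currentPageNumber) {
        it.remove();            // stands in for freePage(page)
      }
    }
    System.out.println(allocatedPageNumbers); // [2]
  }
}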

