You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/09 13:25:37 UTC

[2/2] spark git commit: Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]

Revert [SPARK-10399] [SPARK-23879] [SPARK-23762] [SPARK-25317]

## What changes were proposed in this pull request?

When running TPC-DS benchmarks on the 2.4 release, npoggi and winglungngai saw a performance regression of more than 10% on the following queries: q67, q24a and q24b. After applying the PR https://github.com/apache/spark/pull/22338, the performance regression still existed. After reverting the changes in https://github.com/apache/spark/pull/19222, npoggi and winglungngai found that the performance regression was resolved. Thus, this PR reverts the related changes to unblock the 2.4 release.

In a future release, we can continue the investigation and find the root cause of the regression.

## How was this patch tested?

The existing test cases

Closes #22361 from gatorsmile/revertMemoryBlock.

Authored-by: gatorsmile <ga...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b9ccd55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b9ccd55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b9ccd55

Branch: refs/heads/master
Commit: 0b9ccd55c2986957863dcad3b44ce80403eecfa1
Parents: 1cfda44
Author: gatorsmile <ga...@gmail.com>
Authored: Sun Sep 9 21:25:19 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Sep 9 21:25:19 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/HiveHasher.java    |  18 +-
 .../java/org/apache/spark/unsafe/Platform.java  |   2 +-
 .../spark/unsafe/array/ByteArrayMethods.java    |  15 +-
 .../apache/spark/unsafe/array/LongArray.java    |  17 +-
 .../spark/unsafe/hash/Murmur3_x86_32.java       |  53 ++----
 .../unsafe/memory/ByteArrayMemoryBlock.java     | 128 -------------
 .../unsafe/memory/HeapMemoryAllocator.java      |  21 +--
 .../spark/unsafe/memory/MemoryAllocator.java    |   4 +-
 .../apache/spark/unsafe/memory/MemoryBlock.java | 157 ++--------------
 .../spark/unsafe/memory/MemoryLocation.java     |  54 ++++++
 .../spark/unsafe/memory/OffHeapMemoryBlock.java | 105 -----------
 .../spark/unsafe/memory/OnHeapMemoryBlock.java  | 132 --------------
 .../unsafe/memory/UnsafeMemoryAllocator.java    |  21 +--
 .../apache/spark/unsafe/types/UTF8String.java   | 147 +++++++--------
 .../apache/spark/unsafe/PlatformUtilSuite.java  |   4 +-
 .../spark/unsafe/array/LongArraySuite.java      |   5 +-
 .../spark/unsafe/hash/Murmur3_x86_32Suite.java  |  18 --
 .../spark/unsafe/memory/MemoryBlockSuite.java   | 179 -------------------
 .../spark/unsafe/types/UTF8StringSuite.java     |  41 +++--
 .../apache/spark/memory/TaskMemoryManager.java  |  22 +--
 .../shuffle/sort/ShuffleInMemorySorter.java     |  14 +-
 .../shuffle/sort/ShuffleSortDataFormat.java     |  11 +-
 .../unsafe/sort/UnsafeExternalSorter.java       |   2 +-
 .../unsafe/sort/UnsafeInMemorySorter.java       |  13 +-
 .../spark/memory/TaskMemoryManagerSuite.java    |   2 +-
 .../util/collection/ExternalSorterSuite.scala   |   7 +-
 .../collection/unsafe/sort/RadixSortSuite.scala |  10 +-
 .../apache/spark/ml/feature/FeatureHasher.scala |   5 +-
 .../apache/spark/mllib/feature/HashingTF.scala  |   2 +-
 .../catalyst/expressions/UnsafeArrayData.java   |   4 +-
 .../sql/catalyst/expressions/UnsafeRow.java     |   4 +-
 .../spark/sql/catalyst/expressions/XXH64.java   |  47 +++--
 .../expressions/codegen/UTF8StringBuilder.java  |  35 ++--
 .../spark/sql/catalyst/expressions/hash.scala   |  37 ++--
 .../catalyst/expressions/HiveHasherSuite.java   |  21 ++-
 .../sql/catalyst/expressions/XXH64Suite.java    |  18 +-
 .../codegen/UTF8StringBuilderSuite.scala        |  42 -----
 .../vectorized/OffHeapColumnVector.java         |   3 +-
 .../spark/sql/vectorized/ArrowColumnVector.java |   6 +-
 .../sql/execution/benchmark/SortBenchmark.scala |  16 +-
 .../sql/execution/python/RowQueueSuite.scala    |   4 +-
 41 files changed, 376 insertions(+), 1070 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
index 62b75ae..7357743 100644
--- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
+++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.unsafe.Platform;
 
 /**
  * Simulates Hive's hashing function from Hive v1.2.1
@@ -39,21 +38,12 @@ public class HiveHasher {
     return (int) ((input >>> 32) ^ input);
   }
 
-  public static int hashUnsafeBytesBlock(MemoryBlock mb) {
-    long lengthInBytes = mb.size();
+  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int result = 0;
-    for (long i = 0; i < lengthInBytes; i++) {
-      result = (result * 31) + (int) mb.getByte(i);
+    for (int i = 0; i < lengthInBytes; i++) {
+      result = (result * 31) + (int) Platform.getByte(base, offset + i);
     }
     return result;
   }
-
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes) {
-    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes));
-  }
-
-  public static int hashUTF8String(UTF8String str) {
-    return hashUnsafeBytesBlock(str.getMemoryBlock());
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 54dcadf..aca6fca 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -187,7 +187,7 @@ public final class Platform {
   }
 
   public static void copyMemory(
-      Object src, long srcOffset, Object dst, long dstOffset, long length) {
+    Object src, long srcOffset, Object dst, long dstOffset, long length) {
     // Check if dstOffset is before or after srcOffset to determine if we should copy
     // forward or backwards. This is necessary in case src and dst overlap.
     if (dstOffset < srcOffset) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
index ef0f78d..cec8c30 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java
@@ -18,7 +18,6 @@
 package org.apache.spark.unsafe.array;
 
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 
 public class ByteArrayMethods {
 
@@ -54,24 +53,14 @@ public class ByteArrayMethods {
 
   private static final boolean unaligned = Platform.unaligned();
   /**
-   * MemoryBlock equality check for MemoryBlocks.
-   * @return true if the arrays are equal, false otherwise
-   */
-  public static boolean arrayEqualsBlock(
-      MemoryBlock leftBase, long leftOffset, MemoryBlock rightBase, long rightOffset, long length) {
-    return arrayEquals(leftBase.getBaseObject(), leftBase.getBaseOffset() + leftOffset,
-      rightBase.getBaseObject(), rightBase.getBaseOffset() + rightOffset, length);
-  }
-
-  /**
    * Optimized byte array equality check for byte arrays.
    * @return true if the arrays are equal, false otherwise
    */
   public static boolean arrayEquals(
-      Object leftBase, long leftOffset, Object rightBase, long rightOffset, long length) {
+      Object leftBase, long leftOffset, Object rightBase, long rightOffset, final long length) {
     int i = 0;
 
-    // check if starts align and we can get both offsets to be aligned
+    // check if stars align and we can get both offsets to be aligned
     if ((leftOffset % 8) == (rightOffset % 8)) {
       while ((leftOffset + i) % 8 != 0 && i < length) {
         if (Platform.getByte(leftBase, leftOffset + i) !=

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
index b74d2de..2cd39bd 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.unsafe.array;
 
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 
 /**
@@ -32,12 +33,16 @@ public final class LongArray {
   private static final long WIDTH = 8;
 
   private final MemoryBlock memory;
+  private final Object baseObj;
+  private final long baseOffset;
 
   private final long length;
 
   public LongArray(MemoryBlock memory) {
     assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
+    this.baseObj = memory.getBaseObject();
+    this.baseOffset = memory.getBaseOffset();
     this.length = memory.size() / WIDTH;
   }
 
@@ -46,11 +51,11 @@ public final class LongArray {
   }
 
   public Object getBaseObject() {
-    return memory.getBaseObject();
+    return baseObj;
   }
 
   public long getBaseOffset() {
-    return memory.getBaseOffset();
+    return baseOffset;
   }
 
   /**
@@ -64,8 +69,8 @@ public final class LongArray {
    * Fill this all with 0L.
    */
   public void zeroOut() {
-    for (long off = 0; off < length * WIDTH; off += WIDTH) {
-      memory.putLong(off, 0);
+    for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) {
+      Platform.putLong(baseObj, off, 0);
     }
   }
 
@@ -75,7 +80,7 @@ public final class LongArray {
   public void set(int index, long value) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    memory.putLong(index * WIDTH, value);
+    Platform.putLong(baseObj, baseOffset + index * WIDTH, value);
   }
 
   /**
@@ -84,6 +89,6 @@ public final class LongArray {
   public long get(int index) {
     assert index >= 0 : "index (" + index + ") should >= 0";
     assert index < length : "index (" + index + ") should < length (" + length + ")";
-    return memory.getLong(index * WIDTH);
+    return Platform.getLong(baseObj, baseOffset + index * WIDTH);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
index 566f116..d239de6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java
@@ -17,11 +17,7 @@
 
 package org.apache.spark.unsafe.hash;
 
-import com.google.common.primitives.Ints;
-
 import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * 32-bit Murmur3 hasher.  This is based on Guava's Murmur3_32HashFunction.
@@ -53,74 +49,49 @@ public final class Murmur3_x86_32 {
   }
 
   public int hashUnsafeWords(Object base, long offset, int lengthInBytes) {
-    return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
+    return hashUnsafeWords(base, offset, lengthInBytes, seed);
   }
 
-  public static int hashUnsafeWordsBlock(MemoryBlock base, int seed) {
+  public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
     // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
-    int lengthInBytes = Ints.checkedCast(base.size());
     assert (lengthInBytes % 8 == 0): "lengthInBytes must be a multiple of 8 (word-aligned)";
-    int h1 = hashBytesByIntBlock(base, lengthInBytes, seed);
+    int h1 = hashBytesByInt(base, offset, lengthInBytes, seed);
     return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, int seed) {
-    // This is based on Guava's `Murmur32_Hasher.processRemaining(ByteBuffer)` method.
-    return hashUnsafeWordsBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
-  }
-
-  public static int hashUnsafeBytesBlock(MemoryBlock base, int seed) {
-    return hashUnsafeBytesBlock(base, Ints.checkedCast(base.size()), seed);
-  }
-
-  private static int hashUnsafeBytesBlock(MemoryBlock base, int lengthInBytes, int seed) {
+  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
     // This is not compatible with original and another implementations.
     // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
-    int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
-    long offset = base.getBaseOffset();
-    Object o = base.getBaseObject();
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
     for (int i = lengthAligned; i < lengthInBytes; i++) {
-      int halfWord = Platform.getByte(o, offset + i);
+      int halfWord = Platform.getByte(base, offset + i);
       int k1 = mixK1(halfWord);
       h1 = mixH1(h1, k1);
     }
     return fmix(h1, lengthInBytes);
   }
 
-  public static int hashUTF8String(UTF8String str, int seed) {
-    return hashUnsafeBytesBlock(str.getMemoryBlock(), str.numBytes(), seed);
-  }
-
-  public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
-    return hashUnsafeBytesBlock(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
-  }
-
   public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
-    return hashUnsafeBytes2Block(MemoryBlock.allocateFromObject(base, offset, lengthInBytes), seed);
-  }
-
-  public static int hashUnsafeBytes2Block(MemoryBlock base, int seed) {
-    // This is compatible with original and other implementations.
+    // This is compatible with original and another implementations.
     // Use this method for new components after Spark 2.3.
-    int lengthInBytes = Ints.checkedCast(base.size());
-    assert (lengthInBytes >= 0) : "lengthInBytes cannot be negative";
+    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
-    int h1 = hashBytesByIntBlock(base, lengthAligned, seed);
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
     int k1 = 0;
     for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
-      k1 ^= (base.getByte(i) & 0xFF) << shift;
+      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
     }
     h1 ^= mixK1(k1);
     return fmix(h1, lengthInBytes);
   }
 
-  private static int hashBytesByIntBlock(MemoryBlock base, int lengthInBytes, int seed) {
+  private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;
     for (int i = 0; i < lengthInBytes; i += 4) {
-      int halfWord = base.getInt(i);
+      int halfWord = Platform.getInt(base, offset + i);
       int k1 = mixK1(halfWord);
       h1 = mixH1(h1, k1);
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
deleted file mode 100644
index 9f23863..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import com.google.common.primitives.Ints;
-
-import org.apache.spark.unsafe.Platform;
-
-/**
- * A consecutive block of memory with a byte array on Java heap.
- */
-public final class ByteArrayMemoryBlock extends MemoryBlock {
-
-  private final byte[] array;
-
-  public ByteArrayMemoryBlock(byte[] obj, long offset, long size) {
-    super(obj, offset, size);
-    this.array = obj;
-    assert(offset + size <= Platform.BYTE_ARRAY_OFFSET + obj.length) :
-      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
-        "the size of the given memory space " + (obj.length + Platform.BYTE_ARRAY_OFFSET);
-  }
-
-  public ByteArrayMemoryBlock(long length) {
-    this(new byte[Ints.checkedCast(length)], Platform.BYTE_ARRAY_OFFSET, length);
-  }
-
-  @Override
-  public MemoryBlock subBlock(long offset, long size) {
-    checkSubBlockRange(offset, size);
-    if (offset == 0 && size == this.size()) return this;
-    return new ByteArrayMemoryBlock(array, this.offset + offset, size);
-  }
-
-  public byte[] getByteArray() { return array; }
-
-  /**
-   * Creates a memory block pointing to the memory used by the byte array.
-   */
-  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
-    return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length);
-  }
-
-  @Override
-  public int getInt(long offset) {
-    return Platform.getInt(array, this.offset + offset);
-  }
-
-  @Override
-  public void putInt(long offset, int value) {
-    Platform.putInt(array, this.offset + offset, value);
-  }
-
-  @Override
-  public boolean getBoolean(long offset) {
-    return Platform.getBoolean(array, this.offset + offset);
-  }
-
-  @Override
-  public void putBoolean(long offset, boolean value) {
-    Platform.putBoolean(array, this.offset + offset, value);
-  }
-
-  @Override
-  public byte getByte(long offset) {
-    return array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)];
-  }
-
-  @Override
-  public void putByte(long offset, byte value) {
-    array[(int)(this.offset + offset - Platform.BYTE_ARRAY_OFFSET)] = value;
-  }
-
-  @Override
-  public short getShort(long offset) {
-    return Platform.getShort(array, this.offset + offset);
-  }
-
-  @Override
-  public void putShort(long offset, short value) {
-    Platform.putShort(array, this.offset + offset, value);
-  }
-
-  @Override
-  public long getLong(long offset) {
-    return Platform.getLong(array, this.offset + offset);
-  }
-
-  @Override
-  public void putLong(long offset, long value) {
-    Platform.putLong(array, this.offset + offset, value);
-  }
-
-  @Override
-  public float getFloat(long offset) {
-    return Platform.getFloat(array, this.offset + offset);
-  }
-
-  @Override
-  public void putFloat(long offset, float value) {
-    Platform.putFloat(array, this.offset + offset, value);
-  }
-
-  @Override
-  public double getDouble(long offset) {
-    return Platform.getDouble(array, this.offset + offset);
-  }
-
-  @Override
-  public void putDouble(long offset, double value) {
-    Platform.putDouble(array, this.offset + offset, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 36caf80..2733760 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
+import org.apache.spark.unsafe.Platform;
+
 /**
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
  */
@@ -56,7 +58,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
             final long[] array = arrayReference.get();
             if (array != null) {
               assert (array.length * 8L >= size);
-              MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
+              MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
               }
@@ -68,7 +70,7 @@ public class HeapMemoryAllocator implements MemoryAllocator {
       }
     }
     long[] array = new long[numWords];
-    MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
+    MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     }
@@ -77,13 +79,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
 
   @Override
   public void free(MemoryBlock memory) {
-    assert(memory instanceof OnHeapMemoryBlock);
-    assert (memory.getBaseObject() != null) :
+    assert (memory.obj != null) :
       "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
-    assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "page has already been freed";
-    assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
-            || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
       "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
         "free()";
 
@@ -93,12 +94,12 @@ public class HeapMemoryAllocator implements MemoryAllocator {
     }
 
     // Mark the page as freed (so we can detect double-frees).
-    memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
+    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
 
     // As an additional layer of defense against use-after-free bugs, we mutate the
     // MemoryBlock to null out its reference to the long[] array.
-    long[] array = ((OnHeapMemoryBlock)memory).getLongArray();
-    memory.resetObjAndOffset();
+    long[] array = (long[]) memory.obj;
+    memory.setObjAndOffset(null, 0);
 
     long alignedSize = ((size + 7) / 8) * 8;
     if (shouldPool(alignedSize)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
index 38315fb..7b58868 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
@@ -38,7 +38,7 @@ public interface MemoryAllocator {
 
   void free(MemoryBlock memory);
 
-  UnsafeMemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
+  MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
 
-  HeapMemoryAllocator HEAP = new HeapMemoryAllocator();
+  MemoryAllocator HEAP = new HeapMemoryAllocator();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
index ca7213b..c333857 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java
@@ -22,10 +22,10 @@ import javax.annotation.Nullable;
 import org.apache.spark.unsafe.Platform;
 
 /**
- * A representation of a consecutive memory block in Spark. It defines the common interfaces
- * for memory accessing and mutating.
+ * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size.
  */
-public abstract class MemoryBlock {
+public class MemoryBlock extends MemoryLocation {
+
   /** Special `pageNumber` value for pages which were not allocated by TaskMemoryManagers */
   public static final int NO_PAGE_NUMBER = -1;
 
@@ -45,163 +45,38 @@ public abstract class MemoryBlock {
    */
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  @Nullable
-  protected Object obj;
-
-  protected long offset;
-
-  protected long length;
+  private final long length;
 
   /**
    * Optional page number; used when this MemoryBlock represents a page allocated by a
-   * TaskMemoryManager. This field can be updated using setPageNumber method so that
-   * this can be modified by the TaskMemoryManager, which lives in a different package.
+   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
+   * which lives in a different package.
    */
-  private int pageNumber = NO_PAGE_NUMBER;
+  public int pageNumber = NO_PAGE_NUMBER;
 
-  protected MemoryBlock(@Nullable Object obj, long offset, long length) {
-    if (offset < 0 || length < 0) {
-      throw new IllegalArgumentException(
-        "Length " + length + " and offset " + offset + "must be non-negative");
-    }
-    this.obj = obj;
-    this.offset = offset;
+  public MemoryBlock(@Nullable Object obj, long offset, long length) {
+    super(obj, offset);
     this.length = length;
   }
 
-  protected MemoryBlock() {
-    this(null, 0, 0);
-  }
-
-  public final Object getBaseObject() {
-    return obj;
-  }
-
-  public final long getBaseOffset() {
-    return offset;
-  }
-
-  public void resetObjAndOffset() {
-    this.obj = null;
-    this.offset = 0;
-  }
-
   /**
    * Returns the size of the memory block.
    */
-  public final long size() {
+  public long size() {
     return length;
   }
 
-  public final void setPageNumber(int pageNum) {
-    pageNumber = pageNum;
-  }
-
-  public final int getPageNumber() {
-    return pageNumber;
-  }
-
-  /**
-   * Fills the memory block with the specified byte value.
-   */
-  public final void fill(byte value) {
-    Platform.setMemory(obj, offset, length, value);
-  }
-
-  /**
-   * Instantiate MemoryBlock for given object type with new offset
-   */
-  public static final MemoryBlock allocateFromObject(Object obj, long offset, long length) {
-    MemoryBlock mb = null;
-    if (obj instanceof byte[]) {
-      byte[] array = (byte[])obj;
-      mb = new ByteArrayMemoryBlock(array, offset, length);
-    } else if (obj instanceof long[]) {
-      long[] array = (long[])obj;
-      mb = new OnHeapMemoryBlock(array, offset, length);
-    } else if (obj == null) {
-      // we assume that to pass null pointer means off-heap
-      mb = new OffHeapMemoryBlock(offset, length);
-    } else {
-      throw new UnsupportedOperationException(
-        "Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now");
-    }
-    return mb;
-  }
-
   /**
-   * Just instantiate the sub-block with the same type of MemoryBlock with the new size and relative
-   * offset from the original offset. The data is not copied.
-   * If parameters are invalid, an exception is thrown.
+   * Creates a memory block pointing to the memory used by the long array.
    */
-  public abstract MemoryBlock subBlock(long offset, long size);
-
-  protected void checkSubBlockRange(long offset, long size) {
-    if (offset < 0 || size < 0) {
-      throw new ArrayIndexOutOfBoundsException(
-        "Size " + size + " and offset " + offset + " must be non-negative");
-    }
-    if (offset + size > length) {
-      throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " +
-        offset + " should not be larger than the length " + length + " in the MemoryBlock");
-    }
+  public static MemoryBlock fromLongArray(final long[] array) {
+    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
   }
 
   /**
-   * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. e.g  cause illegal
-   * memory access, throw an exception, or etc.
-   * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as
-   * JVM object header. The offset is 0-based and is expected as an logical offset in the memory
-   * block.
+   * Fills the memory block with the specified byte value.
    */
-  public abstract int getInt(long offset);
-
-  public abstract void putInt(long offset, int value);
-
-  public abstract boolean getBoolean(long offset);
-
-  public abstract void putBoolean(long offset, boolean value);
-
-  public abstract byte getByte(long offset);
-
-  public abstract void putByte(long offset, byte value);
-
-  public abstract short getShort(long offset);
-
-  public abstract void putShort(long offset, short value);
-
-  public abstract long getLong(long offset);
-
-  public abstract void putLong(long offset, long value);
-
-  public abstract float getFloat(long offset);
-
-  public abstract void putFloat(long offset, float value);
-
-  public abstract double getDouble(long offset);
-
-  public abstract void putDouble(long offset, double value);
-
-  public static final void copyMemory(
-      MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) {
-    assert(srcOffset + length <= src.length && dstOffset + length <= dst.length);
-    Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset,
-      dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length);
-  }
-
-  public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) {
-    assert(length <= src.length && length <= dst.length);
-    Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(),
-      dst.getBaseObject(), dst.getBaseOffset(), length);
-  }
-
-  public final void copyFrom(Object src, long srcOffset, long dstOffset, long length) {
-    assert(length <= this.length - srcOffset);
-    Platform.copyMemory(src, srcOffset, obj, offset + dstOffset, length);
-  }
-
-  public final void writeTo(long srcOffset, Object dst, long dstOffset, long length) {
-    assert(length <= this.length - srcOffset);
-    Platform.copyMemory(obj, offset + srcOffset, dst, dstOffset, length);
+  public void fill(byte value) {
+    Platform.setMemory(obj, offset, length, value);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
new file mode 100644
index 0000000..74ebc87
--- /dev/null
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryLocation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import javax.annotation.Nullable;
+
+/**
+ * A memory location. Tracked either by a memory address (with off-heap allocation),
+ * or by an offset from a JVM object (in-heap allocation).
+ */
+public class MemoryLocation {
+
+  @Nullable
+  Object obj;
+
+  long offset;
+
+  public MemoryLocation(@Nullable Object obj, long offset) {
+    this.obj = obj;
+    this.offset = offset;
+  }
+
+  public MemoryLocation() {
+    this(null, 0);
+  }
+
+  public void setObjAndOffset(Object newObj, long newOffset) {
+    this.obj = newObj;
+    this.offset = newOffset;
+  }
+
+  public final Object getBaseObject() {
+    return obj;
+  }
+
+  public final long getBaseOffset() {
+    return offset;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
deleted file mode 100644
index 3431b08..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import org.apache.spark.unsafe.Platform;
-
-public class OffHeapMemoryBlock extends MemoryBlock {
-  public static final OffHeapMemoryBlock NULL = new OffHeapMemoryBlock(0, 0);
-
-  public OffHeapMemoryBlock(long address, long size) {
-    super(null, address, size);
-  }
-
-  @Override
-  public MemoryBlock subBlock(long offset, long size) {
-    checkSubBlockRange(offset, size);
-    if (offset == 0 && size == this.size()) return this;
-    return new OffHeapMemoryBlock(this.offset + offset, size);
-  }
-
-  @Override
-  public final int getInt(long offset) {
-    return Platform.getInt(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putInt(long offset, int value) {
-    Platform.putInt(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final boolean getBoolean(long offset) {
-    return Platform.getBoolean(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putBoolean(long offset, boolean value) {
-    Platform.putBoolean(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final byte getByte(long offset) {
-    return Platform.getByte(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putByte(long offset, byte value) {
-    Platform.putByte(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final short getShort(long offset) {
-    return Platform.getShort(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putShort(long offset, short value) {
-    Platform.putShort(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final long getLong(long offset) {
-    return Platform.getLong(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putLong(long offset, long value) {
-    Platform.putLong(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final float getFloat(long offset) {
-    return Platform.getFloat(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putFloat(long offset, float value) {
-    Platform.putFloat(null, this.offset + offset, value);
-  }
-
-  @Override
-  public final double getDouble(long offset) {
-    return Platform.getDouble(null, this.offset + offset);
-  }
-
-  @Override
-  public final void putDouble(long offset, double value) {
-    Platform.putDouble(null, this.offset + offset, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
deleted file mode 100644
index ee42bc2..0000000
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import com.google.common.primitives.Ints;
-
-import org.apache.spark.unsafe.Platform;
-
-/**
- * A consecutive block of memory with a long array on Java heap.
- */
-public final class OnHeapMemoryBlock extends MemoryBlock {
-
-  private final long[] array;
-
-  public OnHeapMemoryBlock(long[] obj, long offset, long size) {
-    super(obj, offset, size);
-    this.array = obj;
-    assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) :
-      "The sum of size " + size + " and offset " + offset + " should not be larger than " +
-        "the size of the given memory space " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET);
-  }
-
-  public OnHeapMemoryBlock(long size) {
-    this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size);
-  }
-
-  @Override
-  public MemoryBlock subBlock(long offset, long size) {
-    checkSubBlockRange(offset, size);
-    if (offset == 0 && size == this.size()) return this;
-    return new OnHeapMemoryBlock(array, this.offset + offset, size);
-  }
-
-  public long[] getLongArray() { return array; }
-
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static OnHeapMemoryBlock fromArray(final long[] array) {
-    return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L);
-  }
-
-  public static OnHeapMemoryBlock fromArray(final long[] array, long size) {
-    return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
-  }
-
-  @Override
-  public int getInt(long offset) {
-    return Platform.getInt(array, this.offset + offset);
-  }
-
-  @Override
-  public void putInt(long offset, int value) {
-    Platform.putInt(array, this.offset + offset, value);
-  }
-
-  @Override
-  public boolean getBoolean(long offset) {
-    return Platform.getBoolean(array, this.offset + offset);
-  }
-
-  @Override
-  public void putBoolean(long offset, boolean value) {
-    Platform.putBoolean(array, this.offset + offset, value);
-  }
-
-  @Override
-  public byte getByte(long offset) {
-    return Platform.getByte(array, this.offset + offset);
-  }
-
-  @Override
-  public void putByte(long offset, byte value) {
-    Platform.putByte(array, this.offset + offset, value);
-  }
-
-  @Override
-  public short getShort(long offset) {
-    return Platform.getShort(array, this.offset + offset);
-  }
-
-  @Override
-  public void putShort(long offset, short value) {
-    Platform.putShort(array, this.offset + offset, value);
-  }
-
-  @Override
-  public long getLong(long offset) {
-    return Platform.getLong(array, this.offset + offset);
-  }
-
-  @Override
-  public void putLong(long offset, long value) {
-    Platform.putLong(array, this.offset + offset, value);
-  }
-
-  @Override
-  public float getFloat(long offset) {
-    return Platform.getFloat(array, this.offset + offset);
-  }
-
-  @Override
-  public void putFloat(long offset, float value) {
-    Platform.putFloat(array, this.offset + offset, value);
-  }
-
-  @Override
-  public double getDouble(long offset) {
-    return Platform.getDouble(array, this.offset + offset);
-  }
-
-  @Override
-  public void putDouble(long offset, double value) {
-    Platform.putDouble(array, this.offset + offset, value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
index 5310bdf..4368fb6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
@@ -25,9 +25,9 @@ import org.apache.spark.unsafe.Platform;
 public class UnsafeMemoryAllocator implements MemoryAllocator {
 
   @Override
-  public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError {
+  public MemoryBlock allocate(long size) throws OutOfMemoryError {
     long address = Platform.allocateMemory(size);
-    OffHeapMemoryBlock memory = new OffHeapMemoryBlock(address, size);
+    MemoryBlock memory = new MemoryBlock(null, address, size);
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
     }
@@ -36,25 +36,22 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
 
   @Override
   public void free(MemoryBlock memory) {
-    assert(memory instanceof OffHeapMemoryBlock) :
-      "UnsafeMemoryAllocator can only free OffHeapMemoryBlock.";
-    if (memory == OffHeapMemoryBlock.NULL) return;
-    assert (memory.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (memory.obj == null) :
+      "baseObject not null; are you trying to use the off-heap allocator to free on-heap memory?";
+    assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "page has already been freed";
-    assert ((memory.getPageNumber() == MemoryBlock.NO_PAGE_NUMBER)
-            || (memory.getPageNumber() == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
+    assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
+            || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
       "TMM-allocated pages must be freed via TMM.freePage(), not directly in allocator free()";
 
     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
     }
-
     Platform.freeMemory(memory.offset);
-
     // As an additional layer of defense against use-after-free bugs, we mutate the
     // MemoryBlock to reset its pointer.
-    memory.resetObjAndOffset();
+    memory.offset = 0;
     // Mark the page as freed (so we can detect double-frees).
-    memory.setPageNumber(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER);
+    memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index e91fc43..dff4a73 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -34,8 +34,6 @@ import com.google.common.primitives.Ints;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 
 import static org.apache.spark.unsafe.Platform.*;
 
@@ -53,13 +51,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   // These are only updated by readExternal() or read()
   @Nonnull
-  private MemoryBlock base;
-  // While numBytes has the same value as base.size(), to keep as int avoids cast from long to int
+  private Object base;
+  private long offset;
   private int numBytes;
 
-  public MemoryBlock getMemoryBlock() { return base; }
-  public Object getBaseObject() { return base.getBaseObject(); }
-  public long getBaseOffset() { return base.getBaseOffset(); }
+  public Object getBaseObject() { return base; }
+  public long getBaseOffset() { return offset; }
 
   /**
    * A char in UTF-8 encoding can take 1-4 bytes depending on the first byte which
@@ -112,8 +109,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   public static UTF8String fromBytes(byte[] bytes) {
     if (bytes != null) {
-      return new UTF8String(
-        new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET, bytes.length));
+      return new UTF8String(bytes, BYTE_ARRAY_OFFSET, bytes.length);
     } else {
       return null;
     }
@@ -126,14 +122,20 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   public static UTF8String fromBytes(byte[] bytes, int offset, int numBytes) {
     if (bytes != null) {
-      return new UTF8String(
-        new ByteArrayMemoryBlock(bytes, BYTE_ARRAY_OFFSET + offset, numBytes));
+      return new UTF8String(bytes, BYTE_ARRAY_OFFSET + offset, numBytes);
     } else {
       return null;
     }
   }
 
   /**
+   * Creates an UTF8String from given address (base and offset) and length.
+   */
+  public static UTF8String fromAddress(Object base, long offset, int numBytes) {
+    return new UTF8String(base, offset, numBytes);
+  }
+
+  /**
    * Creates an UTF8String from String.
    */
   public static UTF8String fromString(String str) {
@@ -149,13 +151,16 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     return fromBytes(spaces);
   }
 
-  public UTF8String(MemoryBlock base) {
+  protected UTF8String(Object base, long offset, int numBytes) {
     this.base = base;
-    this.numBytes = Ints.checkedCast(base.size());
+    this.offset = offset;
+    this.numBytes = numBytes;
   }
 
   // for serialization
-  public UTF8String() {}
+  public UTF8String() {
+    this(null, 0, 0);
+  }
 
   /**
    * Writes the content of this string into a memory address, identified by an object and an offset.
@@ -163,7 +168,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    * bytes in this string.
    */
   public void writeToMemory(Object target, long targetOffset) {
-    base.writeTo(0, target, targetOffset, numBytes);
+    Platform.copyMemory(base, offset, target, targetOffset, numBytes);
   }
 
   public void writeTo(ByteBuffer buffer) {
@@ -183,9 +188,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   @Nonnull
   public ByteBuffer getByteBuffer() {
-    long offset = base.getBaseOffset();
-    if (base instanceof ByteArrayMemoryBlock && offset >= BYTE_ARRAY_OFFSET) {
-      final byte[] bytes = ((ByteArrayMemoryBlock) base).getByteArray();
+    if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
+      final byte[] bytes = (byte[]) base;
 
       // the offset includes an object header... this is only needed for unsafe copies
       final long arrayOffset = offset - BYTE_ARRAY_OFFSET;
@@ -252,12 +256,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     long mask = 0;
     if (IS_LITTLE_ENDIAN) {
       if (numBytes >= 8) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
       } else if (numBytes > 4) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else if (numBytes > 0) {
-        p = (long) base.getInt(0);
+        p = (long) Platform.getInt(base, offset);
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else {
         p = 0;
@@ -266,12 +270,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     } else {
       // byteOrder == ByteOrder.BIG_ENDIAN
       if (numBytes >= 8) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
       } else if (numBytes > 4) {
-        p = base.getLong(0);
+        p = Platform.getLong(base, offset);
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else if (numBytes > 0) {
-        p = ((long) base.getInt(0)) << 32;
+        p = ((long) Platform.getInt(base, offset)) << 32;
         mask = (1L << (8 - numBytes) * 8) - 1;
       } else {
         p = 0;
@@ -286,13 +290,12 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    */
   public byte[] getBytes() {
     // avoid copy if `base` is `byte[]`
-    long offset = base.getBaseOffset();
-    if (offset == BYTE_ARRAY_OFFSET && base instanceof ByteArrayMemoryBlock
-      && (((ByteArrayMemoryBlock) base).getByteArray()).length == numBytes) {
-      return ((ByteArrayMemoryBlock) base).getByteArray();
+    if (offset == BYTE_ARRAY_OFFSET && base instanceof byte[]
+      && ((byte[]) base).length == numBytes) {
+      return (byte[]) base;
     } else {
       byte[] bytes = new byte[numBytes];
-      base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
+      copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
       return bytes;
     }
   }
@@ -322,7 +325,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
     if (i > j) {
       byte[] bytes = new byte[i - j];
-      base.writeTo(j, bytes, BYTE_ARRAY_OFFSET, i - j);
+      copyMemory(base, offset + j, bytes, BYTE_ARRAY_OFFSET, i - j);
       return fromBytes(bytes);
     } else {
       return EMPTY_UTF8;
@@ -363,14 +366,14 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
    * Returns the byte at position `i`.
    */
   private byte getByte(int i) {
-    return base.getByte(i);
+    return Platform.getByte(base, offset + i);
   }
 
   private boolean matchAt(final UTF8String s, int pos) {
     if (s.numBytes + pos > numBytes || pos < 0) {
       return false;
     }
-    return ByteArrayMethods.arrayEqualsBlock(base, pos, s.base, 0, s.numBytes);
+    return ByteArrayMethods.arrayEquals(base, offset + pos, s.base, s.offset, s.numBytes);
   }
 
   public boolean startsWith(final UTF8String prefix) {
@@ -497,7 +500,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     for (int i = 0; i < numBytes; i++) {
       if (getByte(i) == (byte) ',') {
         if (i - (lastComma + 1) == match.numBytes &&
-          ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
+          ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
+            match.numBytes)) {
           return n;
         }
         lastComma = i;
@@ -505,7 +509,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       }
     }
     if (numBytes - (lastComma + 1) == match.numBytes &&
-      ByteArrayMethods.arrayEqualsBlock(base, lastComma + 1, match.base, 0, match.numBytes)) {
+      ByteArrayMethods.arrayEquals(base, offset + (lastComma + 1), match.base, match.offset,
+        match.numBytes)) {
       return n;
     }
     return 0;
@@ -520,7 +525,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   private UTF8String copyUTF8String(int start, int end) {
     int len = end - start + 1;
     byte[] newBytes = new byte[len];
-    base.writeTo(start, newBytes, BYTE_ARRAY_OFFSET, len);
+    copyMemory(base, offset + start, newBytes, BYTE_ARRAY_OFFSET, len);
     return UTF8String.fromBytes(newBytes);
   }
 
@@ -667,7 +672,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     int i = 0; // position in byte
     while (i < numBytes) {
       int len = numBytesForFirstByte(getByte(i));
-      base.writeTo(i, result, BYTE_ARRAY_OFFSET + result.length - i - len, len);
+      copyMemory(this.base, this.offset + i, result,
+        BYTE_ARRAY_OFFSET + result.length - i - len, len);
 
       i += len;
     }
@@ -681,7 +687,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     }
 
     byte[] newBytes = new byte[numBytes * times];
-    base.writeTo(0, newBytes, BYTE_ARRAY_OFFSET, numBytes);
+    copyMemory(this.base, this.offset, newBytes, BYTE_ARRAY_OFFSET, numBytes);
 
     int copied = 1;
     while (copied < times) {
@@ -718,7 +724,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       if (i + v.numBytes > numBytes) {
         return -1;
       }
-      if (ByteArrayMethods.arrayEqualsBlock(base, i, v.base, 0, v.numBytes)) {
+      if (ByteArrayMethods.arrayEquals(base, offset + i, v.base, v.offset, v.numBytes)) {
         return c;
       }
       i += numBytesForFirstByte(getByte(i));
@@ -734,7 +740,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   private int find(UTF8String str, int start) {
     assert (str.numBytes > 0);
     while (start <= numBytes - str.numBytes) {
-      if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
+      if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
         return start;
       }
       start += 1;
@@ -748,7 +754,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   private int rfind(UTF8String str, int start) {
     assert (str.numBytes > 0);
     while (start >= 0) {
-      if (ByteArrayMethods.arrayEqualsBlock(base, start, str.base, 0, str.numBytes)) {
+      if (ByteArrayMethods.arrayEquals(base, offset + start, str.base, str.offset, str.numBytes)) {
         return start;
       }
       start -= 1;
@@ -781,7 +787,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
         return EMPTY_UTF8;
       }
       byte[] bytes = new byte[idx];
-      base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, idx);
+      copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, idx);
       return fromBytes(bytes);
 
     } else {
@@ -801,7 +807,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       }
       int size = numBytes - delim.numBytes - idx;
       byte[] bytes = new byte[size];
-      base.writeTo(idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
+      copyMemory(base, offset + idx + delim.numBytes, bytes, BYTE_ARRAY_OFFSET, size);
       return fromBytes(bytes);
     }
   }
@@ -824,15 +830,15 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       UTF8String remain = pad.substring(0, spaces - padChars * count);
 
       byte[] data = new byte[this.numBytes + pad.numBytes * count + remain.numBytes];
-      base.writeTo(0, data, BYTE_ARRAY_OFFSET, this.numBytes);
+      copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET, this.numBytes);
       int offset = this.numBytes;
       int idx = 0;
       while (idx < count) {
-        pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+        copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
         ++ idx;
         offset += pad.numBytes;
       }
-      remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+      copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
 
       return UTF8String.fromBytes(data);
     }
@@ -860,13 +866,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       int offset = 0;
       int idx = 0;
       while (idx < count) {
-        pad.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
+        copyMemory(pad.base, pad.offset, data, BYTE_ARRAY_OFFSET + offset, pad.numBytes);
         ++ idx;
         offset += pad.numBytes;
       }
-      remain.base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
+      copyMemory(remain.base, remain.offset, data, BYTE_ARRAY_OFFSET + offset, remain.numBytes);
       offset += remain.numBytes;
-      base.writeTo(0, data, BYTE_ARRAY_OFFSET + offset, numBytes());
+      copyMemory(this.base, this.offset, data, BYTE_ARRAY_OFFSET + offset, numBytes());
 
       return UTF8String.fromBytes(data);
     }
@@ -891,8 +897,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     int offset = 0;
     for (int i = 0; i < inputs.length; i++) {
       int len = inputs[i].numBytes;
-      inputs[i].base.writeTo(
-        0,
+      copyMemory(
+        inputs[i].base, inputs[i].offset,
         result, BYTE_ARRAY_OFFSET + offset,
         len);
       offset += len;
@@ -931,8 +937,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     for (int i = 0, j = 0; i < inputs.length; i++) {
       if (inputs[i] != null) {
         int len = inputs[i].numBytes;
-        inputs[i].base.writeTo(
-          0,
+        copyMemory(
+          inputs[i].base, inputs[i].offset,
           result, BYTE_ARRAY_OFFSET + offset,
           len);
         offset += len;
@@ -940,8 +946,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
         j++;
         // Add separator if this is not the last input.
         if (j < numInputs) {
-          separator.base.writeTo(
-            0,
+          copyMemory(
+            separator.base, separator.offset,
             result, BYTE_ARRAY_OFFSET + offset,
             separator.numBytes);
           offset += separator.numBytes;
@@ -1215,7 +1221,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   public UTF8String copy() {
     byte[] bytes = new byte[numBytes];
-    base.writeTo(0, bytes, BYTE_ARRAY_OFFSET, numBytes);
+    copyMemory(base, offset, bytes, BYTE_ARRAY_OFFSET, numBytes);
     return fromBytes(bytes);
   }
 
@@ -1223,10 +1229,11 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
     int wordMax = (len / 8) * 8;
-    MemoryBlock rbase = other.base;
+    long roffset = other.offset;
+    Object rbase = other.base;
     for (int i = 0; i < wordMax; i += 8) {
-      long left = base.getLong(i);
-      long right = rbase.getLong(i);
+      long left = getLong(base, offset + i);
+      long right = getLong(rbase, roffset + i);
       if (left != right) {
         if (IS_LITTLE_ENDIAN) {
           return Long.compareUnsigned(Long.reverseBytes(left), Long.reverseBytes(right));
@@ -1237,7 +1244,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
     }
     for (int i = wordMax; i < len; i++) {
       // In UTF-8, the byte should be unsigned, so we should compare them as unsigned int.
-      int res = (getByte(i) & 0xFF) - (rbase.getByte(i) & 0xFF);
+      int res = (getByte(i) & 0xFF) - (Platform.getByte(rbase, roffset + i) & 0xFF);
       if (res != 0) {
         return res;
       }
@@ -1256,7 +1263,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
       if (numBytes != o.numBytes) {
         return false;
       }
-      return ByteArrayMethods.arrayEqualsBlock(base, 0, o.base, 0, numBytes);
+      return ByteArrayMethods.arrayEquals(base, offset, o.base, o.offset, numBytes);
     } else {
       return false;
     }
@@ -1312,8 +1319,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
               num_bytes_j != numBytesForFirstByte(s.getByte(i_bytes))) {
           cost = 1;
         } else {
-          cost = (ByteArrayMethods.arrayEqualsBlock(t.base, j_bytes, s.base,
-            i_bytes, num_bytes_j)) ? 0 : 1;
+          cost = (ByteArrayMethods.arrayEquals(t.base, t.offset + j_bytes, s.base,
+              s.offset + i_bytes, num_bytes_j)) ? 0 : 1;
         }
         d[i + 1] = Math.min(Math.min(d[i] + 1, p[i + 1] + 1), p[i] + cost);
       }
@@ -1328,7 +1335,7 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   @Override
   public int hashCode() {
-    return Murmur3_x86_32.hashUnsafeBytesBlock(base,42);
+    return Murmur3_x86_32.hashUnsafeBytes(base, offset, numBytes, 42);
   }
 
   /**
@@ -1391,10 +1398,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
   }
 
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    offset = BYTE_ARRAY_OFFSET;
     numBytes = in.readInt();
-    byte[] bytes = new byte[numBytes];
-    in.readFully(bytes);
-    base = ByteArrayMemoryBlock.fromArray(bytes);
+    base = new byte[numBytes];
+    in.readFully((byte[]) base);
   }
 
   @Override
@@ -1406,10 +1413,10 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
 
   @Override
   public void read(Kryo kryo, Input in) {
-    numBytes = in.readInt();
-    byte[] bytes = new byte[numBytes];
-    in.read(bytes);
-    base = ByteArrayMemoryBlock.fromArray(bytes);
+    this.offset = BYTE_ARRAY_OFFSET;
+    this.numBytes = in.readInt();
+    this.base = new byte[numBytes];
+    in.read((byte[]) base);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
index 583a148..3ad9ac7 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -81,7 +81,7 @@ public class PlatformUtilSuite {
     MemoryAllocator.HEAP.free(block);
     Assert.assertNull(block.getBaseObject());
     Assert.assertEquals(0, block.getBaseOffset());
-    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
+    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
   }
 
   @Test
@@ -92,7 +92,7 @@ public class PlatformUtilSuite {
     MemoryAllocator.UNSAFE.free(block);
     Assert.assertNull(block.getBaseObject());
     Assert.assertEquals(0, block.getBaseOffset());
-    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.getPageNumber());
+    Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, block.pageNumber);
   }
 
   @Test(expected = AssertionError.class)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
index 8c2e98c..fb8e53b 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/array/LongArraySuite.java
@@ -20,13 +20,14 @@ package org.apache.spark.unsafe.array;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
+import org.apache.spark.unsafe.memory.MemoryBlock;
 
 public class LongArraySuite {
 
   @Test
   public void basicTest() {
-    LongArray arr = new LongArray(new OnHeapMemoryBlock(16));
+    long[] bytes = new long[2];
+    LongArray arr = new LongArray(MemoryBlock.fromLongArray(bytes));
     arr.set(0, 1L);
     arr.set(1, 2L);
     arr.set(1, 3L);

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
index d989877..6348a73 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java
@@ -71,24 +71,6 @@ public class Murmur3_x86_32Suite {
   }
 
   @Test
-  public void testKnownWordsInputs() {
-    byte[] bytes = new byte[16];
-    long offset = Platform.BYTE_ARRAY_OFFSET;
-    for (int i = 0; i < 16; i++) {
-      bytes[i] = 0;
-    }
-    Assert.assertEquals(-300363099, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
-    for (int i = 0; i < 16; i++) {
-      bytes[i] = -1;
-    }
-    Assert.assertEquals(-1210324667, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
-    for (int i = 0; i < 16; i++) {
-      bytes[i] = (byte)i;
-    }
-    Assert.assertEquals(-634919701, Murmur3_x86_32.hashUnsafeWords(bytes, offset, 16, 42));
-  }
-
-  @Test
   public void randomizedStressTest() {
     int size = 65536;
     Random rand = new Random();

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
deleted file mode 100644
index ef5ff8e..0000000
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.unsafe.memory;
-
-import org.apache.spark.unsafe.Platform;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteOrder;
-
-import static org.hamcrest.core.StringContains.containsString;
-
-public class MemoryBlockSuite {
-  private static final boolean bigEndianPlatform =
-    ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
-
-  private void check(MemoryBlock memory, Object obj, long offset, int length) {
-    memory.setPageNumber(1);
-    memory.fill((byte)-1);
-    memory.putBoolean(0, true);
-    memory.putByte(1, (byte)127);
-    memory.putShort(2, (short)257);
-    memory.putInt(4, 0x20000002);
-    memory.putLong(8, 0x1234567089ABCDEFL);
-    memory.putFloat(16, 1.0F);
-    memory.putLong(20, 0x1234567089ABCDEFL);
-    memory.putDouble(28, 2.0);
-    MemoryBlock.copyMemory(memory, 0L, memory, 36, 4);
-    int[] a = new int[2];
-    a[0] = 0x12345678;
-    a[1] = 0x13579BDF;
-    memory.copyFrom(a, Platform.INT_ARRAY_OFFSET, 40, 8);
-    byte[] b = new byte[8];
-    memory.writeTo(40, b, Platform.BYTE_ARRAY_OFFSET, 8);
-
-    Assert.assertEquals(obj, memory.getBaseObject());
-    Assert.assertEquals(offset, memory.getBaseOffset());
-    Assert.assertEquals(length, memory.size());
-    Assert.assertEquals(1, memory.getPageNumber());
-    Assert.assertEquals(true, memory.getBoolean(0));
-    Assert.assertEquals((byte)127, memory.getByte(1 ));
-    Assert.assertEquals((short)257, memory.getShort(2));
-    Assert.assertEquals(0x20000002, memory.getInt(4));
-    Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(8));
-    Assert.assertEquals(1.0F, memory.getFloat(16), 0);
-    Assert.assertEquals(0x1234567089ABCDEFL, memory.getLong(20));
-    Assert.assertEquals(2.0, memory.getDouble(28), 0);
-    Assert.assertEquals(true, memory.getBoolean(36));
-    Assert.assertEquals((byte)127, memory.getByte(37 ));
-    Assert.assertEquals((short)257, memory.getShort(38));
-    Assert.assertEquals(a[0], memory.getInt(40));
-    Assert.assertEquals(a[1], memory.getInt(44));
-    if (bigEndianPlatform) {
-      Assert.assertEquals(a[0],
-        ((int)b[0] & 0xff) << 24 | ((int)b[1] & 0xff) << 16 |
-        ((int)b[2] & 0xff) << 8 | ((int)b[3] & 0xff));
-      Assert.assertEquals(a[1],
-        ((int)b[4] & 0xff) << 24 | ((int)b[5] & 0xff) << 16 |
-        ((int)b[6] & 0xff) << 8 | ((int)b[7] & 0xff));
-    } else {
-      Assert.assertEquals(a[0],
-        ((int)b[3] & 0xff) << 24 | ((int)b[2] & 0xff) << 16 |
-        ((int)b[1] & 0xff) << 8 | ((int)b[0] & 0xff));
-      Assert.assertEquals(a[1],
-        ((int)b[7] & 0xff) << 24 | ((int)b[6] & 0xff) << 16 |
-        ((int)b[5] & 0xff) << 8 | ((int)b[4] & 0xff));
-    }
-    for (int i = 48; i < memory.size(); i++) {
-      Assert.assertEquals((byte) -1, memory.getByte(i));
-    }
-
-    assert(memory.subBlock(0, memory.size()) == memory);
-
-    try {
-      memory.subBlock(-8, 8);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("non-negative"));
-    }
-
-    try {
-      memory.subBlock(0, -8);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("non-negative"));
-    }
-
-    try {
-      memory.subBlock(0, length + 8);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
-    }
-
-    try {
-      memory.subBlock(8, length - 4);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
-    }
-
-    try {
-      memory.subBlock(length + 8, 4);
-      Assert.fail();
-    } catch (Exception expected) {
-      Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
-    }
-
-    memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER);
-  }
-
-  @Test
-  public void testByteArrayMemoryBlock() {
-    byte[] obj = new byte[56];
-    long offset = Platform.BYTE_ARRAY_OFFSET;
-    int length = obj.length;
-
-    MemoryBlock memory = new ByteArrayMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-
-    memory = ByteArrayMemoryBlock.fromArray(obj);
-    check(memory, obj, offset, length);
-
-    obj = new byte[112];
-    memory = new ByteArrayMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-  }
-
-  @Test
-  public void testOnHeapMemoryBlock() {
-    long[] obj = new long[7];
-    long offset = Platform.LONG_ARRAY_OFFSET;
-    int length = obj.length * 8;
-
-    MemoryBlock memory = new OnHeapMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-
-    memory = OnHeapMemoryBlock.fromArray(obj);
-    check(memory, obj, offset, length);
-
-    obj = new long[14];
-    memory = new OnHeapMemoryBlock(obj, offset, length);
-    check(memory, obj, offset, length);
-  }
-
-  @Test
-  public void testOffHeapArrayMemoryBlock() {
-    MemoryAllocator memoryAllocator = new UnsafeMemoryAllocator();
-    MemoryBlock memory = memoryAllocator.allocate(56);
-    Object obj = memory.getBaseObject();
-    long offset = memory.getBaseOffset();
-    int length = 56;
-
-    check(memory, obj, offset, length);
-    memoryAllocator.free(memory);
-
-    long address = Platform.allocateMemory(112);
-    memory = new OffHeapMemoryBlock(address, length);
-    obj = memory.getBaseObject();
-    offset = memory.getBaseOffset();
-    check(memory, obj, offset, length);
-    Platform.freeMemory(address);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
----------------------------------------------------------------------
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index 42dda30..dae13f0 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -25,8 +25,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.spark.unsafe.memory.ByteArrayMemoryBlock;
-import org.apache.spark.unsafe.memory.OnHeapMemoryBlock;
+import org.apache.spark.unsafe.Platform;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -514,13 +513,28 @@ public class UTF8StringSuite {
   }
 
   @Test
+  public void writeToOutputStreamUnderflow() throws IOException {
+    // offset underflow is apparently supported?
+    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
+
+    for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
+      UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
+          .writeTo(outputStream);
+      final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
+      assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
+      outputStream.reset();
+    }
+  }
+
+  @Test
   public void writeToOutputStreamSlice() throws IOException {
     final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
     final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
 
     for (int i = 0; i < test.length; ++i) {
       for (int j = 0; j < test.length - i; ++j) {
-        new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(i, j))
+        UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j)
             .writeTo(outputStream);
 
         assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray());
@@ -551,7 +565,7 @@ public class UTF8StringSuite {
 
     for (final long offset : offsets) {
       try {
-        new UTF8String(ByteArrayMemoryBlock.fromArray(test).subBlock(offset, test.length))
+        fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length)
             .writeTo(outputStream);
 
         throw new IllegalStateException(Long.toString(offset));
@@ -578,25 +592,26 @@ public class UTF8StringSuite {
   }
 
   @Test
-  public void writeToOutputStreamLongArray() throws IOException {
+  public void writeToOutputStreamIntArray() throws IOException {
     // verify that writes work on objects that are not byte arrays
-    final ByteBuffer buffer = StandardCharsets.UTF_8.encode("3千大千世界");
+    final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界");
     buffer.position(0);
     buffer.order(ByteOrder.nativeOrder());
 
     final int length = buffer.limit();
-    assertEquals(16, length);
+    assertEquals(12, length);
 
-    final int longs = length / 8;
-    final long[] array = new long[longs];
+    final int ints = length / 4;
+    final int[] array = new int[ints];
 
-    for (int i = 0; i < longs; ++i) {
-      array[i] = buffer.getLong();
+    for (int i = 0; i < ints; ++i) {
+      array[i] = buffer.getInt();
     }
 
     final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-    new UTF8String(OnHeapMemoryBlock.fromArray(array)).writeTo(outputStream);
-    assertEquals("3千大千世界", outputStream.toString("UTF-8"));
+    fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
+        .writeTo(outputStream);
+    assertEquals("大千世界", outputStream.toString("UTF-8"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 8651a63..d07faf1 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -311,7 +311,7 @@ public class TaskMemoryManager {
       // this could trigger spilling to free some pages.
       return allocatePage(size, consumer);
     }
-    page.setPageNumber(pageNumber);
+    page.pageNumber = pageNumber;
     pageTable[pageNumber] = page;
     if (logger.isTraceEnabled()) {
       logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
@@ -323,25 +323,25 @@ public class TaskMemoryManager {
    * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
    */
   public void freePage(MemoryBlock page, MemoryConsumer consumer) {
-    assert (page.getPageNumber() != MemoryBlock.NO_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
       "Called freePage() on memory that wasn't allocated with allocatePage()";
-    assert (page.getPageNumber() != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
       "Called freePage() on a memory block that has already been freed";
-    assert (page.getPageNumber() != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
+    assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
             "Called freePage() on a memory block that has already been freed";
-    assert(allocatedPages.get(page.getPageNumber()));
-    pageTable[page.getPageNumber()] = null;
+    assert(allocatedPages.get(page.pageNumber));
+    pageTable[page.pageNumber] = null;
     synchronized (this) {
-      allocatedPages.clear(page.getPageNumber());
+      allocatedPages.clear(page.pageNumber);
     }
     if (logger.isTraceEnabled()) {
-      logger.trace("Freed page number {} ({} bytes)", page.getPageNumber(), page.size());
+      logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
     }
     long pageSize = page.size();
     // Clear the page number before passing the block to the MemoryAllocator's free().
     // Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
     // page has been inappropriately directly freed without calling TMM.freePage().
-    page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
+    page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
     memoryManager.tungstenMemoryAllocator().free(page);
     releaseExecutionMemory(pageSize, consumer);
   }
@@ -363,7 +363,7 @@ public class TaskMemoryManager {
       // relative to the page's base offset; this relative offset will fit in 51 bits.
       offsetInPage -= page.getBaseOffset();
     }
-    return encodePageNumberAndOffset(page.getPageNumber(), offsetInPage);
+    return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
   }
 
   @VisibleForTesting
@@ -434,7 +434,7 @@ public class TaskMemoryManager {
       for (MemoryBlock page : pageTable) {
         if (page != null) {
           logger.debug("unreleased page: " + page + " in task " + taskAttemptId);
-          page.setPageNumber(MemoryBlock.FREED_IN_TMM_PAGE_NUMBER);
+          page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
           memoryManager.tungstenMemoryAllocator().free(page);
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 4b48599..0d06912 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.sort;
 import java.util.Comparator;
 
 import org.apache.spark.memory.MemoryConsumer;
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.Sorter;
@@ -112,7 +113,13 @@ final class ShuffleInMemorySorter {
 
   public void expandPointerArray(LongArray newArray) {
     assert(newArray.size() > array.size());
-    MemoryBlock.copyMemory(array.memoryBlock(), newArray.memoryBlock(), pos * 8L);
+    Platform.copyMemory(
+      array.getBaseObject(),
+      array.getBaseOffset(),
+      newArray.getBaseObject(),
+      newArray.getBaseOffset(),
+      pos * 8L
+    );
     consumer.freeArray(array);
     array = newArray;
     usableCapacity = getUsableCapacity();
@@ -181,7 +188,10 @@ final class ShuffleInMemorySorter {
         PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
         PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
     } else {
-      MemoryBlock unused = array.memoryBlock().subBlock(pos * 8L, (array.size() - pos) * 8L);
+      MemoryBlock unused = new MemoryBlock(
+        array.getBaseObject(),
+        array.getBaseOffset() + pos * 8L,
+        (array.size() - pos) * 8L);
       LongArray buffer = new LongArray(unused);
       Sorter<PackedRecordPointer, LongArray> sorter =
         new Sorter<>(new ShuffleSortDataFormat(buffer));

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
index 254449e..717bdd7 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -17,8 +17,8 @@
 
 package org.apache.spark.shuffle.sort;
 
+import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
-import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.SortDataFormat;
 
 final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, LongArray> {
@@ -60,8 +60,13 @@ final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, Lo
 
   @Override
   public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int length) {
-    MemoryBlock.copyMemory(src.memoryBlock(), srcPos * 8L,
-      dst.memoryBlock(),dstPos * 8L,length * 8L);
+    Platform.copyMemory(
+      src.getBaseObject(),
+      src.getBaseOffset() + srcPos * 8L,
+      dst.getBaseObject(),
+      dst.getBaseOffset() + dstPos * 8L,
+      length * 8L
+    );
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/0b9ccd55/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 399251b..5056652 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -544,7 +544,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
           // is accessing the current record. We free this page in that caller's next loadNext()
           // call.
           for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.getPageNumber() !=
+            if (!loaded || page.pageNumber !=
                     ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
               released += page.size();
               freePage(page);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org