Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/29 10:07:39 UTC

[3/3] spark git commit: [SPARK-7076][SPARK-7077][SPARK-7080][SQL] Use managed memory for aggregations

[SPARK-7076][SPARK-7077][SPARK-7080][SQL] Use managed memory for aggregations

This patch adds managed-memory-based aggregation to Spark SQL / DataFrames. Instead of working with Java objects, this new aggregation path uses `sun.misc.Unsafe` to manipulate raw memory.  This reduces the memory footprint of aggregations, resulting in fewer spills, OutOfMemoryErrors, and garbage-collection pauses, and allows for higher memory utilization.  It can also improve cache locality, since values are stored closer together in memory.

This feature can be enabled by setting `spark.sql.unsafe.enabled=true`.  For now, it requires codegen to be enabled and only supports aggregations whose grouping columns are primitive numeric types or strings and whose aggregated values are numeric.
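
For illustration, the flags could be set programmatically before running a query.  This is only a sketch using the standard Spark 1.x `SQLContext.setConf` API; `spark.sql.codegen` was the codegen flag at the time, and `spark.sql.unsafe.enabled` is the flag added by this patch:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

public class EnableUnsafeAggregation {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("unsafe-agg-demo").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    sqlContext.setConf("spark.sql.codegen", "true");          // codegen is a prerequisite
    sqlContext.setConf("spark.sql.unsafe.enabled", "true");   // enable the managed-memory path

    // ... run aggregations via sqlContext.sql(...) or the DataFrame API ...
    sc.stop();
  }
}
```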

### Managing memory with sun.misc.Unsafe

This patch supports both on- and off-heap managed memory.

- In on-heap mode, memory addresses are identified by the combination of a base Object and an offset within that object.
- In off-heap mode, memory is addressed directly with 64-bit long addresses.

To support both modes, functions that manipulate memory take both a `baseObject` and a `baseOffset`.  In off-heap mode, we simply pass `null` as the `baseObject`.
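
As an illustration only (this is plain `sun.misc.Unsafe`, not the `PlatformDependent` wrapper added by this patch), the same accessor covers both modes: pass a base object plus an offset for on-heap data, or `null` plus an absolute address for off-heap data:

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class AddressingSketch {
  public static void main(String[] args) throws Exception {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    Unsafe unsafe = (Unsafe) f.get(null);

    // On-heap: address = base object + offset within that object.
    long[] onHeap = new long[4];
    long arrayBase = unsafe.arrayBaseOffset(long[].class);
    unsafe.putLong(onHeap, arrayBase, 42L);
    System.out.println(unsafe.getLong(onHeap, arrayBase));   // 42

    // Off-heap: base object is null, the offset is a raw 64-bit address.
    long address = unsafe.allocateMemory(32);
    unsafe.putLong(null, address, 42L);
    System.out.println(unsafe.getLong(null, address));       // 42
    unsafe.freeMemory(address);
  }
}
```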

We allocate memory in large chunks, so memory fragmentation and allocation speed are not significant bottlenecks.

By default, we use on-heap mode.  To enable off-heap mode, set `spark.unsafe.offHeap=true`.

To track allocated memory, this patch extends `SparkEnv` with an `ExecutorMemoryManager` and supplies each `TaskContext` with a `TaskMemoryManager`.  These classes work together to track allocations and detect memory leaks.
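
A rough sketch of that lifecycle, using the classes added by this patch; the `allocate`/`free` calls reflect my reading of `TaskMemoryManager`'s API, and the exact signatures may differ:

```java
import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

public class MemoryManagerSketch {
  public static void main(String[] args) {
    ExecutorMemoryManager executorMemoryManager =
      new ExecutorMemoryManager(MemoryAllocator.HEAP);        // or MemoryAllocator.UNSAFE for off-heap
    TaskMemoryManager taskMemoryManager = new TaskMemoryManager(executorMemoryManager);

    // Allocation is tracked per task (assumed allocate/free signatures).
    MemoryBlock block = taskMemoryManager.allocate(1 << 20);  // a 1 MB chunk
    // ... operators would address the block via its baseObject/baseOffset ...
    taskMemoryManager.free(block);

    // At task completion, anything still allocated is treated as a leak (see Executor.run below).
    long leaked = taskMemoryManager.cleanUpAllAllocatedMemory();
    System.out.println("Leaked bytes: " + leaked);
  }
}
```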

### Compact tuple format

This patch introduces `UnsafeRow`, a compact row layout.  In this format, each tuple has three parts: a null bit set, fixed length values, and variable-length values:

![image](https://cloud.githubusercontent.com/assets/50748/7328538/2fdb65ce-ea8b-11e4-9743-6c0f02bb7d1f.png)

- Rows are always 8-byte word aligned (so their sizes will always be a multiple of 8 bytes)
- The bit set is used for null tracking:
	- Position _i_ is set if and only if field _i_ is null
	- The bit set is aligned to an 8-byte word boundary.
- Every field appears as an 8-byte word in the fixed-length values part:
	- If a field is null, its word in the fixed-length part is zeroed out.
	- If a field is variable-length, the word stores a relative offset (w.r.t. the base of the tuple) that points to the beginning of the field's data in the variable-length part.
- Each variable-length data type can have its own encoding:
	- For strings, the first word stores the length of the string and is followed by the UTF-8 encoded bytes.  If necessary, the end of the string is padded with zero bytes to preserve word alignment.

For example, a tuple that consists of three fields of type (int, string, string), with values (null, "data", "bricks"), would look like this:

![image](https://cloud.githubusercontent.com/assets/50748/7328526/1e21959c-ea8b-11e4-9a28-a4350fe4a7b5.png)

This format allows us to compare tuples for equality by directly comparing their raw bytes.  This also enables fast hashing of tuples.
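
As a sanity check of the layout rules above, here is a small stand-alone calculation (not Spark code) of the example row's size: an 8-byte null bit set, three 8-byte fixed-length words, and two length-prefixed, padded strings, for 64 bytes total:

```java
import java.nio.charset.StandardCharsets;

public class RowLayoutSketch {
  // Same formula as UnsafeRow.calculateBitSetWidthInBytes: one 8-byte word per 64 fields.
  static int bitSetWidthInBytes(int numFields) {
    return ((numFields / 64) + (numFields % 64 == 0 ? 0 : 1)) * 8;
  }

  // Round a byte count up to the next multiple of 8 to preserve word alignment.
  static int roundUpTo8(int numBytes) {
    return ((numBytes + 7) / 8) * 8;
  }

  // Variable-length string field: an 8-byte length word followed by padded UTF-8 bytes.
  static int stringFieldSize(String s) {
    return 8 + roundUpTo8(s.getBytes(StandardCharsets.UTF_8).length);
  }

  public static void main(String[] args) {
    int numFields = 3;                                    // (int, string, string)
    int total = bitSetWidthInBytes(numFields)             //  8: null bit set
      + numFields * 8                                     // 24: one fixed-length word per field
      + stringFieldSize("data")                           // 16: 8 + 4 bytes padded to 8
      + stringFieldSize("bricks");                        // 16: 8 + 6 bytes padded to 8
    System.out.println("Row size: " + total + " bytes");  // 64
  }
}
```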

### Hash map for performing aggregations

This patch introduces `UnsafeFixedWidthAggregationMap`, a hash map for performing aggregations where the aggregation result columns are fixed-width.  This map's keys and values are `Row` objects. `UnsafeFixedWidthAggregationMap` is implemented on top of `BytesToBytesMap`, an append-only map which supports byte-array keys and values.

`BytesToBytesMap` stores pointers to key and value tuples.  For each record with a new key, we copy the key, create the aggregation value buffer for that key, and store both in the map's memory; the hash table then simply stores pointers to the key and value.  For each record with an existing key, we simply run the aggregation function to update the value in place.

This map is implemented using open hashing with triangular sequence probing.  Each entry stores two words in a long array: the first word stores the address of the key and the second word stores the relative offset from the key tuple to the value tuple, as well as the key's 32-bit hashcode.  By storing the full hashcode, we reduce the number of equality checks that need to be performed to handle position collisions (since the chance of a hashcode collision is much lower than that of a position collision).
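
The probing scheme itself is simple; the following toy example (not `BytesToBytesMap` itself, and using the key as its own hash) shows how the cumulative offsets 1, 3, 6, 10, ... walk a power-of-two table:

```java
import java.util.Arrays;

public class TriangularProbingSketch {
  public static void main(String[] args) {
    int capacity = 16;                        // must be a power of two so `& mask` acts as modulo
    int mask = capacity - 1;
    long[] slots = new long[capacity];
    Arrays.fill(slots, -1);                   // -1 marks an empty slot

    for (long key : new long[] {3, 19, 35, 7}) {   // 3, 19, and 35 all land on slot 3 initially
      int pos = (int) (key & mask);
      int step = 1;
      while (slots[pos] != -1 && slots[pos] != key) {
        pos = (pos + step) & mask;            // triangular probing: cumulative offsets 1, 3, 6, ...
        step++;
      }
      slots[pos] = key;
    }
    System.out.println(Arrays.toString(slots));
  }
}
```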

`UnsafeFixedWidthAggregationMap` allows regular Spark SQL `Row` objects to be used when probing the map.  Internally, it encodes these rows into `UnsafeRow` format using `UnsafeRowConverter`.  This conversion has a small overhead that can be eliminated in the future once we use UnsafeRows in other operators.
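
Putting the pieces together, a usage sketch loosely modeled on the new test suites might look like the following; the choice of aggregation (a per-group count keyed by a long column) and the row-construction helpers are my own, not part of the patch:

```java
import java.util.Arrays;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.expressions.UnsafeFixedWidthAggregationMap;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

public class AggregationMapSketch {
  public static void main(String[] args) {
    // Group by a single long column; keep a single long counter per group.
    StructType groupingKeySchema = DataTypes.createStructType(Arrays.asList(
      DataTypes.createStructField("group", DataTypes.LongType, false)));
    StructType aggBufferSchema = DataTypes.createStructType(Arrays.asList(
      DataTypes.createStructField("count", DataTypes.LongType, false)));

    TaskMemoryManager memoryManager =
      new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
    UnsafeFixedWidthAggregationMap map = new UnsafeFixedWidthAggregationMap(
      RowFactory.create(0L),   // "zero" value copied in whenever a new key is seen
      aggBufferSchema,
      groupingKeySchema,
      memoryManager,
      1024,                    // initial capacity (sizing hint)
      false);                  // enablePerfMetrics

    for (long group : new long[] {1L, 2L, 1L}) {
      Row key = RowFactory.create(group);
      UnsafeRow buffer = map.getAggregationBuffer(key);  // inserts the empty buffer on first sight
      buffer.setLong(0, buffer.getLong(0) + 1);          // update the count in place
    }

    map.free();                                          // release the map's managed memory
    memoryManager.cleanUpAllAllocatedMemory();           // should now report zero leaked bytes
  }
}
```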


Author: Josh Rosen <jo...@databricks.com>

Closes #5725 from JoshRosen/unsafe and squashes the following commits:

eeee512 [Josh Rosen] Add converters for Null, Boolean, Byte, and Short columns.
81f34f8 [Josh Rosen] Follow 'place children last' convention for GeneratedAggregate
1bc36cc [Josh Rosen] Refactor UnsafeRowConverter to avoid unnecessary boxing.
017b2dc [Josh Rosen] Remove BytesToBytesMap.finalize()
50e9671 [Josh Rosen] Throw memory leak warning even in case of error; add warning about code duplication
70a39e4 [Josh Rosen] Split MemoryManager into ExecutorMemoryManager and TaskMemoryManager:
6e4b192 [Josh Rosen] Remove an unused method from ByteArrayMethods.
de5e001 [Josh Rosen] Fix debug vs. trace in logging message.
a19e066 [Josh Rosen] Rename unsafe Java test suites to match Scala test naming convention.
78a5b84 [Josh Rosen] Add logging to MemoryManager
ce3c565 [Josh Rosen] More comments, formatting, and code cleanup.
529e571 [Josh Rosen] Measure timeSpentResizing in nanoseconds instead of milliseconds.
3ca84b2 [Josh Rosen] Only zero the used portion of groupingKeyConversionScratchSpace
162caf7 [Josh Rosen] Fix test compilation
b45f070 [Josh Rosen] Don't redundantly store the offset from key to value, since we can compute this from the key size.
a8e4a3f [Josh Rosen] Introduce MemoryManager interface; add to SparkEnv.
0925847 [Josh Rosen] Disable MiMa checks for new unsafe module
cde4132 [Josh Rosen] Add missing pom.xml
9c19fc0 [Josh Rosen] Add configuration options for heap vs. offheap
6ffdaa1 [Josh Rosen] Null handling improvements in UnsafeRow.
31eaabc [Josh Rosen] Lots of TODO and doc cleanup.
a95291e [Josh Rosen] Cleanups to string handling code
afe8dca [Josh Rosen] Some Javadoc cleanup
f3dcbfe [Josh Rosen] More mod replacement
854201a [Josh Rosen] Import and comment cleanup
06e929d [Josh Rosen] More warning cleanup
ef6b3d3 [Josh Rosen] Fix a bunch of FindBugs and IntelliJ inspections
29a7575 [Josh Rosen] Remove debug logging
49aed30 [Josh Rosen] More long -> int conversion.
b26f1d3 [Josh Rosen] Fix bug in murmur hash implementation.
765243d [Josh Rosen] Enable optional performance metrics for hash map.
23a440a [Josh Rosen] Bump up default hash map size
628f936 [Josh Rosen] Use ints intead of longs for indexing.
92d5a06 [Josh Rosen] Address a number of minor code review comments.
1f4b716 [Josh Rosen] Merge Unsafe code into the regular GeneratedAggregate, guarded by a configuration flag; integrate planner support and re-enable all tests.
d85eeff [Josh Rosen] Add basic sanity test for UnsafeFixedWidthAggregationMap
bade966 [Josh Rosen] Comment update (bumping to refresh GitHub cache...)
b3eaccd [Josh Rosen] Extract aggregation map into its own class.
d2bb986 [Josh Rosen] Update to implement new Row methods added upstream
58ac393 [Josh Rosen] Use UNSAFE allocator in GeneratedAggregate (TODO: make this configurable)
7df6008 [Josh Rosen] Optimizations related to zeroing out memory:
c1b3813 [Josh Rosen] Fix bug in UnsafeMemoryAllocator.free():
738fa33 [Josh Rosen] Add feature flag to guard UnsafeGeneratedAggregate
c55bf66 [Josh Rosen] Free buffer once iterator has been fully consumed.
62ab054 [Josh Rosen] Optimize for fact that get() is only called on String columns.
c7f0b56 [Josh Rosen] Reuse UnsafeRow pointer in UnsafeRowConverter
ae39694 [Josh Rosen] Add finalizer as "cleanup method of last resort"
c754ae1 [Josh Rosen] Now that the store*() contract has been stregthened, we can remove an extra lookup
f764d13 [Josh Rosen] Simplify address + length calculation in Location.
079f1bf [Josh Rosen] Some clarification of the BytesToBytesMap.lookup() / set() contract.
1a483c5 [Josh Rosen] First version that passes some aggregation tests:
fc4c3a8 [Josh Rosen] Sketch how the converters will be used in UnsafeGeneratedAggregate
53ba9b7 [Josh Rosen] Start prototyping Java Row -> UnsafeRow converters
1ff814d [Josh Rosen] Add reminder to free memory on iterator completion
8a8f9df [Josh Rosen] Add skeleton for GeneratedAggregate integration.
5d55cef [Josh Rosen] Add skeleton for Row implementation.
f03e9c1 [Josh Rosen] Play around with Unsafe implementations of more string methods.
ab68e08 [Josh Rosen] Begin merging the UTF8String implementations.
480a74a [Josh Rosen] Initial import of code from Databricks unsafe utils repo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f49284b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f49284b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f49284b5

Branch: refs/heads/master
Commit: f49284b5bf3a69ed91a5e3e6e0ed3be93a6ab9e4
Parents: 1fd6ed9
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Apr 29 01:07:26 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 29 01:07:26 2015 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |   5 +
 .../main/scala/org/apache/spark/SparkEnv.scala  |  12 +
 .../scala/org/apache/spark/TaskContext.scala    |   6 +
 .../org/apache/spark/TaskContextImpl.scala      |   2 +
 .../org/apache/spark/executor/Executor.scala    |  19 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  22 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  16 +-
 .../java/org/apache/spark/JavaAPISuite.java     |   2 +-
 .../org/apache/spark/CacheManagerSuite.scala    |   8 +-
 .../org/apache/spark/rdd/PipedRDDSuite.scala    |   2 +-
 .../spark/scheduler/TaskContextSuite.scala      |   2 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      |   6 +-
 pom.xml                                         |   2 +
 project/SparkBuild.scala                        |   7 +-
 sql/catalyst/pom.xml                            |   5 +
 .../UnsafeFixedWidthAggregationMap.java         | 259 +++++++++
 .../sql/catalyst/expressions/UnsafeRow.java     | 435 +++++++++++++++
 .../expressions/UnsafeRowConverter.scala        | 223 ++++++++
 .../UnsafeFixedWidthAggregationMapSuite.scala   | 119 ++++
 .../expressions/UnsafeRowConverterSuite.scala   | 153 ++++++
 .../scala/org/apache/spark/sql/SQLConf.scala    |   9 +
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +
 .../sql/execution/GeneratedAggregate.scala      |  60 ++
 .../spark/sql/execution/SparkStrategies.scala   |   2 +
 unsafe/pom.xml                                  |  69 +++
 .../apache/spark/unsafe/PlatformDependent.java  |  87 +++
 .../spark/unsafe/array/ByteArrayMethods.java    |  56 ++
 .../apache/spark/unsafe/array/LongArray.java    |  78 +++
 .../org/apache/spark/unsafe/bitset/BitSet.java  | 105 ++++
 .../spark/unsafe/bitset/BitSetMethods.java      | 129 +++++
 .../spark/unsafe/hash/Murmur3_x86_32.java       |  96 ++++
 .../spark/unsafe/map/BytesToBytesMap.java       | 549 +++++++++++++++++++
 .../spark/unsafe/map/HashMapGrowthStrategy.java |  39 ++
 .../unsafe/memory/ExecutorMemoryManager.java    |  58 ++
 .../unsafe/memory/HeapMemoryAllocator.java      |  35 ++
 .../spark/unsafe/memory/MemoryAllocator.java    |  33 ++
 .../apache/spark/unsafe/memory/MemoryBlock.java |  63 +++
 .../spark/unsafe/memory/MemoryLocation.java     |  54 ++
 .../spark/unsafe/memory/TaskMemoryManager.java  | 237 ++++++++
 .../unsafe/memory/UnsafeMemoryAllocator.java    |  39 ++
 .../spark/unsafe/array/LongArraySuite.java      |  38 ++
 .../apache/spark/unsafe/bitset/BitSetSuite.java |  82 +++
 .../spark/unsafe/hash/Murmur3_x86_32Suite.java  | 119 ++++
 .../map/AbstractBytesToBytesMapSuite.java       | 250 +++++++++
 .../unsafe/map/BytesToBytesMapOffHeapSuite.java |  29 +
 .../unsafe/map/BytesToBytesMapOnHeapSuite.java  |  29 +
 .../unsafe/memory/TaskMemoryManagerSuite.java   |  41 ++
 47 files changed, 3675 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 459ef66..2dfb00d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -96,6 +96,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 959aefa..0c4d28f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,6 +40,7 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
 import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
@@ -69,6 +70,7 @@ class SparkEnv (
     val sparkFilesDir: String,
     val metricsSystem: MetricsSystem,
     val shuffleMemoryManager: ShuffleMemoryManager,
+    val executorMemoryManager: ExecutorMemoryManager,
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
@@ -382,6 +384,15 @@ object SparkEnv extends Logging {
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
     outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
 
+    val executorMemoryManager: ExecutorMemoryManager = {
+      val allocator = if (conf.getBoolean("spark.unsafe.offHeap", false)) {
+        MemoryAllocator.UNSAFE
+      } else {
+        MemoryAllocator.HEAP
+      }
+      new ExecutorMemoryManager(allocator)
+    }
+
     val envInstance = new SparkEnv(
       executorId,
       rpcEnv,
@@ -398,6 +409,7 @@ object SparkEnv extends Logging {
       sparkFilesDir,
       metricsSystem,
       shuffleMemoryManager,
+      executorMemoryManager,
       outputCommitCoordinator,
       conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 7d7fe1a..d09e17d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,6 +21,7 @@ import java.io.Serializable
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.TaskCompletionListener
 
 
@@ -133,4 +134,9 @@ abstract class TaskContext extends Serializable {
   /** ::DeveloperApi:: */
   @DeveloperApi
   def taskMetrics(): TaskMetrics
+
+  /**
+   * Returns the manager for this task's managed memory.
+   */
+  private[spark] def taskMemoryManager(): TaskMemoryManager
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 337c8e4..b4d572c 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}
 
 import scala.collection.mutable.ArrayBuffer
@@ -27,6 +28,7 @@ private[spark] class TaskContextImpl(
     val partitionId: Int,
     override val taskAttemptId: Long,
     override val attemptNumber: Int,
+    override val taskMemoryManager: TaskMemoryManager,
     val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f57e215..dd1c48e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util._
 
 /**
@@ -178,6 +179,7 @@ private[spark] class Executor(
     }
 
     override def run(): Unit = {
+      val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
       val deserializeStartTime = System.currentTimeMillis()
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = env.closureSerializer.newInstance()
@@ -190,6 +192,7 @@ private[spark] class Executor(
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
+        task.setTaskMemoryManager(taskMemoryManager)
 
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.
@@ -206,7 +209,21 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
-        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+        val value = try {
+          task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+        } finally {
+          // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
+          // when changing this, make sure to update both copies.
+          val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+          if (freedMemory > 0) {
+            val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
+            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+              throw new SparkException(errMsg)
+            } else {
+              logError(errMsg)
+            }
+          }
+        }
         val taskFinish = System.currentTimeMillis()
 
         // If the task has been killed, let's fail it.

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c4bff4..b7901c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,6 +34,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 
@@ -643,8 +644,15 @@ class DAGScheduler(
     try {
       val rdd = job.finalStage.rdd
       val split = rdd.partitions(job.partitions(0))
-      val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
-        attemptNumber = 0, runningLocally = true)
+      val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
+      val taskContext =
+        new TaskContextImpl(
+          job.finalStage.id,
+          job.partitions(0),
+          taskAttemptId = 0,
+          attemptNumber = 0,
+          taskMemoryManager = taskMemoryManager,
+          runningLocally = true)
       TaskContext.setTaskContext(taskContext)
       try {
         val result = job.func(taskContext, rdd.iterator(split, taskContext))
@@ -652,6 +660,16 @@ class DAGScheduler(
       } finally {
         taskContext.markTaskCompleted()
         TaskContext.unset()
+        // Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
+        // make sure to update both copies.
+        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
+        if (freedMemory > 0) {
+          if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+            throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
+          } else {
+            logError(s"Managed memory leak detected; size = $freedMemory bytes")
+          }
+        }
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index b09b19e..586d1e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.{TaskContextImpl, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util.ByteBufferInputStream
 import org.apache.spark.util.Utils
 
@@ -52,8 +53,13 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
    * @return the result of the task
    */
   final def run(taskAttemptId: Long, attemptNumber: Int): T = {
-    context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
-      taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
+    context = new TaskContextImpl(
+      stageId = stageId,
+      partitionId = partitionId,
+      taskAttemptId = taskAttemptId,
+      attemptNumber = attemptNumber,
+      taskMemoryManager = taskMemoryManager,
+      runningLocally = false)
     TaskContext.setTaskContext(context)
     context.taskMetrics.setHostname(Utils.localHostName())
     taskThread = Thread.currentThread()
@@ -68,6 +74,12 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
     }
   }
 
+  private var taskMemoryManager: TaskMemoryManager = _
+
+  def setTaskMemoryManager(taskMemoryManager: TaskMemoryManager): Unit = {
+    this.taskMemoryManager = taskMemoryManager
+  }
+
   def runTask(context: TaskContext): T
 
   def preferredLocations: Seq[TaskLocation] = Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 8a4f2a0..34ac936 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1009,7 +1009,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void iterator() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContextImpl(0, 0, 0L, 0, false, new TaskMetrics());
+    TaskContext context = new TaskContextImpl(0, 0, 0L, 0, null, false, new TaskMetrics());
     Assert.assertEquals(1, rdd.iterator(rdd.partitions().get(0), context).next().intValue());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 70529d9..668ddf9 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -65,7 +65,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
     // in blockManager.put is a losing battle. You have been warned.
     blockManager = sc.env.blockManager
     cacheManager = sc.env.cacheManager
-    val context = new TaskContextImpl(0, 0, 0, 0)
+    val context = new TaskContextImpl(0, 0, 0, 0, null)
     val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
     val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
     assert(computeValue.toList === List(1, 2, 3, 4))
@@ -77,7 +77,7 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
     val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
     when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
 
-    val context = new TaskContextImpl(0, 0, 0, 0)
+    val context = new TaskContextImpl(0, 0, 0, 0, null)
     val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
     assert(value.toList === List(5, 6, 7))
   }
@@ -86,14 +86,14 @@ class CacheManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAf
     // Local computation should not persist the resulting value, so don't expect a put().
     when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)
 
-    val context = new TaskContextImpl(0, 0, 0, 0, true)
+    val context = new TaskContextImpl(0, 0, 0, 0, null, true)
     val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
     assert(value.toList === List(1, 2, 3, 4))
   }
 
   test("verify task metrics updated correctly") {
     cacheManager = sc.env.cacheManager
-    val context = new TaskContextImpl(0, 0, 0, 0)
+    val context = new TaskContextImpl(0, 0, 0, 0, null)
     cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
     assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size === 2)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index aea76c1..85eb2a1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -176,7 +176,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
       }
       val hadoopPart1 = generateFakeHadoopPartition()
       val pipedRdd = new PipedRDD(nums, "printenv " + varName)
-      val tContext = new TaskContextImpl(0, 0, 0, 0)
+      val tContext = new TaskContextImpl(0, 0, 0, 0, null)
       val rddIter = pipedRdd.compute(hadoopPart1, tContext)
       val arr = rddIter.toArray
       assert(arr(0) == "/some/path")

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 057e226..83ae870 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -51,7 +51,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
   }
 
   test("all TaskCompletionListeners should be called even if some fail") {
-    val context = new TaskContextImpl(0, 0, 0, 0)
+    val context = new TaskContextImpl(0, 0, 0, 0, null)
     val listener = mock(classOf[TaskCompletionListener])
     context.addTaskCompletionListener(_ => throw new Exception("blah"))
     context.addTaskCompletionListener(listener)

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 37b593b..2080c43 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -89,7 +89,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     )
 
     val iterator = new ShuffleBlockFetcherIterator(
-      new TaskContextImpl(0, 0, 0, 0),
+      new TaskContextImpl(0, 0, 0, 0, null),
       transfer,
       blockManager,
       blocksByAddress,
@@ -154,7 +154,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))
 
-    val taskContext = new TaskContextImpl(0, 0, 0, 0)
+    val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
     val iterator = new ShuffleBlockFetcherIterator(
       taskContext,
       transfer,
@@ -217,7 +217,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq))
 
-    val taskContext = new TaskContextImpl(0, 0, 0, 0)
+    val taskContext = new TaskContextImpl(0, 0, 0, 0, null)
     val iterator = new ShuffleBlockFetcherIterator(
       taskContext,
       transfer,

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 928f5d0..c85c5fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,7 @@
     <module>sql/catalyst</module>
     <module>sql/core</module>
     <module>sql/hive</module>
+    <module>unsafe</module>
     <module>assembly</module>
     <module>external/twitter</module>
     <module>external/flume</module>
@@ -1215,6 +1216,7 @@
               <spark.ui.enabled>false</spark.ui.enabled>
               <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
               <spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
+              <spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
             </systemProperties>
             <failIfNoTests>false</failIfNoTests>
           </configuration>

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 09b4976..b7dbcd9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -34,11 +34,11 @@ object BuildCommons {
 
   val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
     sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
-    streamingMqtt, streamingTwitter, streamingZeromq, launcher) =
+    streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe) =
     Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
       "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
       "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
-      "streaming-zeromq", "launcher").map(ProjectRef(buildLocation, _))
+      "streaming-zeromq", "launcher", "unsafe").map(ProjectRef(buildLocation, _))
 
   val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
     sparkKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
@@ -159,7 +159,7 @@ object SparkBuild extends PomBuild {
   // TODO: Add Sql to mima checks
   // TODO: remove launcher from this list after 1.3.
   allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl,
-    networkCommon, networkShuffle, networkYarn, launcher).contains(x)).foreach {
+    networkCommon, networkShuffle, networkYarn, launcher, unsafe).contains(x)).foreach {
       x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
     }
 
@@ -496,6 +496,7 @@ object TestSettings {
     javaOptions in Test += "-Dspark.ui.enabled=false",
     javaOptions in Test += "-Dspark.ui.showConsoleProgress=false",
     javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true",
+    javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true",
     javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/catalyst/pom.xml
----------------------------------------------------------------------
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 3dea2ee..5c322d0 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -51,6 +51,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
new file mode 100644
index 0000000..299ff37
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.unsafe.map.BytesToBytesMap;
+import org.apache.spark.unsafe.memory.MemoryLocation;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+
+/**
+ * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
+ *
+ * This map supports a maximum of 2 billion keys.
+ */
+public final class UnsafeFixedWidthAggregationMap {
+
+  /**
+   * An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
+   * map, we copy this buffer and use it as the value.
+   */
+  private final long[] emptyAggregationBuffer;
+
+  private final StructType aggregationBufferSchema;
+
+  private final StructType groupingKeySchema;
+
+  /**
+   * Encodes grouping keys as UnsafeRows.
+   */
+  private final UnsafeRowConverter groupingKeyToUnsafeRowConverter;
+
+  /**
+   * A hashmap which maps from opaque bytearray keys to bytearray values.
+   */
+  private final BytesToBytesMap map;
+
+  /**
+   * Re-used pointer to the current aggregation buffer
+   */
+  private final UnsafeRow currentAggregationBuffer = new UnsafeRow();
+
+  /**
+   * Scratch space that is used when encoding grouping keys into UnsafeRow format.
+   *
+   * By default, this is a 1MB array, but it will grow as necessary in case larger keys are
+   * encountered.
+   */
+  private long[] groupingKeyConversionScratchSpace = new long[1024 / 8];
+
+  private final boolean enablePerfMetrics;
+
+  /**
+   * @return true if UnsafeFixedWidthAggregationMap supports grouping keys with the given schema,
+   *         false otherwise.
+   */
+  public static boolean supportsGroupKeySchema(StructType schema) {
+    for (StructField field: schema.fields()) {
+      if (!UnsafeRow.readableFieldTypes.contains(field.dataType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given
+   *         schema, false otherwise.
+   */
+  public static boolean supportsAggregationBufferSchema(StructType schema) {
+    for (StructField field: schema.fields()) {
+      if (!UnsafeRow.settableFieldTypes.contains(field.dataType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Create a new UnsafeFixedWidthAggregationMap.
+   *
+   * @param emptyAggregationBuffer the default value for new keys (a "zero" of the agg. function)
+   * @param aggregationBufferSchema the schema of the aggregation buffer, used for row conversion.
+   * @param groupingKeySchema the schema of the grouping key, used for row conversion.
+   * @param memoryManager the memory manager used to allocate our Unsafe memory structures.
+   * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
+   * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
+   */
+  public UnsafeFixedWidthAggregationMap(
+      Row emptyAggregationBuffer,
+      StructType aggregationBufferSchema,
+      StructType groupingKeySchema,
+      TaskMemoryManager memoryManager,
+      int initialCapacity,
+      boolean enablePerfMetrics) {
+    this.emptyAggregationBuffer =
+      convertToUnsafeRow(emptyAggregationBuffer, aggregationBufferSchema);
+    this.aggregationBufferSchema = aggregationBufferSchema;
+    this.groupingKeyToUnsafeRowConverter = new UnsafeRowConverter(groupingKeySchema);
+    this.groupingKeySchema = groupingKeySchema;
+    this.map = new BytesToBytesMap(memoryManager, initialCapacity, enablePerfMetrics);
+    this.enablePerfMetrics = enablePerfMetrics;
+  }
+
+  /**
+   * Convert a Java object row into an UnsafeRow, allocating it into a new long array.
+   */
+  private static long[] convertToUnsafeRow(Row javaRow, StructType schema) {
+    final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
+    final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
+    final long writtenLength =
+      converter.writeRow(javaRow, unsafeRow, PlatformDependent.LONG_ARRAY_OFFSET);
+    assert (writtenLength == unsafeRow.length): "Size requirement calculation was wrong!";
+    return unsafeRow;
+  }
+
+  /**
+   * Return the aggregation buffer for the current group. For efficiency, all calls to this method
+   * return the same object.
+   */
+  public UnsafeRow getAggregationBuffer(Row groupingKey) {
+    final int groupingKeySize = groupingKeyToUnsafeRowConverter.getSizeRequirement(groupingKey);
+    // Make sure that the buffer is large enough to hold the key. If it's not, grow it:
+    if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
+      // This new array will be initially zero, so there's no need to zero it out here
+      groupingKeyConversionScratchSpace = new long[groupingKeySize];
+    } else {
+      // Zero out the buffer that's used to hold the current row. This is necessary in order
+      // to ensure that rows hash properly, since garbage data from the previous row could
+      // otherwise end up as padding in this row. As a performance optimization, we only zero out
+      // the portion of the buffer that we'll actually write to.
+      Arrays.fill(groupingKeyConversionScratchSpace, 0, groupingKeySize, 0);
+    }
+    final long actualGroupingKeySize = groupingKeyToUnsafeRowConverter.writeRow(
+      groupingKey,
+      groupingKeyConversionScratchSpace,
+      PlatformDependent.LONG_ARRAY_OFFSET);
+    assert (groupingKeySize == actualGroupingKeySize) : "Size requirement calculation was wrong!";
+
+    // Probe our map using the serialized key
+    final BytesToBytesMap.Location loc = map.lookup(
+      groupingKeyConversionScratchSpace,
+      PlatformDependent.LONG_ARRAY_OFFSET,
+      groupingKeySize);
+    if (!loc.isDefined()) {
+      // This is the first time that we've seen this grouping key, so we'll insert a copy of the
+      // empty aggregation buffer into the map:
+      loc.putNewKey(
+        groupingKeyConversionScratchSpace,
+        PlatformDependent.LONG_ARRAY_OFFSET,
+        groupingKeySize,
+        emptyAggregationBuffer,
+        PlatformDependent.LONG_ARRAY_OFFSET,
+        emptyAggregationBuffer.length
+      );
+    }
+
+    // Reset the pointer to point to the value that we just stored or looked up:
+    final MemoryLocation address = loc.getValueAddress();
+    currentAggregationBuffer.pointTo(
+      address.getBaseObject(),
+      address.getBaseOffset(),
+      aggregationBufferSchema.length(),
+      aggregationBufferSchema
+    );
+    return currentAggregationBuffer;
+  }
+
+  /**
+   * Mutable pair object returned by {@link UnsafeFixedWidthAggregationMap#iterator()}.
+   */
+  public static class MapEntry {
+    private MapEntry() { };
+    public final UnsafeRow key = new UnsafeRow();
+    public final UnsafeRow value = new UnsafeRow();
+  }
+
+  /**
+   * Returns an iterator over the keys and values in this map.
+   *
+   * For efficiency, each call returns the same object.
+   */
+  public Iterator<MapEntry> iterator() {
+    return new Iterator<MapEntry>() {
+
+      private final MapEntry entry = new MapEntry();
+      private final Iterator<BytesToBytesMap.Location> mapLocationIterator = map.iterator();
+
+      @Override
+      public boolean hasNext() {
+        return mapLocationIterator.hasNext();
+      }
+
+      @Override
+      public MapEntry next() {
+        final BytesToBytesMap.Location loc = mapLocationIterator.next();
+        final MemoryLocation keyAddress = loc.getKeyAddress();
+        final MemoryLocation valueAddress = loc.getValueAddress();
+        entry.key.pointTo(
+          keyAddress.getBaseObject(),
+          keyAddress.getBaseOffset(),
+          groupingKeySchema.length(),
+          groupingKeySchema
+        );
+        entry.value.pointTo(
+          valueAddress.getBaseObject(),
+          valueAddress.getBaseOffset(),
+          aggregationBufferSchema.length(),
+          aggregationBufferSchema
+        );
+        return entry;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Free the unsafe memory associated with this map.
+   */
+  public void free() {
+    map.free();
+  }
+
+  @SuppressWarnings("UseOfSystemOutOrSystemErr")
+  public void printPerfMetrics() {
+    if (!enablePerfMetrics) {
+      throw new IllegalStateException("Perf metrics not enabled");
+    }
+    System.out.println("Average probes per lookup: " + map.getAverageProbesPerLookup());
+    System.out.println("Number of hash collisions: " + map.getNumHashCollisions());
+    System.out.println("Time spent resizing (ns): " + map.getTimeSpentResizingNs());
+    System.out.println("Total memory consumption (bytes): " + map.getTotalMemoryConsumption());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
new file mode 100644
index 0000000..0a358ed
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions;
+
+import scala.collection.Map;
+import scala.collection.Seq;
+import scala.collection.mutable.ArraySeq;
+
+import javax.annotation.Nullable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.util.*;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataType;
+import static org.apache.spark.sql.types.DataTypes.*;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.UTF8String;
+import org.apache.spark.unsafe.PlatformDependent;
+import org.apache.spark.unsafe.bitset.BitSetMethods;
+
+/**
+ * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
+ *
+ * Each tuple has three parts: [null bit set] [values] [variable length portion]
+ *
+ * The bit set is used for null tracking and is aligned to 8-byte word boundaries.  It stores
+ * one bit per field.
+ *
+ * In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length
+ * primitive types, such as long, double, or int, we store the value directly in the word. For
+ * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
+ * base address of the row) that points to the beginning of the variable-length field.
+ *
+ * Instances of `UnsafeRow` act as pointers to row data stored in this format.
+ */
+public final class UnsafeRow implements MutableRow {
+
+  private Object baseObject;
+  private long baseOffset;
+
+  Object getBaseObject() { return baseObject; }
+  long getBaseOffset() { return baseOffset; }
+
+  /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
+  private int numFields;
+
+  /** The width of the null tracking bit set, in bytes */
+  private int bitSetWidthInBytes;
+  /**
+   * This optional schema is required if you want to call generic get() and set() methods on
+   * this UnsafeRow, but is optional if callers will only use type-specific getTYPE() and setTYPE()
+   * methods. This should be removed after the planned InternalRow / Row split; right now, it's only
+   * needed by the generic get() method, which is only called internally by code that accesses
+   * UTF8String-typed columns.
+   */
+  @Nullable
+  private StructType schema;
+
+  private long getFieldOffset(int ordinal) {
+   return baseOffset + bitSetWidthInBytes + ordinal * 8L;
+  }
+
+  public static int calculateBitSetWidthInBytes(int numFields) {
+    return ((numFields / 64) + (numFields % 64 == 0 ? 0 : 1)) * 8;
+  }
+
+  /**
+   * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
+   */
+  public static final Set<DataType> settableFieldTypes;
+
+  /**
+   * Field types that can be read (but not set, e.g. set() will throw UnsupportedOperationException).
+   */
+  public static final Set<DataType> readableFieldTypes;
+
+  static {
+    settableFieldTypes = Collections.unmodifiableSet(
+      new HashSet<DataType>(
+        Arrays.asList(new DataType[] {
+          NullType,
+          BooleanType,
+          ByteType,
+          ShortType,
+          IntegerType,
+          LongType,
+          FloatType,
+          DoubleType
+    })));
+
+    // We support get() on a superset of the types for which we support set():
+    final Set<DataType> _readableFieldTypes = new HashSet<DataType>(
+      Arrays.asList(new DataType[]{
+        StringType
+      }));
+    _readableFieldTypes.addAll(settableFieldTypes);
+    readableFieldTypes = Collections.unmodifiableSet(_readableFieldTypes);
+  }
+
+  /**
+   * Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called,
+   * since the value returned by this constructor is equivalent to a null pointer.
+   */
+  public UnsafeRow() { }
+
+  /**
+   * Update this UnsafeRow to point to different backing data.
+   *
+   * @param baseObject the base object
+   * @param baseOffset the offset within the base object
+   * @param numFields the number of fields in this row
+   * @param schema an optional schema; this is necessary if you want to call generic get() or set()
+   *               methods on this row, but is optional if the caller will only use type-specific
+   *               getTYPE() and setTYPE() methods.
+   */
+  public void pointTo(
+      Object baseObject,
+      long baseOffset,
+      int numFields,
+      @Nullable StructType schema) {
+    assert numFields >= 0 : "numFields should >= 0";
+    assert schema == null || schema.fields().length == numFields;
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = baseObject;
+    this.baseOffset = baseOffset;
+    this.numFields = numFields;
+    this.schema = schema;
+  }
+
+  private void assertIndexIsValid(int index) {
+    assert index >= 0 : "index (" + index + ") should >= 0";
+    assert index < numFields : "index (" + index + ") should <= " + numFields;
+  }
+
+  @Override
+  public void setNullAt(int i) {
+    assertIndexIsValid(i);
+    BitSetMethods.set(baseObject, baseOffset, i);
+    // To preserve row equality, zero out the value when setting the column to null.
+    // Since this row does not currently support updates to variable-length values, we don't
+    // have to worry about zeroing out that data.
+    PlatformDependent.UNSAFE.putLong(baseObject, getFieldOffset(i), 0);
+  }
+
+  private void setNotNullAt(int i) {
+    assertIndexIsValid(i);
+    BitSetMethods.unset(baseObject, baseOffset, i);
+  }
+
+  @Override
+  public void update(int ordinal, Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setInt(int ordinal, int value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putInt(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setLong(int ordinal, long value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putLong(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setDouble(int ordinal, double value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putDouble(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setBoolean(int ordinal, boolean value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putBoolean(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setShort(int ordinal, short value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putShort(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setByte(int ordinal, byte value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putByte(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setFloat(int ordinal, float value) {
+    assertIndexIsValid(ordinal);
+    setNotNullAt(ordinal);
+    PlatformDependent.UNSAFE.putFloat(baseObject, getFieldOffset(ordinal), value);
+  }
+
+  @Override
+  public void setString(int ordinal, String value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size() {
+    return numFields;
+  }
+
+  @Override
+  public int length() {
+    return size();
+  }
+
+  @Override
+  public StructType schema() {
+    return schema;
+  }
+
+  @Override
+  public Object apply(int i) {
+    return get(i);
+  }
+
+  @Override
+  public Object get(int i) {
+    assertIndexIsValid(i);
+    assert (schema != null) : "Schema must be defined when calling generic get() method";
+    final DataType dataType = schema.fields()[i].dataType();
+    // UnsafeRow is only designed to be invoked by internal code, which only invokes this generic
+    // get() method when trying to access UTF8String-typed columns. If we refactor the codebase to
+    // separate the internal and external row interfaces, then internal code can fetch strings via
+    // a new getUTF8String() method and we'll be able to remove this method.
+    if (isNullAt(i)) {
+      return null;
+    } else if (dataType == StringType) {
+      return getUTF8String(i);
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public boolean isNullAt(int i) {
+    assertIndexIsValid(i);
+    return BitSetMethods.isSet(baseObject, baseOffset, i);
+  }
+
+  @Override
+  public boolean getBoolean(int i) {
+    assertIndexIsValid(i);
+    return PlatformDependent.UNSAFE.getBoolean(baseObject, getFieldOffset(i));
+  }
+
+  @Override
+  public byte getByte(int i) {
+    assertIndexIsValid(i);
+    return PlatformDependent.UNSAFE.getByte(baseObject, getFieldOffset(i));
+  }
+
+  @Override
+  public short getShort(int i) {
+    assertIndexIsValid(i);
+    return PlatformDependent.UNSAFE.getShort(baseObject, getFieldOffset(i));
+  }
+
+  @Override
+  public int getInt(int i) {
+    assertIndexIsValid(i);
+    return PlatformDependent.UNSAFE.getInt(baseObject, getFieldOffset(i));
+  }
+
+  @Override
+  public long getLong(int i) {
+    assertIndexIsValid(i);
+    return PlatformDependent.UNSAFE.getLong(baseObject, getFieldOffset(i));
+  }
+
+  @Override
+  public float getFloat(int i) {
+    assertIndexIsValid(i);
+    if (isNullAt(i)) {
+      return Float.NaN;
+    } else {
+      return PlatformDependent.UNSAFE.getFloat(baseObject, getFieldOffset(i));
+    }
+  }
+
+  @Override
+  public double getDouble(int i) {
+    assertIndexIsValid(i);
+    if (isNullAt(i)) {
+      return Double.NaN;
+    } else {
+      return PlatformDependent.UNSAFE.getDouble(baseObject, getFieldOffset(i));
+    }
+  }
+
+  public UTF8String getUTF8String(int i) {
+    assertIndexIsValid(i);
+    final UTF8String str = new UTF8String();
+    final long offsetToStringSize = getLong(i);
+    final int stringSizeInBytes =
+      (int) PlatformDependent.UNSAFE.getLong(baseObject, baseOffset + offsetToStringSize);
+    final byte[] strBytes = new byte[stringSizeInBytes];
+    PlatformDependent.copyMemory(
+      baseObject,
+      baseOffset + offsetToStringSize + 8,  // The `+ 8` is to skip past the size to get the data
+      strBytes,
+      PlatformDependent.BYTE_ARRAY_OFFSET,
+      stringSizeInBytes
+    );
+    str.set(strBytes);
+    return str;
+  }
+
+  @Override
+  public String getString(int i) {
+    return getUTF8String(i).toString();
+  }
+
+  @Override
+  public BigDecimal getDecimal(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Date getDate(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> Seq<T> getSeq(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> List<T> getList(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <K, V> Map<K, V> getMap(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> scala.collection.immutable.Map<String, T> getValuesMap(Seq<String> fieldNames) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <K, V> java.util.Map<K, V> getJavaMap(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Row getStruct(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T getAs(int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T getAs(String fieldName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int fieldIndex(String name) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Row copy() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean anyNull() {
+    return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes);
+  }
+
+  @Override
+  public Seq<Object> toSeq() {
+    final ArraySeq<Object> values = new ArraySeq<Object>(numFields);
+    for (int fieldNumber = 0; fieldNumber < numFields; fieldNumber++) {
+      values.update(fieldNumber, get(fieldNumber));
+    }
+    return values;
+  }
+
+  @Override
+  public String toString() {
+    return mkString("[", ",", "]");
+  }
+
+  @Override
+  public String mkString() {
+    return toSeq().mkString();
+  }
+
+  @Override
+  public String mkString(String sep) {
+    return toSeq().mkString(sep);
+  }
+
+  @Override
+  public String mkString(String start, String sep, String end) {
+    return toSeq().mkString(start, sep, end);
+  }
+}
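
The setters and getters above go through getFieldOffset and the null bit set, which are
defined earlier in UnsafeRow.java and are not part of this hunk. A minimal Scala sketch of
the offset arithmetic they imply, assuming one 8-byte word of null bits per 64 fields and
one 8-byte slot per field (the names below are illustrative, not the real helpers):

    // Sketch only: approximates the layout that the accessors above rely on.
    object UnsafeRowLayoutSketch {
      // Null bit set rounded up to whole 8-byte words (assumed).
      def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

      // Every field occupies an 8-byte slot immediately after the bit set (assumed).
      def fieldOffset(baseOffset: Long, numFields: Int, ordinal: Int): Long =
        baseOffset + bitSetWidthInBytes(numFields) + ordinal * 8L
    }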

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
new file mode 100644
index 0000000..5b2c857
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.PlatformDependent
+import org.apache.spark.unsafe.array.ByteArrayMethods
+
+/**
+ * Converts Rows into UnsafeRow format. This class is NOT thread-safe.
+ *
+ * @param fieldTypes the data types of the row's columns.
+ */
+class UnsafeRowConverter(fieldTypes: Array[DataType]) {
+
+  def this(schema: StructType) {
+    this(schema.fields.map(_.dataType))
+  }
+
+  /** Re-used pointer to the unsafe row being written */
+  private[this] val unsafeRow = new UnsafeRow()
+
+  /** Functions for encoding each column */
+  private[this] val writers: Array[UnsafeColumnWriter] = {
+    fieldTypes.map(t => UnsafeColumnWriter.forType(t))
+  }
+
+  /** The size, in bytes, of the fixed-length portion of the row, including the null bitmap */
+  private[this] val fixedLengthSize: Int =
+    (8 * fieldTypes.length) + UnsafeRow.calculateBitSetWidthInBytes(fieldTypes.length)
+
+  /**
+   * Compute the amount of space, in bytes, required to encode the given row.
+   */
+  def getSizeRequirement(row: Row): Int = {
+    var fieldNumber = 0
+    var variableLengthFieldSize: Int = 0
+    while (fieldNumber < writers.length) {
+      if (!row.isNullAt(fieldNumber)) {
+        variableLengthFieldSize += writers(fieldNumber).getSize(row, fieldNumber)
+      }
+      fieldNumber += 1
+    }
+    fixedLengthSize + variableLengthFieldSize
+  }
+
+  /**
+   * Convert the given row into UnsafeRow format.
+   *
+   * @param row the row to convert
+   * @param baseObject the base object of the destination address
+   * @param baseOffset the base offset of the destination address
+   * @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
+   */
+  def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = {
+    unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
+    var fieldNumber = 0
+    var appendCursor: Int = fixedLengthSize
+    while (fieldNumber < writers.length) {
+      if (row.isNullAt(fieldNumber)) {
+        unsafeRow.setNullAt(fieldNumber)
+      } else {
+        appendCursor += writers(fieldNumber).write(row, unsafeRow, fieldNumber, appendCursor)
+      }
+      fieldNumber += 1
+    }
+    appendCursor
+  }
+
+}
+
+/**
+ * Function for writing a column into an UnsafeRow.
+ */
+private abstract class UnsafeColumnWriter {
+  /**
+   * Write a value into an UnsafeRow.
+   *
+   * @param source the row being converted
+   * @param target a pointer to the converted unsafe row
+   * @param column the column to write
+   * @param appendCursor the offset from the start of the unsafe row to the current end of its
+   *                     data; variable-length values are written starting at this offset
+   * @return the number of variable-length bytes written
+   */
+  def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int
+
+  /**
+   * Return the number of bytes that are needed to write this variable-length value.
+   */
+  def getSize(source: Row, column: Int): Int
+}
+
+private object UnsafeColumnWriter {
+
+  def forType(dataType: DataType): UnsafeColumnWriter = {
+    dataType match {
+      case NullType => NullUnsafeColumnWriter
+      case BooleanType => BooleanUnsafeColumnWriter
+      case ByteType => ByteUnsafeColumnWriter
+      case ShortType => ShortUnsafeColumnWriter
+      case IntegerType => IntUnsafeColumnWriter
+      case LongType => LongUnsafeColumnWriter
+      case FloatType => FloatUnsafeColumnWriter
+      case DoubleType => DoubleUnsafeColumnWriter
+      case StringType => StringUnsafeColumnWriter
+      case t =>
+        throw new UnsupportedOperationException(s"Do not know how to write columns of type $t")
+    }
+  }
+}
+
+// ------------------------------------------------------------------------------------------------
+
+private object NullUnsafeColumnWriter extends NullUnsafeColumnWriter
+private object BooleanUnsafeColumnWriter extends BooleanUnsafeColumnWriter
+private object ByteUnsafeColumnWriter extends ByteUnsafeColumnWriter
+private object ShortUnsafeColumnWriter extends ShortUnsafeColumnWriter
+private object IntUnsafeColumnWriter extends IntUnsafeColumnWriter
+private object LongUnsafeColumnWriter extends LongUnsafeColumnWriter
+private object FloatUnsafeColumnWriter extends FloatUnsafeColumnWriter
+private object DoubleUnsafeColumnWriter extends DoubleUnsafeColumnWriter
+private object StringUnsafeColumnWriter extends StringUnsafeColumnWriter
+
+private abstract class PrimitiveUnsafeColumnWriter extends UnsafeColumnWriter {
+  // Primitives don't write to the variable-length region:
+  def getSize(sourceRow: Row, column: Int): Int = 0
+}
+
+private class NullUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setNullAt(column)
+    0
+  }
+}
+
+private class BooleanUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setBoolean(column, source.getBoolean(column))
+    0
+  }
+}
+
+private class ByteUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setByte(column, source.getByte(column))
+    0
+  }
+}
+
+private class ShortUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setShort(column, source.getShort(column))
+    0
+  }
+}
+
+private class IntUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setInt(column, source.getInt(column))
+    0
+  }
+}
+
+private class LongUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setLong(column, source.getLong(column))
+    0
+  }
+}
+
+private class FloatUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setFloat(column, source.getFloat(column))
+    0
+  }
+}
+
+private class DoubleUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter {
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    target.setDouble(column, source.getDouble(column))
+    0
+  }
+}
+
+private class StringUnsafeColumnWriter private() extends UnsafeColumnWriter {
+  def getSize(source: Row, column: Int): Int = {
+    val numBytes = source.get(column).asInstanceOf[UTF8String].getBytes.length
+    8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+  }
+
+  override def write(source: Row, target: UnsafeRow, column: Int, appendCursor: Int): Int = {
+    val value = source.get(column).asInstanceOf[UTF8String]
+    val baseObject = target.getBaseObject
+    val baseOffset = target.getBaseOffset
+    val numBytes = value.getBytes.length
+    PlatformDependent.UNSAFE.putLong(baseObject, baseOffset + appendCursor, numBytes)
+    PlatformDependent.copyMemory(
+      value.getBytes,
+      PlatformDependent.BYTE_ARRAY_OFFSET,
+      baseObject,
+      baseOffset + appendCursor + 8,
+      numBytes
+    )
+    target.setLong(column, appendCursor)
+    8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+  }
+}
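
StringUnsafeColumnWriter stores each string as an 8-byte length word followed by its UTF-8
bytes rounded up to a whole word. A small sketch of that rounding, assuming
ByteArrayMethods.roundNumberOfBytesToNearestWord rounds up to the next multiple of 8:

    // Assumed equivalent of ByteArrayMethods.roundNumberOfBytesToNearestWord.
    def roundToWord(numBytes: Int): Int = {
      val remainder = numBytes & 0x07
      if (remainder == 0) numBytes else numBytes + (8 - remainder)
    }

    // e.g. "Hello" (5 UTF-8 bytes) occupies 8 + roundToWord(5) = 8 + 8 = 16 bytes in the
    // variable-length region, matching getSize above.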

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
new file mode 100644
index 0000000..7a19e51
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
+
+class UnsafeFixedWidthAggregationMapSuite extends FunSuite with Matchers with BeforeAndAfterEach {
+
+  import UnsafeFixedWidthAggregationMap._
+
+  private val groupKeySchema = StructType(StructField("product", StringType) :: Nil)
+  private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil)
+  private def emptyAggregationBuffer: Row = new GenericRow(Array[Any](0))
+
+  private var memoryManager: TaskMemoryManager = null
+
+  override def beforeEach(): Unit = {
+    memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+  }
+
+  override def afterEach(): Unit = {
+    if (memoryManager != null) {
+      memoryManager.cleanUpAllAllocatedMemory()
+      memoryManager = null
+    }
+  }
+
+  test("supported schemas") {
+    assert(!supportsAggregationBufferSchema(StructType(StructField("x", StringType) :: Nil)))
+    assert(supportsGroupKeySchema(StructType(StructField("x", StringType) :: Nil)))
+
+    assert(
+      !supportsAggregationBufferSchema(StructType(StructField("x", ArrayType(IntegerType)) :: Nil)))
+    assert(
+      !supportsGroupKeySchema(StructType(StructField("x", ArrayType(IntegerType)) :: Nil)))
+  }
+
+  test("empty map") {
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      memoryManager,
+      1024, // initial capacity
+      false // disable perf metrics
+    )
+    assert(!map.iterator().hasNext)
+    map.free()
+  }
+
+  test("updating values for a single key") {
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      memoryManager,
+      1024, // initial capacity
+      false // disable perf metrics
+    )
+    val groupKey = new GenericRow(Array[Any](UTF8String("cats")))
+
+    // Looking up a new key inserts a zero-valued entry (like Python's Counter or defaultdict)
+    map.getAggregationBuffer(groupKey)
+    val iter = map.iterator()
+    val entry = iter.next()
+    assert(!iter.hasNext)
+    entry.key.getString(0) should be ("cats")
+    entry.value.getInt(0) should be (0)
+
+    // Modifications to rows retrieved from the map should update the values in the map
+    entry.value.setInt(0, 42)
+    map.getAggregationBuffer(groupKey).getInt(0) should be (42)
+
+    map.free()
+  }
+
+  test("inserting large random keys") {
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      memoryManager,
+      128, // initial capacity
+      false // disable perf metrics
+    )
+    val rand = new Random(42)
+    val groupKeys: Set[String] = Seq.fill(512)(rand.nextString(1024)).toSet
+    groupKeys.foreach { keyString =>
+      map.getAggregationBuffer(new GenericRow(Array[Any](UTF8String(keyString))))
+    }
+    val seenKeys: Set[String] = map.iterator().asScala.map { entry =>
+      entry.key.getString(0)
+    }.toSet
+    seenKeys.size should be (groupKeys.size)
+    seenKeys should be (groupKeys)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
new file mode 100644
index 0000000..3a60c7f
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.Arrays
+
+import org.scalatest.{FunSuite, Matchers}
+
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.PlatformDependent
+import org.apache.spark.unsafe.array.ByteArrayMethods
+
+class UnsafeRowConverterSuite extends FunSuite with Matchers {
+
+  test("basic conversion with only primitive types") {
+    val fieldTypes: Array[DataType] = Array(LongType, LongType, IntegerType)
+    val converter = new UnsafeRowConverter(fieldTypes)
+
+    val row = new SpecificMutableRow(fieldTypes)
+    row.setLong(0, 0)
+    row.setLong(1, 1)
+    row.setInt(2, 2)
+
+    val sizeRequired: Int = converter.getSizeRequirement(row)
+    sizeRequired should be (8 + (3 * 8))
+    val buffer: Array[Long] = new Array[Long](sizeRequired / 8)
+    val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET)
+    numBytesWritten should be (sizeRequired)
+
+    val unsafeRow = new UnsafeRow()
+    unsafeRow.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
+    unsafeRow.getLong(0) should be (0)
+    unsafeRow.getLong(1) should be (1)
+    unsafeRow.getInt(2) should be (2)
+  }
+
+  test("basic conversion with primitive and string types") {
+    val fieldTypes: Array[DataType] = Array(LongType, StringType, StringType)
+    val converter = new UnsafeRowConverter(fieldTypes)
+
+    val row = new SpecificMutableRow(fieldTypes)
+    row.setLong(0, 0)
+    row.setString(1, "Hello")
+    row.setString(2, "World")
+
+    val sizeRequired: Int = converter.getSizeRequirement(row)
+    sizeRequired should be (8 + (8 * 3) +
+      (8 + ByteArrayMethods.roundNumberOfBytesToNearestWord("Hello".getBytes.length)) +
+      (8 + ByteArrayMethods.roundNumberOfBytesToNearestWord("World".getBytes.length)))
+    val buffer: Array[Long] = new Array[Long](sizeRequired / 8)
+    val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET)
+    numBytesWritten should be (sizeRequired)
+
+    val unsafeRow = new UnsafeRow()
+    unsafeRow.pointTo(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
+    unsafeRow.getLong(0) should be (0)
+    unsafeRow.getString(1) should be ("Hello")
+    unsafeRow.getString(2) should be ("World")
+  }
+
+  test("null handling") {
+    val fieldTypes: Array[DataType] = Array(
+      NullType,
+      BooleanType,
+      ByteType,
+      ShortType,
+      IntegerType,
+      LongType,
+      FloatType,
+      DoubleType)
+    val converter = new UnsafeRowConverter(fieldTypes)
+
+    val rowWithAllNullColumns: Row = {
+      val r = new SpecificMutableRow(fieldTypes)
+      for (i <- 0 to fieldTypes.length - 1) {
+        r.setNullAt(i)
+      }
+      r
+    }
+
+    val sizeRequired: Int = converter.getSizeRequirement(rowWithAllNullColumns)
+    val createdFromNullBuffer: Array[Long] = new Array[Long](sizeRequired / 8)
+    val numBytesWritten = converter.writeRow(
+      rowWithAllNullColumns, createdFromNullBuffer, PlatformDependent.LONG_ARRAY_OFFSET)
+    numBytesWritten should be (sizeRequired)
+
+    val createdFromNull = new UnsafeRow()
+    createdFromNull.pointTo(
+      createdFromNullBuffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
+    for (i <- 0 to fieldTypes.length - 1) {
+      assert(createdFromNull.isNullAt(i))
+    }
+    createdFromNull.getBoolean(1) should be (false)
+    createdFromNull.getByte(2) should be (0)
+    createdFromNull.getShort(3) should be (0)
+    createdFromNull.getInt(4) should be (0)
+    createdFromNull.getLong(5) should be (0)
+    assert(java.lang.Float.isNaN(createdFromNull.getFloat(6)))
+    assert(java.lang.Double.isNaN(createdFromNull.getDouble(7)))
+
+    // If we have an UnsafeRow with columns that are initially non-null and we null out those
+    // columns, then the serialized row representation should be identical to what we would get by
+    // creating an entirely null row via the converter
+    val rowWithNoNullColumns: Row = {
+      val r = new SpecificMutableRow(fieldTypes)
+      r.setNullAt(0)
+      r.setBoolean(1, false)
+      r.setByte(2, 20)
+      r.setShort(3, 30)
+      r.setInt(4, 400)
+      r.setLong(5, 500)
+      r.setFloat(6, 600)
+      r.setDouble(7, 700)
+      r
+    }
+    val setToNullAfterCreationBuffer: Array[Long] = new Array[Long](sizeRequired / 8)
+    converter.writeRow(
+      rowWithNoNullColumns, setToNullAfterCreationBuffer, PlatformDependent.LONG_ARRAY_OFFSET)
+    val setToNullAfterCreation = new UnsafeRow()
+    setToNullAfterCreation.pointTo(
+      setToNullAfterCreationBuffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null)
+
+    setToNullAfterCreation.isNullAt(0) should be (rowWithNoNullColumns.isNullAt(0))
+    setToNullAfterCreation.getBoolean(1) should be (rowWithNoNullColumns.getBoolean(1))
+    setToNullAfterCreation.getByte(2) should be (rowWithNoNullColumns.getByte(2))
+    setToNullAfterCreation.getShort(3) should be (rowWithNoNullColumns.getShort(3))
+    setToNullAfterCreation.getInt(4) should be (rowWithNoNullColumns.getInt(4))
+    setToNullAfterCreation.getLong(5) should be (rowWithNoNullColumns.getLong(5))
+    setToNullAfterCreation.getFloat(6) should be (rowWithNoNullColumns.getFloat(6))
+    setToNullAfterCreation.getDouble(7) should be (rowWithNoNullColumns.getDouble(7))
+
+    for (i <- 0 to fieldTypes.length - 1) {
+      setToNullAfterCreation.setNullAt(i)
+    }
+    assert(Arrays.equals(createdFromNullBuffer, setToNullAfterCreationBuffer))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 4fc5de7..2fa602a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -30,6 +30,7 @@ private[spark] object SQLConf {
   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
+  val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
   val DIALECT = "spark.sql.dialect"
 
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
@@ -149,6 +150,14 @@ private[sql] class SQLConf extends Serializable {
    */
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
 
+  /**
+   * When set to true, Spark SQL will use managed memory for certain operations.  This option only
+   * takes effect if codegen is enabled.
+   *
+   * Defaults to false as this feature is currently experimental.
+   */
+  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, "false").toBoolean
+
   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
 
   /**
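
Since the new flag only takes effect when codegen is also enabled, both options need to be
set. A minimal sketch against an existing SQLContext (illustrative only):

    // Sketch: enable codegen and the managed-memory aggregation path.
    sqlContext.setConf("spark.sql.codegen", "true")
    sqlContext.setConf("spark.sql.unsafe.enabled", "true")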

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a279b0f..bd4a55f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1011,6 +1011,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     def codegenEnabled: Boolean = self.conf.codegenEnabled
 
+    def unsafeEnabled: Boolean = self.conf.unsafeEnabled
+
     def numPartitions: Int = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index b1ef655..5d9f202 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.TaskContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.trees._
@@ -40,6 +41,7 @@ case class AggregateEvaluation(
  *                ensure all values where `groupingExpressions` are equal are present.
  * @param groupingExpressions expressions that are evaluated to determine grouping.
  * @param aggregateExpressions expressions that are computed for each group.
+ * @param unsafeEnabled whether to allow Unsafe-based aggregation buffers to be used.
  * @param child the input data source.
  */
 @DeveloperApi
@@ -47,6 +49,7 @@ case class GeneratedAggregate(
     partial: Boolean,
     groupingExpressions: Seq[Expression],
     aggregateExpressions: Seq[NamedExpression],
+    unsafeEnabled: Boolean,
     child: SparkPlan)
   extends UnaryNode {
 
@@ -225,6 +228,21 @@ case class GeneratedAggregate(
       case e: Expression if groupMap.contains(e) => groupMap(e)
     })
 
+    val aggregationBufferSchema: StructType = StructType.fromAttributes(computationSchema)
+
+    val groupKeySchema: StructType = {
+      val fields = groupingExpressions.zipWithIndex.map { case (expr, idx) =>
+        // This is a dummy field name
+        StructField(idx.toString, expr.dataType, expr.nullable)
+      }
+      StructType(fields)
+    }
+
+    val schemaSupportsUnsafe: Boolean = {
+      UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema) &&
+        UnsafeFixedWidthAggregationMap.supportsGroupKeySchema(groupKeySchema)
+    }
+
     child.execute().mapPartitions { iter =>
       // Builds a new custom class for holding the results of aggregation for a group.
       val initialValues = computeFunctions.flatMap(_.initialValues)
@@ -265,7 +283,49 @@ case class GeneratedAggregate(
 
         val resultProjection = resultProjectionBuilder()
         Iterator(resultProjection(buffer))
+      } else if (unsafeEnabled && schemaSupportsUnsafe) {
+        log.info("Using Unsafe-based aggregator")
+        val aggregationMap = new UnsafeFixedWidthAggregationMap(
+          newAggregationBuffer(EmptyRow),
+          aggregationBufferSchema,
+          groupKeySchema,
+          TaskContext.get.taskMemoryManager(),
+          1024 * 16, // initial capacity
+          false // disable tracking of performance metrics
+        )
+
+        while (iter.hasNext) {
+          val currentRow: Row = iter.next()
+          val groupKey: Row = groupProjection(currentRow)
+          val aggregationBuffer = aggregationMap.getAggregationBuffer(groupKey)
+          updateProjection.target(aggregationBuffer)(joinedRow(aggregationBuffer, currentRow))
+        }
+
+        new Iterator[Row] {
+          private[this] val mapIterator = aggregationMap.iterator()
+          private[this] val resultProjection = resultProjectionBuilder()
+
+          def hasNext: Boolean = mapIterator.hasNext
+
+          def next(): Row = {
+            val entry = mapIterator.next()
+            val result = resultProjection(joinedRow(entry.key, entry.value))
+            if (hasNext) {
+              result
+            } else {
+              // This is the last element in the iterator, so let's free the buffer. Before we do,
+              // though, we need to make a defensive copy of the result so that we don't return an
+              // object that might contain dangling pointers to the freed memory
+              val resultCopy = result.copy()
+              aggregationMap.free()
+              resultCopy
+            }
+          }
+        }
       } else {
+        if (unsafeEnabled) {
+          log.info("Not using Unsafe-based aggregator because it is not supported for this schema")
+        }
         val buffers = new java.util.HashMap[Row, MutableRow]()
 
         var currentRow: Row = null

http://git-wip-us.apache.org/repos/asf/spark/blob/f49284b5/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3a0a6c8..af58911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -136,10 +136,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             partial = false,
             namedGroupingAttributes,
             rewrittenAggregateExpressions,
+            unsafeEnabled,
             execution.GeneratedAggregate(
               partial = true,
               groupingExpressions,
               partialComputation,
+              unsafeEnabled,
               planLater(child))) :: Nil
 
       // Cases where some aggregate can not be codegened
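
With unsafeEnabled threaded into both the partial and final GeneratedAggregate, a grouped
aggregation over a string key and a numeric value can take the new code path once the two
flags above are set. A hedged end-to-end sketch (the column names follow the test suite;
the data and the sqlContext value are illustrative):

    // Sketch: assumes spark.sql.codegen and spark.sql.unsafe.enabled are both "true"
    // and that sqlContext is an existing SQLContext.
    import sqlContext.implicits._
    val sales = Seq(("cats", 1), ("dogs", 2), ("cats", 3)).toDF("product", "salePrice")
    sales.groupBy("product").sum("salePrice").collect()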


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org