You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/10 18:22:58 UTC

[2/3] drill git commit: DRILL-2757: Verify operators correctly handle low memory conditions and cancellations

DRILL-2757: Verify operators correctly handle low memory conditions and cancellations

includes:
DRILL-2816: system error does not display the original Exception message
DRILL-2893: ScanBatch throws a NullPointerException instead of returning OUT_OF_MEMORY
DRILL-2894: FixedValueVectors shouldn't set its data buffer to null when it fails to allocate it
DRILL-2895: AbstractRecordBatch.buildSchema() should properly handle OUT_OF_MEMORY outcome
DRILL-2905: RootExec implementations should properly handle IterOutcome.OUT_OF_MEMORY
DRILL-2920: properly handle OutOfMemoryException
DRILL-2947: AllocationHelper.allocateNew() doesn't have a consistent behavior when it can't allocate

also:
- added UserException.memoryError() with a pre-assigned error message
- injection site in ScanBatch and unit test that runs various TPC-H queries and injects
  an exception in the ScanBatch that will cause an OUT_OF_MEMORY outcome to be sent


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1c09c2f1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1c09c2f1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1c09c2f1

Branch: refs/heads/master
Commit: 1c09c2f13bd0f50ca40c17dc0bfa7aae5826b8c3
Parents: a296383
Author: adeneche <ad...@gmail.com>
Authored: Wed Apr 29 08:29:04 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun May 10 09:22:16 2015 -0700

----------------------------------------------------------------------
 .../drill/common/exceptions/ErrorHelper.java    |  26 ++++
 .../drill/common/exceptions/UserException.java  | 117 +++++++++-------
 .../common/exceptions/TestUserException.java    |   5 +-
 .../codegen/templates/FixedValueVectors.java    |  28 +++-
 .../templates/VariableLengthVectors.java        |  22 +++-
 .../apache/drill/exec/ops/FragmentContext.java  |   6 +
 .../drill/exec/physical/impl/ScanBatch.java     |  23 ++--
 .../drill/exec/physical/impl/ScreenCreator.java |   3 +
 .../exec/physical/impl/SingleSenderCreator.java |   3 +
 .../exec/physical/impl/TopN/TopNBatch.java      |  15 +--
 .../exec/physical/impl/WriterRecordBatch.java   |  14 +-
 .../physical/impl/aggregate/HashAggBatch.java   |  59 +++++----
 .../impl/aggregate/HashAggTemplate.java         |   1 +
 .../impl/aggregate/StreamingAggBatch.java       |  75 ++++++-----
 .../impl/aggregate/StreamingAggTemplate.java    |   3 +
 .../BroadcastSenderRootExec.java                |   3 +
 .../physical/impl/filter/FilterTemplate2.java   |   3 +-
 .../exec/physical/impl/join/HashJoinBatch.java  |   7 +
 .../exec/physical/impl/join/MergeJoinBatch.java |   8 +-
 .../physical/impl/join/NestedLoopJoinBatch.java |  10 +-
 .../physical/impl/limit/LimitRecordBatch.java   |  14 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |   7 +-
 .../PartitionSenderRootExec.java                |   4 +
 .../impl/producer/ProducerConsumerBatch.java    |  49 +++++--
 .../impl/project/ProjectRecordBatch.java        |  16 +--
 .../exec/physical/impl/sort/SortBatch.java      |   4 +-
 .../impl/svremover/RemovingRecordBatch.java     |   8 +-
 .../physical/impl/trace/TraceRecordBatch.java   |   3 +-
 .../impl/union/UnionAllRecordBatch.java         |   5 +-
 .../impl/window/WindowFrameRecordBatch.java     |  20 ++-
 .../physical/impl/xsort/ExternalSortBatch.java  |  19 ++-
 .../drill/exec/record/AbstractRecordBatch.java  |  31 ++---
 .../drill/exec/vector/AllocationHelper.java     |  13 +-
 .../org/apache/drill/exec/vector/BitVector.java |  20 ++-
 .../apache/drill/exec/work/foreman/Foreman.java |   4 +
 .../exec/work/fragment/FragmentExecutor.java    |   4 +-
 .../apache/drill/TestOutOfMemoryOutcome.java    | 132 +++++++++++++++++++
 37 files changed, 556 insertions(+), 228 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
index 4da4ee8..5dd9b67 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
@@ -29,6 +29,32 @@ class ErrorHelper {
 
   private final static Pattern IGNORE= Pattern.compile("^(sun|com\\.sun|java).*");
 
+  /**
+   * Constructs the root error message in the form [root exception class name]: [root exception message]
+   *
+   * @param cause exception we want the root message for
+   * @return root error message or empty string if none found
+   */
+  static String getRootMessage(final Throwable cause) {
+    String message = "";
+
+    Throwable ex = cause;
+    while (ex != null) {
+      if (ex.getMessage() != null) {
+        message = ex.getClass().getName() + ": " + ex.getMessage();
+      }
+
+      if (ex.getCause() != null && ex.getCause() != ex) {
+        ex = ex.getCause();
+      } else {
+        break;
+      }
+    }
+
+    return message;
+  }
+
+
   static String buildCausesMessage(final Throwable t) {
 
     StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 9283339..d90ace1 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -21,14 +21,14 @@ import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
 /**
- * Base class for all user exception. The goal is to separate out common error condititions where we can give users
+ * Base class for all user exception. The goal is to separate out common error conditions where we can give users
  * useful feedback.
  * <p>Throwing a user exception will guarantee it's message will be displayed to the user, along with any context
  * information added to the exception at various levels while being sent to the client.
  * <p>A specific class of user exceptions are system exception. They represent system level errors that don't display
  * any specific error message to the user apart from "A system error has occurend" along with informations to retrieve
  * the details of the exception from the logs.
- * <p>although system exception should only display a generic message to the user, for now they will display the root
+ * <p>Although system exception should only display a generic message to the user, for now they will display the root
  * error message, until all user errors are properly sent from the server side.
  * <p>Any thrown exception that is not wrapped inside a user exception will automatically be converted to a system
  * exception before being sent to the client.
@@ -37,10 +37,32 @@ import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
  */
 public class UserException extends DrillRuntimeException {
 
+  public static final String MEMORY_ERROR_MSG = "One or more nodes ran out of memory while executing the query.";
+
+  /**
+   * Creates a RESOURCE error with a prebuilt message for out of memory exceptions
+   *
+   * @param cause exception that will be wrapped inside a memory error
+   * @return resource error builder
+   */
+  public static Builder memoryError(final Throwable cause) {
+    return UserException.resourceError(cause)
+      .message(MEMORY_ERROR_MSG);
+  }
+
+  /**
+   * Creates a RESOURCE error with a prebuilt message for out of memory exceptions
+   *
+   * @return resource error builder
+   */
+  public static Builder memoryError() {
+    return memoryError(null);
+  }
+
   /**
-   * wraps the passed exception inside a system error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a system error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#SYSTEM
@@ -49,7 +71,7 @@ public class UserException extends DrillRuntimeException {
    *              returned by the builder instead of creating a new user exception
    * @return user exception builder
    *
-   * @deprecated this method should never need to be used explicitely, unless you are passing the exception to the
+   * @deprecated This method should never need to be used explicitly, unless you are passing the exception to the
    *             Rpc layer or UserResultListener.submitFailed()
    */
   @Deprecated
@@ -58,7 +80,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#CONNECTION
    * @return user exception builder
@@ -68,9 +90,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a connection error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a connection error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#CONNECTION
@@ -84,7 +106,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_READ
    * @return user exception builder
@@ -94,9 +116,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a data read error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a data read error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_READ
@@ -110,7 +132,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_WRITE
    * @return user exception builder
@@ -120,9 +142,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a data write error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a data write error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#DATA_WRITE
@@ -136,7 +158,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#FUNCTION
    * @return user exception builder
@@ -146,9 +168,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a function error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a function error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#FUNCTION
@@ -162,7 +184,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PARSE
    * @return user exception builder
@@ -172,9 +194,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a system error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a system error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PARSE
@@ -188,7 +210,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PERMISSION
    * @return user exception builder
@@ -198,9 +220,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a system error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a system error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PERMISSION
@@ -214,7 +236,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PLAN
    * @return user exception builder
@@ -224,9 +246,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a system error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a system error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#PLAN
@@ -240,7 +262,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#RESOURCE
    * @return user exception builder
@@ -250,9 +272,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a system error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a system error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#RESOURCE
@@ -266,7 +288,7 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * creates a new user exception builder .
+   * Creates a new user exception builder .
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#UNSUPPORTED_OPERATION
    * @return user exception builder
@@ -276,9 +298,9 @@ public class UserException extends DrillRuntimeException {
   }
 
   /**
-   * wraps the passed exception inside a system error.
-   * <p>the cause message will be used unless {@link Builder#message(String, Object...)} is called.
-   * <p>if the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
+   * Wraps the passed exception inside a system error.
+   * <p>The cause message will be used unless {@link Builder#message(String, Object...)} is called.
+   * <p>If the wrapped exception is, or wraps, a user exception it will be returned by {@link Builder#build()} instead
    * of creating a new exception. Any added context will be added to the user exception as well.
    *
    * @see org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType#UNSUPPORTED_OPERATION
@@ -307,7 +329,7 @@ public class UserException extends DrillRuntimeException {
     private String message;
 
     /**
-     * wraps an existing exception inside a user exception.
+     * Wraps an existing exception inside a user exception.
      *
      * @param errorType user exception type that should be created if the passed exception isn't,
      *                  or doesn't wrap a user exception
@@ -462,13 +484,20 @@ public class UserException extends DrillRuntimeException {
         return uex;
       }
 
+      boolean isSystemError = errorType == DrillPBError.ErrorType.SYSTEM;
+
+      // make sure system errors use the root error message and display the root cause class name
+      if (isSystemError) {
+        message = ErrorHelper.getRootMessage(cause);
+      }
+
       final UserException newException = new UserException(this);
 
       // since we just created a new exception, we should log it for later reference. If this is a system error, this is
       // an issue that the Drill admin should pay attention to and we should log as ERROR. However, if this is a user
       // mistake or data read issue, the system admin should not be concerned about these and thus we'll log this
       // as an INFO message.
-      if (errorType == DrillPBError.ErrorType.SYSTEM) {
+      if (isSystemError) {
         logger.error(newException.getMessage(), newException);
       } else {
         logger.info("User Error Occurred", newException);

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/common/src/test/java/org/apache/drill/common/exceptions/TestUserException.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/common/exceptions/TestUserException.java b/common/src/test/java/org/apache/drill/common/exceptions/TestUserException.java
index a145f95..151b762 100644
--- a/common/src/test/java/org/apache/drill/common/exceptions/TestUserException.java
+++ b/common/src/test/java/org/apache/drill/common/exceptions/TestUserException.java
@@ -40,9 +40,10 @@ public class TestUserException {
   @Test
   public void testBuildSystemException() {
     String message = "This is an exception";
-    UserException uex = UserException.systemError(new RuntimeException(message)).build();
+    UserException uex = UserException.systemError(new Exception(new RuntimeException(message))).build();
 
-    Assert.assertEquals(message, uex.getOriginalMessage());
+    Assert.assertTrue(uex.getOriginalMessage().contains(message));
+    Assert.assertTrue(uex.getOriginalMessage().contains("RuntimeException"));
 
     DrillPBError error = uex.getOrCreatePBError(true);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 1059bfb..a805b8e 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -38,7 +38,7 @@ package org.apache.drill.exec.vector;
  *   The width of each element is ${type.width} byte(s)
  *   The equivalent Java primitive is '${minor.javaType!type.javaType}'
  *
- * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
+ * Source code generated using FreeMarker template ${.template_name}
  */
 @SuppressWarnings("unused")
 public final class ${minor.class}Vector extends BaseDataValueVector implements FixedWidthVector{
@@ -92,8 +92,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
       allocationValueCount = (int) (allocationValueCount * 2);
       allocationMonitor = 0;
     }
-    this.data = allocator.buffer(allocationValueCount * ${type.width});
-    if(data == null) return false;
+
+    DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
+    if(newBuf == null) {
+      return false;
+    }
+
+    this.data = newBuf;
     this.data.readerIndex(0);
     return true;
   }
@@ -101,21 +106,36 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   /**
    * Allocate a new buffer that supports setting at least the provided number of values.  May actually be sized bigger depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
    * @param valueCount
+   * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate the new buffer
    */
   public void allocateNew(int valueCount) {
     clear();
-    this.data = allocator.buffer(valueCount * ${type.width});
+
+    DrillBuf newBuf = allocator.buffer(valueCount * ${type.width});
+    if (newBuf == null) {
+      throw new OutOfMemoryRuntimeException(
+        String.format("Failure while allocating buffer of %d bytes",valueCount * ${type.width}));
+    }
+
+    this.data = newBuf;
     this.data.readerIndex(0);
     this.allocationValueCount = valueCount;
   }
 
 /**
  * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
+ *
+ * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate the new buffer
  */
   public void reAlloc() {
     logger.info("Realloc vector {}. [{}] -> [{}]", field, allocationValueCount * ${type.width}, allocationValueCount * 2 * ${type.width});
     allocationValueCount *= 2;
     DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
+    if (newBuf == null) {
+      throw new OutOfMemoryRuntimeException(
+      String.format("Failure while reallocating buffer to %d bytes",allocationValueCount * ${type.width}));
+    }
+
     newBuf.setBytes(0, data, 0, data.capacity());
     newBuf.setZero(newBuf.capacity() / 2, newBuf.capacity() / 2);
     newBuf.writerIndex(data.writerIndex());

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 7aa7415..659d99b 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -46,7 +46,7 @@ package org.apache.drill.exec.vector;
  *   The width of each element is ${type.width} byte(s)
  *   The equivalent Java primitive is '${minor.javaType!type.javaType}'
  *
- * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
+ * Source code generated using FreeMarker template ${.template_name}
  */
 @SuppressWarnings("unused")
 public final class ${minor.class}Vector extends BaseDataValueVector implements VariableWidthVector{
@@ -281,12 +281,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       allocationMonitor = 0;
     }
 
-    data = allocator.buffer(allocationTotalByteCount);
-    if(data == null){
+    DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
+    if(newBuf == null){
       return false;
     }
-    
+
+    this.data = newBuf;
     data.readerIndex(0);
+
     if(!offsetVector.allocateNewSafe()){
       return false;
     }
@@ -297,7 +299,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   public void allocateNew(int totalBytes, int valueCount) {
     clear();
     assert totalBytes >= 0;
-    data = allocator.buffer(totalBytes);
+    DrillBuf newBuf = allocator.buffer(totalBytes);
+    if(newBuf == null){
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes));
+    }
+
+    this.data = newBuf;
     data.readerIndex(0);
     allocationTotalByteCount = totalBytes;
     offsetVector.allocateNew(valueCount+1);
@@ -307,6 +314,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void reAlloc() {
       allocationTotalByteCount *= 2;
       DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
+      if(newBuf == null){
+        throw new OutOfMemoryRuntimeException(
+          String.format("Failure while reallocating buffer of %d bytes", allocationTotalByteCount));
+      }
+
       newBuf.setBytes(0, data, 0, data.capacity());
       data.release();
       data = newBuf;

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index b108924..cf4e9bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,6 +27,7 @@ import org.apache.calcite.schema.SchemaPlus;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -149,6 +151,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     try {
       allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
       Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
+    } catch(final OutOfMemoryException | OutOfMemoryRuntimeException e) {
+      throw UserException.memoryError(e)
+        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId())
+        .build();
     } catch(final Throwable e) {
       throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index f56dae3..6176f77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -32,6 +33,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -47,6 +49,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -60,13 +63,11 @@ import com.google.common.collect.Maps;
  */
 public class ScanBatch implements CloseableRecordBatch {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
-
-  private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ScanBatch.class);
 
   private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
 
   private final VectorContainer container = new VectorContainer();
-  private VectorContainer tempContainer;
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
@@ -79,7 +80,6 @@ public class ScanBatch implements CloseableRecordBatch {
   private List<ValueVector> partitionVectors;
   private List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
-  private boolean first = true;
   private boolean done = false;
   private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
 
@@ -159,13 +159,14 @@ public class ScanBatch implements CloseableRecordBatch {
     if (done) {
       return IterOutcome.NONE;
     }
-    long t1 = System.nanoTime();
     oContext.getStats().startProcessing();
     try {
       try {
+        injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
+
         currentReader.allocate(fieldVectorMap);
-      } catch (OutOfMemoryException e) {
-        logger.debug("Caught OutOfMemoryException");
+      } catch (OutOfMemoryException | OutOfMemoryRuntimeException e) {
+        logger.debug("Caught Out of Memory Exception", e);
         for (ValueVector v : fieldVectorMap.values()) {
           v.clear();
         }
@@ -219,6 +220,9 @@ public class ScanBatch implements CloseableRecordBatch {
       } else {
         return IterOutcome.OK;
       }
+    } catch (OutOfMemoryRuntimeException ex) {
+      context.fail(UserException.memoryError(ex).build());
+      return IterOutcome.STOP;
     } catch (Exception ex) {
       logger.debug("Failed to read the batch. Stopping...", ex);
       context.fail(ex);
@@ -328,7 +332,7 @@ public class ScanBatch implements CloseableRecordBatch {
     @Override
     public boolean isNewSchema() {
       // Check if top level schema has changed, second condition checks if one of the deeper map schema has changed
-      if (schemaChange == true || callBack.getSchemaChange()) {
+      if (schemaChange || callBack.getSchemaChange()) {
         schemaChange = false;
         return true;
       }
@@ -353,9 +357,6 @@ public class ScanBatch implements CloseableRecordBatch {
 
   public void close() {
     container.clear();
-    if (tempContainer != null) {
-      tempContainer.clear();
-    }
     for (ValueVector v : partitionVectors) {
       v.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 5b4d7bd..c31de66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.AccountingUserConnection;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -80,6 +81,8 @@ public class ScreenCreator implements RootCreator<Screen>{
       IterOutcome outcome = next(incoming);
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
+      case OUT_OF_MEMORY:
+        throw new OutOfMemoryRuntimeException();
       case STOP:
         return false;
       case NONE:

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 67062f3..fe6239e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -94,6 +95,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       }
 //      logger.debug("Outcome of sender next {}", out);
       switch (out) {
+      case OUT_OF_MEMORY:
+        throw new OutOfMemoryRuntimeException();
       case STOP:
       case NONE:
         // if we didn't do anything yet, send an empty schema.

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 9f6bea9..c3e70f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -48,9 +48,7 @@ import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
@@ -61,10 +59,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
@@ -72,8 +68,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
 
   private static final long MAX_SORT_BYTES = 1L * 1024 * 1024 * 1024;
-  public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
   private  final int batchPurgeThreshold;
 
   public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
@@ -89,10 +83,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   private long countSincePurge;
   private int batchCount;
   private Copier copier;
-  private boolean schemaBuilt = false;
   private boolean first = true;
   private int recordCount = 0;
-  private boolean stop;
 
   public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -153,7 +145,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         container.setRecordCount(0);
         return;
       case STOP:
-        stop = true;
+        state = BatchState.STOP;
+        return;
+      case OUT_OF_MEMORY:
+        state = BatchState.OUT_OF_MEMORY;
+        return;
       case NONE:
         state = BatchState.DONE;
       default:
@@ -198,6 +194,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
+        case OUT_OF_MEMORY:
         case STOP:
           return upstream;
         case OK_NEW_SCHEMA:

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 15fb7b5..28a99d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -42,7 +42,7 @@ import org.apache.drill.exec.vector.VarCharVector;
 
 /* Write the RecordBatch to the given RecordWriter. */
 public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriterRecordBatch.class);
 
   private EventBasedRecordWriter eventBasedRecordWriter;
   private RecordWriter recordWriter;
@@ -78,11 +78,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
 
   @Override
   public void buildSchema() throws SchemaChangeException {
-//    try {
-//      setupNewSchema();
-//    } catch (Exception e) {
-//      throw new SchemaChangeException(e);
-//    }
   }
 
   @Override
@@ -101,8 +96,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
         upstream = next(incoming);
 
         switch(upstream) {
+          case OUT_OF_MEMORY:
           case STOP:
-            return IterOutcome.STOP;
+            return upstream;
 
           case NOT_YET:
           case NONE:
@@ -124,7 +120,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
             throw new UnsupportedOperationException();
         }
       } while(upstream != IterOutcome.NONE);
-    }catch(Exception ex){
+    } catch(IOException ex) {
       logger.error("Failure during query", ex);
       kill(false);
       context.fail(ex);
@@ -154,7 +150,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
     container.setRecordCount(1);
   }
 
-  protected void setupNewSchema() throws Exception {
+  protected void setupNewSchema() throws IOException {
     try {
       // update the schema in RecordWriter
       stats.startSetup();

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 2f68faf..e1b5909 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -93,11 +93,20 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   public void buildSchema() throws SchemaChangeException {
-    if (next(incoming) == IterOutcome.NONE) {
-      state = BatchState.DONE;
-      container.buildSchema(SelectionVectorMode.NONE);
-      return;
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case NONE:
+        state = BatchState.DONE;
+        container.buildSchema(SelectionVectorMode.NONE);
+        return;
+      case OUT_OF_MEMORY:
+        state = BatchState.OUT_OF_MEMORY;
+        return;
+      case STOP:
+        state = BatchState.STOP;
+        return;
     }
+
     if (!createAggregator()) {
       state = BatchState.DONE;
     }
@@ -115,33 +124,29 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
     if (aggregator.buildComplete() && !aggregator.allFlushed()) {
       // aggregation is complete and not all records have been output yet
-      IterOutcome outcome = aggregator.outputCurrentBatch();
-      return outcome;
+      return aggregator.outputCurrentBatch();
     }
 
     logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
 
-    while (true) {
-      AggOutcome out = aggregator.doWork();
-      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch (out) {
-      case CLEANUP_AND_RETURN:
-        container.zeroVectors();
-        aggregator.cleanup();
-        state = BatchState.DONE;
-        // fall through
-      case RETURN_OUTCOME:
-        IterOutcome outcome = aggregator.getOutcome();
-        return aggregator.getOutcome();
-      case UPDATE_AGGREGATOR:
-        context.fail(UserException.unsupportedError()
-          .message("Hash aggregate does not support schema changes").build());
-        close();
-        killIncoming(false);
-        return IterOutcome.STOP;
-      default:
-        throw new IllegalStateException(String.format("Unknown state %s.", out));
-      }
+    AggOutcome out = aggregator.doWork();
+    logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+    switch (out) {
+    case CLEANUP_AND_RETURN:
+      container.zeroVectors();
+      aggregator.cleanup();
+      state = BatchState.DONE;
+      // fall through
+    case RETURN_OUTCOME:
+      return aggregator.getOutcome();
+    case UPDATE_AGGREGATOR:
+      context.fail(UserException.unsupportedError()
+        .message("Hash aggregate does not support schema changes").build());
+      close();
+      killIncoming(false);
+      return IterOutcome.STOP;
+    default:
+      throw new IllegalStateException(String.format("Unknown state %s.", out));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1b90dd8..e92de40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -309,6 +309,7 @@ public abstract class HashAggTemplate implements HashAggregator {
               logger.debug("Received IterOutcome of {}", out);
             }
             switch (out) {
+              case OUT_OF_MEMORY:
               case NOT_YET:
                 this.outcome = out;
                 return AggOutcome.RETURN_OUTCOME;

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 46b3721..b252971 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -96,11 +96,20 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public void buildSchema() throws SchemaChangeException {
-    if (next(incoming) == IterOutcome.NONE) {
-      state = BatchState.DONE;
-      container.buildSchema(SelectionVectorMode.NONE);
-      return;
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case NONE:
+        state = BatchState.DONE;
+        container.buildSchema(SelectionVectorMode.NONE);
+        return;
+      case OUT_OF_MEMORY:
+        state = BatchState.OUT_OF_MEMORY;
+        return;
+      case STOP:
+        state = BatchState.STOP;
+        return;
     }
+
     if (!createAggregator()) {
       state = BatchState.DONE;
     }
@@ -137,6 +146,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           specialBatchSent = true;
           return IterOutcome.OK;
         }
+      case OUT_OF_MEMORY:
       case NOT_YET:
       case STOP:
         return outcome;
@@ -153,38 +163,37 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       }
     }
 
-    while (true) {
-      AggOutcome out = aggregator.doWork();
-      recordCount = aggregator.getOutputCount();
-      logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-      switch (out) {
-      case CLEANUP_AND_RETURN:
-        if (!first) {
-          container.zeroVectors();
-        }
+    AggOutcome out = aggregator.doWork();
+    recordCount = aggregator.getOutputCount();
+    logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
+    switch (out) {
+    case CLEANUP_AND_RETURN:
+      if (!first) {
+        container.zeroVectors();
+      }
+      done = true;
+      // fall through
+    case RETURN_OUTCOME:
+      IterOutcome outcome = aggregator.getOutcome();
+      if (outcome == IterOutcome.NONE && first) {
+        first = false;
         done = true;
-        // fall through
-      case RETURN_OUTCOME:
-        IterOutcome outcome = aggregator.getOutcome();
-        if (outcome == IterOutcome.NONE && first) {
-          first = false;
-          done = true;
-          return IterOutcome.OK_NEW_SCHEMA;
-        } else if (outcome == IterOutcome.OK && first) {
-          outcome = IterOutcome.OK_NEW_SCHEMA;
-        }
+        return IterOutcome.OK_NEW_SCHEMA;
+      } else if (outcome == IterOutcome.OK && first) {
+        outcome = IterOutcome.OK_NEW_SCHEMA;
+      } else if (outcome != IterOutcome.OUT_OF_MEMORY) {
         first = false;
-        return outcome;
-      case UPDATE_AGGREGATOR:
-        context.fail(UserException.unsupportedError()
-          .message("Streaming aggregate does not support schema changes")
-          .build());
-        close();
-        killIncoming(false);
-        return IterOutcome.STOP;
-      default:
-        throw new IllegalStateException(String.format("Unknown state %s.", out));
       }
+      return outcome;
+    case UPDATE_AGGREGATOR:
+      context.fail(UserException.unsupportedError()
+        .message("Streaming aggregate does not support schema changes")
+        .build());
+      close();
+      killIncoming(false);
+      return IterOutcome.STOP;
+    default:
+      throw new IllegalStateException(String.format("Unknown state %s.", out));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 86f3100..0bbfd18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -97,6 +97,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               } else {
                 break outer;
               }
+            case OUT_OF_MEMORY:
+              outcome = out;
+              return AggOutcome.RETURN_OUTCOME;
             case NONE:
               out = IterOutcome.OK_NEW_SCHEMA;
             case STOP:

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index d2282c8..c6a07f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.broadcastsender;
 import java.util.List;
 
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -97,6 +98,8 @@ public class BroadcastSenderRootExec extends BaseRootExec {
     RecordBatch.IterOutcome out = next(incoming);
     logger.debug("Outcome of sender next {}", out);
     switch(out){
+      case OUT_OF_MEMORY:
+        throw new OutOfMemoryRuntimeException();
       case STOP:
       case NONE:
         for (int i = 0; i < tunnels.length; ++i) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index 26f2657..cd2fbe9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter;
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
@@ -64,7 +65,7 @@ public abstract class FilterTemplate2 implements Filterer{
       return;
     }
     if (! outgoingSelectionVector.allocateNew(recordCount)) {
-      throw new UnsupportedOperationException("Unable to allocate filter batch");
+      throw new OutOfMemoryRuntimeException("Unable to allocate filter batch");
     }
     switch(svMode){
     case NONE:

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index dd53477..56ce0ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -171,6 +171,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   protected void buildSchema() throws SchemaChangeException {
     leftUpstream = next(left);
     rightUpstream = next(right);
+
+    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+      state = BatchState.OUT_OF_MEMORY;
+      return;
+    }
+
     // Initialize the hash join helper context
     hjHelper = new HashJoinHelper(context, oContext.getAllocator());
     try {
@@ -328,6 +334,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
       switch (rightUpstream) {
 
+      case OUT_OF_MEMORY:
       case NONE:
       case NOT_YET:
       case STOP:

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 6466f70..0430f1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -69,7 +69,7 @@ import com.sun.codemodel.JVar;
  */
 public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
 
   public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
@@ -148,6 +148,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   public void buildSchema() throws SchemaChangeException {
     status.ensureInitial();
+
+    if (status.getLastLeft() == IterOutcome.OUT_OF_MEMORY || status.getLastRight() == IterOutcome.OUT_OF_MEMORY) {
+      state = BatchState.OUT_OF_MEMORY;
+      return;
+    }
+
     allocateBatch(true);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index aa4b300..4c86f5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -147,7 +147,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       }
 
       boolean drainRight = true;
-      while (drainRight == true) {
+      while (drainRight) {
         rightUpstream = next(RIGHT_INPUT, right);
         switch (rightUpstream) {
           case OK_NEW_SCHEMA:
@@ -159,8 +159,11 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
           case OK:
             addBatchToHyperContainer(right);
             break;
+          case OUT_OF_MEMORY:
+            return IterOutcome.OUT_OF_MEMORY;
           case NONE:
           case STOP:
+            //TODO we got a STOP, shouldn't we stop immediately ?
           case NOT_YET:
             drainRight = false;
             break;
@@ -274,6 +277,11 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
       leftUpstream = next(LEFT_INPUT, left);
       rightUpstream = next(RIGHT_INPUT, right);
 
+      if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+        state = BatchState.OUT_OF_MEMORY;
+        return;
+      }
+
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
         for (VectorWrapper vw : left) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index eff9e61..d9330ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -34,7 +34,7 @@ import com.google.common.collect.Lists;
 
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class);
 
   private SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
@@ -42,7 +42,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private int recordsLeft;
   private boolean noEndLimit;
   private boolean skipBatch;
-  private boolean done = false;
   private boolean first = true;
   List<TransferPair> transfers = Lists.newArrayList();
 
@@ -90,14 +89,13 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   public IterOutcome innerNext() {
-    if (done) {
-      return IterOutcome.NONE;
-    }
-
     if(!first && !noEndLimit && recordsLeft <= 0) {
       incoming.kill(true);
 
       IterOutcome upStream = next(incoming);
+      if (upStream == IterOutcome.OUT_OF_MEMORY) {
+        return upStream;
+      }
 
       while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
 
@@ -106,10 +104,14 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
           wrapper.getValueVector().clear();
         }
         upStream = next(incoming);
+        if (upStream == IterOutcome.OUT_OF_MEMORY) {
+          return upStream;
+        }
       }
 
       return IterOutcome.NONE;
     }
+
     return super.innerNext();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ce683cb..f19f371 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -75,7 +75,6 @@ import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 
 import parquet.Preconditions;
 
@@ -88,10 +87,8 @@ import com.sun.codemodel.JExpr;
  * The MergingRecordBatch merges pre-sorted record batches from remote senders.
  */
 public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
 
-  private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
-  private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
   private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
 
   private RecordBatchLoader[] batchLoaders;
@@ -170,7 +167,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       prevBatchWasFull = false;
     }
 
-    if (hasMoreIncoming == false) {
+    if (!hasMoreIncoming) {
       logger.debug("next() was called after all values have been processed");
       outgoingPosition = 0;
       return IterOutcome.NONE;

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 16d1400..1872a51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -169,6 +170,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
         }
         return false;
 
+      case OUT_OF_MEMORY:
+        throw new OutOfMemoryRuntimeException();
+
       case STOP:
         if (partitioner != null) {
           partitioner.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 35bf3cd..bca9622 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.ProducerConsumer;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -38,7 +39,7 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
 public class ProducerConsumerBatch extends AbstractRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class);
 
   private final RecordBatch incoming;
   private final Thread producer = new Thread(new Producer(), Thread.currentThread().getName() + " - Producer Thread");
@@ -67,7 +68,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       wrapper = queue.take();
       logger.debug("Got batch from queue");
     } catch (final InterruptedException e) {
-      if (!context.shouldContinue()) {
+      if (context.shouldContinue()) {
         context.fail(e);
       }
       return IterOutcome.STOP;
@@ -79,6 +80,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
       return IterOutcome.NONE;
     } else if (wrapper.failed) {
       return IterOutcome.STOP;
+    } else if (wrapper.outOfMemory) {
+      throw new OutOfMemoryRuntimeException();
     }
 
     recordCount = wrapper.batch.getRecordCount();
@@ -131,12 +134,15 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
             case NONE:
               stop = true;
               break outer;
+            case OUT_OF_MEMORY:
+              queue.putFirst(RecordBatchDataWrapper.outOfMemory());
+              return;
             case STOP:
-              queue.putFirst(new RecordBatchDataWrapper(null, false, true));
+              queue.putFirst(RecordBatchDataWrapper.failed());
               return;
             case OK_NEW_SCHEMA:
             case OK:
-              wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
+              wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming));
               queue.put(wrapper);
               wrapper = null;
               break;
@@ -144,6 +150,13 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
               throw new UnsupportedOperationException();
           }
         }
+      } catch (final OutOfMemoryRuntimeException e) {
+        try {
+          queue.putFirst(RecordBatchDataWrapper.outOfMemory());
+        } catch (final InterruptedException ex) {
+          logger.error("Unable to enqueue the last batch indicator. Something is broken.", ex);
+          // TODO InterruptedException
+        }
       } catch (final InterruptedException e) {
         logger.warn("Producer thread is interrupted.", e);
         // TODO InterruptedException
@@ -151,7 +164,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
         if (stop) {
           try {
             clearQueue();
-            queue.put(new RecordBatchDataWrapper(null, true, false));
+            queue.put(RecordBatchDataWrapper.finished());
           } catch (final InterruptedException e) {
             logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
             // TODO InterruptedException
@@ -206,14 +219,32 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }
 
   private static class RecordBatchDataWrapper {
-    RecordBatchData batch;
-    boolean finished;
-    boolean failed;
+    final RecordBatchData batch;
+    final boolean finished;
+    final boolean failed;
+    final boolean outOfMemory;
 
-    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed) {
+    RecordBatchDataWrapper(final RecordBatchData batch, final boolean finished, final boolean failed, final boolean outOfMemory) {
       this.batch = batch;
       this.finished = finished;
       this.failed = failed;
+      this.outOfMemory = outOfMemory;
+    }
+
+    public static RecordBatchDataWrapper batch(final RecordBatchData batch) {
+      return new RecordBatchDataWrapper(batch, false, false, false);
+    }
+
+    public static RecordBatchDataWrapper finished() {
+      return new RecordBatchDataWrapper(null, true, false, false);
+    }
+
+    public static RecordBatchDataWrapper failed() {
+      return new RecordBatchDataWrapper(null, false, true, false);
+    }
+
+    public static RecordBatchDataWrapper outOfMemory() {
+      return new RecordBatchDataWrapper(null, false, false, true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 7b9fffb..32ffb6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -30,7 +30,6 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.FunctionCallFactory;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.ValueExpressions;
@@ -80,7 +79,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
-  private final boolean buildingSchema = true;
 
   private static final String EMPTY_STRING = "";
   private boolean first = true;
@@ -144,7 +142,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         IterOutcome next = null;
         while (incomingRecordCount == 0) {
           next = next(incoming);
-          if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA) {
+          if (next == IterOutcome.OUT_OF_MEMORY) {
+            outOfMemory = true;
+            return next;
+          } else if (next != IterOutcome.OK && next != IterOutcome.OK_NEW_SCHEMA) {
             return next;
           }
           incomingRecordCount = incoming.getRecordCount();
@@ -255,13 +256,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
 
   /** hack to make ref and full work together... need to figure out if this is still necessary. **/
   private FieldReference getRef(final NamedExpression e) {
-    final FieldReference ref = e.getRef();
-    final PathSegment seg = ref.getRootSegment();
-
-//    if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) {
-//      return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
-//    }
-    return ref;
+    return e.getRef();
   }
 
   private boolean isAnyWildcard(final List<NamedExpression> exprs) {
@@ -321,7 +316,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             int k = 0;
             for (final VectorWrapper<?> wrapper : incoming) {
               final ValueVector vvIn = wrapper.getValueVector();
-              final SchemaPath originalPath = vvIn.getField().getPath();
               if (k > result.outputNames.size()-1) {
                 assert false;
               }

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 74b7d85..8748aaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -43,13 +43,12 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.calcite.rel.RelFieldCollation.NullDirection;
 
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 
 public class SortBatch extends AbstractRecordBatch<Sort> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatch.class);
 
   public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
   public final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
@@ -107,6 +106,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
+        case OUT_OF_MEMORY:
         case STOP:
           return upstream;
         case OK_NEW_SCHEMA:

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index aa9297e..57e7b55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -17,9 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.svremover;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.util.List;
 
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -36,7 +34,6 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.util.BatchPrinter;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -44,13 +41,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class);
 
   private Copier copier;
   private int recordCount;
   private boolean hasRemainder;
   private int remainderIndex;
-  private boolean first;
 
   public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, incoming);
@@ -248,7 +244,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
     for(VectorWrapper<?> vv : batch){
       ValueVector v = vv.getValueVectors()[0];
-      TransferPair tp = v.makeTransferPair(container.addOrGet(v.getField()));
+      v.makeTransferPair(container.addOrGet(v.getField()));
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index af45815..78e83d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -108,8 +108,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
     } else {
       sv = null;
     }
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true
-        : false);
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2);
     VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, oContext.getAllocator());
 
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 66bc3e3..445568b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -143,7 +144,9 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
 
   private boolean doAlloc() {
     for (ValueVector v : allocationVectors) {
-      if(!AllocationHelper.allocateNew(v, current.getRecordCount())) {
+      try {
+        AllocationHelper.allocateNew(v, current.getRecordCount());
+      } catch (OutOfMemoryRuntimeException ex) {
         return false;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index 86d11d5..428632f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -128,6 +127,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
         case NONE:
           noMoreBatches = true;
           break;
+        case OUT_OF_MEMORY:
         case NOT_YET:
         case STOP:
           return upstream;
@@ -160,7 +160,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
     // process a saved batch
     try {
       framer.doWork();
-    } catch (DrillException | OutOfMemoryRuntimeException e) {
+    } catch (DrillException e) {
       context.fail(e);
       if (framer != null) {
         framer.cleanup();
@@ -179,10 +179,18 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
   @Override
   protected void buildSchema() throws SchemaChangeException {
     logger.trace("buildSchema()");
-    if (next(incoming) == IterOutcome.NONE) {
-      state = BatchState.DONE;
-      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-      return;
+    IterOutcome outcome = next(incoming);
+    switch (outcome) {
+      case NONE:
+        state = BatchState.DONE;
+        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+        return;
+      case STOP:
+        state = BatchState.STOP;
+        return;
+      case OUT_OF_MEMORY:
+        state = BatchState.OUT_OF_MEMORY;
+        return;
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c09c2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index d08c86c..aab3391 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -44,6 +44,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -185,12 +186,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         }
         container.buildSchema(SelectionVectorMode.NONE);
         container.setRecordCount(0);
-        return;
+        break;
       case STOP:
+        state = BatchState.STOP;
+        break;
+      case OUT_OF_MEMORY:
+        state = BatchState.OUT_OF_MEMORY;
+        break;
       case NONE:
         state = BatchState.DONE;
-      default:
-        return;
+        break;
     }
   }
 
@@ -273,7 +278,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
             try {
               sv2 = newSV2();
             } catch (OutOfMemoryException e) {
-              throw new RuntimeException(e);
+              throw new OutOfMemoryRuntimeException(e);
             }
           }
           int count = sv2.getCount();
@@ -316,11 +321,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 //          logger.debug("Took {} us to sort {} records", t, count);
           break;
         case OUT_OF_MEMORY:
+          logger.debug("received OUT_OF_MEMORY, trying to spill");
           highWaterMark = totalSizeInMemory;
           if (batchesSinceLastSpill > 2) {
             spilledBatchGroups.add(mergeAndSpill(batchGroups));
+            batchesSinceLastSpill = 0;
+          } else {
+            logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream");
+            return IterOutcome.OUT_OF_MEMORY;
           }
-          batchesSinceLastSpill = 0;
           break;
         default:
           throw new UnsupportedOperationException();