You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/06 23:42:03 UTC

[03/15] DRILL-620: Memory consumption fixes

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index c6632cb..3db5e7f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -37,8 +37,10 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -73,6 +75,8 @@ public class TestSimpleFilter extends ExecTest {
       assertEquals(50, exec.getRecordCount());
     }
 
+    exec.stop();
+
     if(context.getFailureCause() != null){
       throw context.getFailureCause();
     }
@@ -100,6 +104,7 @@ public class TestSimpleFilter extends ExecTest {
       }
       recordCount += exec.getSelectionVector4().getCount();
     }
+    exec.stop();
     assertEquals(50, recordCount);
 
     if(context.getFailureCause() != null){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
index f98015b..79ce550 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -98,6 +98,7 @@ public class TestHashJoin extends PopUnitTestBase{
             bitContext.getMetrics(); result = new MetricRegistry();
             bitContext.getAllocator(); result = new TopLevelAllocator();
             bitContext.getOperatorCreatorRegistry(); result = new OperatorCreatorRegistry(c);
+            bitContext.getConfig(); result = c;
         }};
 
         PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
@@ -110,6 +111,7 @@ public class TestHashJoin extends PopUnitTestBase{
         while (exec.next()) {
             totalRecordCount += exec.getRecordCount();
         }
+        exec.stop();
         assertEquals(expectedRows, totalRecordCount);
         System.out.println("Total Record Count: " + totalRecordCount);
         if (context.getFailureCause() != null)
@@ -140,8 +142,7 @@ public class TestHashJoin extends PopUnitTestBase{
     }
 
     @Test
-    public void simpleEqualityJoin(@Injectable final DrillbitContext bitContext,
-                                   @Injectable UserServer.UserClientConnection connection) throws Throwable {
+    public void simpleEqualityJoin() throws Throwable {
 
         // Function checks for casting from Float, Double to Decimal data types
         try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index 02bbdf9..b9e8f6f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -42,7 +42,9 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -74,19 +76,18 @@ public class TestSimpleProjection extends ExecTest {
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
 
     while(exec.next()){
-      BigIntVector c1 = exec.getValueVectorById(new SchemaPath("col1", ExpressionPosition.UNKNOWN), BigIntVector.class);
-      BigIntVector c2 = exec.getValueVectorById(new SchemaPath("col2", ExpressionPosition.UNKNOWN), BigIntVector.class);
+      VectorUtil.showVectorAccessibleContent(exec.getIncoming(), "\t");
+      NullableBigIntVector c1 = exec.getValueVectorById(new SchemaPath("col1", ExpressionPosition.UNKNOWN), NullableBigIntVector.class);
+      NullableBigIntVector c2 = exec.getValueVectorById(new SchemaPath("col2", ExpressionPosition.UNKNOWN), NullableBigIntVector.class);
       int x = 0;
-      BigIntVector.Accessor a1, a2;
+      NullableBigIntVector.Accessor a1, a2;
       a1 = c1.getAccessor();
       a2 = c2.getAccessor();
 
       for(int i =0; i < c1.getAccessor().getValueCount(); i++){
-        assertEquals(a1.get(i)+1, a2.get(i));
-        x += a1.get(i);
+        if (!a1.isNull(i)) assertEquals(a1.get(i)+1, a2.get(i));
+        x += a1.isNull(i) ? 0 : a1.get(i);
       }
-
-      System.out.println(x);
     }
 
     if(context.getFailureCause() != null){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index f115c44..b2c5b19 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -79,6 +79,8 @@ public class TestTraceMultiRecordBatch extends ExecTest {
         while(exec.next()) {
         }
 
+        exec.stop();
+
         if(context.getFailureCause() != null){
             throw context.getFailureCause();
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index f42efd4..c768296 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -93,6 +93,8 @@ public class TestTraceOutputDump extends ExecTest {
         while(exec.next()){
         }
 
+        exec.stop();
+
         if(context.getFailureCause() != null){
             throw context.getFailureCause();
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
index b17f7e7..5de0ad7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
@@ -131,7 +131,6 @@ public class TestSimpleExternalSort extends PopUnitTestBase {
 
 
         BigIntVector.Accessor a1 = c1.getAccessor();
-//        IntVector.Accessor a2 = c2.getAccessor();
 
         for(int i =0; i < c1.getAccessor().getValueCount(); i++){
           recordCount++;
@@ -147,4 +146,54 @@ public class TestSimpleExternalSort extends PopUnitTestBase {
     }
   }
 
+  @Test
+  public void outOfMemoryExternalSort() throws Throwable{
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    DrillConfig config = DrillConfig.create("drill-oom-xsort.conf");
+
+    try(Drillbit bit1 = new Drillbit(config, serviceSet);
+        DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
+
+      bit1.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+              Files.toString(FileUtils.getResourceAsFile("/xsort/oom_sort_test.json"),
+                      Charsets.UTF_8));
+      int count = 0;
+      for(QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() != 0)
+          count += b.getHeader().getRowCount();
+      }
+      assertEquals(10000000, count);
+
+      long previousBigInt = Long.MAX_VALUE;
+
+      int recordCount = 0;
+      int batchCount = 0;
+
+      for (QueryResultBatch b : results) {
+        if (b.getHeader().getRowCount() == 0) break;
+        batchCount++;
+        RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
+        loader.load(b.getHeader().getDef(),b.getData());
+        BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldId(), BigIntVector.class).getValueVector();
+
+
+        BigIntVector.Accessor a1 = c1.getAccessor();
+
+        for(int i =0; i < c1.getAccessor().getValueCount(); i++){
+          recordCount++;
+          assertTrue(String.format("%d < %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i));
+          previousBigInt = a1.get(i);
+        }
+        assertTrue(String.format("%d == %d", a1.get(0), a1.get(a1.getValueCount() - 1)), a1.get(0) != a1.get(a1.getValueCount() - 1));
+        loader.clear();
+        b.release();
+      }
+
+      System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount));
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index f19d616..9a1eb94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.physical.PhysicalPlan;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index d79735b..788d7f1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -59,11 +59,11 @@ public class TestValueVector extends ExecTest {
     v.allocateNew(1024);
 
     // Put and set a few values
-    m.set(0, 100);
-    m.set(1, 101);
-    m.set(100, 102);
-    m.set(1022, 103);
-    m.set(1023, 104);
+    m.setSafe(0, 100);
+    m.setSafe(1, 101);
+    m.setSafe(100, 102);
+    m.setSafe(1022, 103);
+    m.setSafe(1023, 104);
     assertEquals(100, v.getAccessor().get(0));
     assertEquals(101, v.getAccessor().get(1));
     assertEquals(102, v.getAccessor().get(100));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
index 0e06af1..3b8b57b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestOrphanSchema.java
@@ -28,6 +28,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -41,12 +42,14 @@ import org.apache.drill.exec.store.ischema.RowRecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
  * Using an orphan schema, create and display the various information schema tables.
  * An "orphan schema" is a stand alone schema which is not (yet?) connected to Optiq.
  */
+@Ignore // I think we should remove these tests. They are too difficult to maintain.
 public class TestOrphanSchema extends ExecTest {
   static SchemaPlus root;
 
@@ -56,33 +59,33 @@ public class TestOrphanSchema extends ExecTest {
   }
 
   @Test
-  public void testTables() {
+  public void testTables() throws OutOfMemoryException {
     displayTable(new InfoSchemaTable.Tables(), new OptiqProvider.Tables(root));
   }
 
   @Test
-  public void testSchemata() {
+  public void testSchemata() throws OutOfMemoryException {
     displayTable(new InfoSchemaTable.Schemata(), new OptiqProvider.Schemata(root));
   }
 
 
   @Test
-  public void testViews() {
+  public void testViews() throws OutOfMemoryException {
     displayTable(new InfoSchemaTable.Views(), new OptiqProvider.Views(root));
   }
 
   @Test
-  public void testCatalogs() {
+  public void testCatalogs() throws OutOfMemoryException {
     displayTable(new InfoSchemaTable.Catalogs(), new OptiqProvider.Catalogs(root));
   }
 
   @Test
-  public void testColumns() {
+  public void testColumns() throws OutOfMemoryException {
     displayTable(new InfoSchemaTable.Columns(), new OptiqProvider.Columns(root));
   }
 
 
-  private void displayTable(FixedTable table, RowProvider provider) {
+  private void displayTable(FixedTable table, RowProvider provider) throws OutOfMemoryException {
 
     // Set up a mock context
     FragmentContext context = mock(FragmentContext.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
index e1ed53a..8da1ea4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ischema/TestTableProvider.java
@@ -27,6 +27,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -40,26 +41,27 @@ import org.junit.Test;
 /**
  * Using a test table with two columns, create data and verify the values are in the record batch.
  */
+@Ignore
 public class TestTableProvider extends ExecTest {
 
   @Test
-  public void zeroRead() {
+  public void zeroRead() throws OutOfMemoryException {
     readTestTable(0);
   }
 
   @Test
-  public void oneRead() {
+  public void oneRead() throws OutOfMemoryException {
     readTestTable(1);
   }
 
   @Test
-  public void smallRead() {
+  public void smallRead() throws OutOfMemoryException {
     readTestTable(10);
   }
 
   @Test
   @Ignore // due to out of heap space
-  public void largeRead() {
+  public void largeRead() throws OutOfMemoryException {
     readTestTable(1024*1024);
   }
 
@@ -68,7 +70,7 @@ public class TestTableProvider extends ExecTest {
    * Read record batches from the test table and verify the contents.
    * @param nrRows - the total number of rows expected.
    */
-  private void readTestTable(int nrRows) {
+  private void readTestTable(int nrRows) throws OutOfMemoryException {
 
     // Mock up a context with a BufferAllocator
     FragmentContext context = mock(FragmentContext.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java
index c3e7491..9887536 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/JSONRecordReaderTest.java
@@ -40,6 +40,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -50,8 +52,7 @@ import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 
 import com.google.common.collect.Lists;
 
@@ -59,13 +60,29 @@ import com.google.common.collect.Lists;
 public class JSONRecordReaderTest extends ExecTest {
   private static final Charset UTF_8 = Charset.forName("UTF-8");
 
+  private static MockOutputMutator mutator = new MockOutputMutator();
+
   private String getResource(String resourceName) {
     return "resource:" + resourceName;
   }
 
-  class MockOutputMutator implements OutputMutator {
-    List<MaterializedField> removedFields = Lists.newArrayList();
+  @After
+  public void setup() {
+    for (ValueVector v: mutator.getAddFields()) {
+      v.clear();
+    }
+    mutator.removeAllFields();
+    mutator.removedFields.clear();
+  }
+   @AfterClass
+   public static void cleanup() {
+     mutator.close();
+   }
+
+  static class MockOutputMutator implements OutputMutator {
+    public List<MaterializedField> removedFields = Lists.newArrayList();
     List<ValueVector> addFields = Lists.newArrayList();
+    private BufferAllocator allocator = new TopLevelAllocator();
 
     @Override
     public void removeField(MaterializedField field) throws SchemaChangeException {
@@ -96,7 +113,14 @@ public class JSONRecordReaderTest extends ExecTest {
 
     @Override
     public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
-      return null;
+      ValueVector v = TypeHelper.getNewVector(field, allocator);
+      if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+      addField(v);
+      return (T) v;
+    }
+
+    public void close() {
+      allocator.close();
     }
   }
 
@@ -137,17 +161,10 @@ public class JSONRecordReaderTest extends ExecTest {
   @Test
   public void testSameSchemaInSameBatch(@Injectable final FragmentContext context) throws IOException,
       ExecutionSetupException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_1.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()), null);
 
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     jr.setup(mutator);
     assertEquals(2, jr.next());
@@ -166,18 +183,11 @@ public class JSONRecordReaderTest extends ExecTest {
   @Test
   public void testChangedSchemaInSameBatch(@Injectable final FragmentContext context) throws IOException,
       ExecutionSetupException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()), null);
 
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
 
     jr.setup(mutator);
@@ -207,18 +217,11 @@ public class JSONRecordReaderTest extends ExecTest {
   @Test
   public void testChangedSchemaInTwoBatchesColumnSelect(@Injectable final FragmentContext context) throws IOException,
       ExecutionSetupException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()),
         64, Arrays.asList(new SchemaPath("test", ExpressionPosition.UNKNOWN))); // batch only fits 1 int
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     List<MaterializedField> removedFields = mutator.getRemovedFields();
 
@@ -242,18 +245,11 @@ public class JSONRecordReaderTest extends ExecTest {
   @Test
   public void testChangedSchemaInTwoBatches(@Injectable final FragmentContext context) throws IOException,
       ExecutionSetupException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_2.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()),
         64, null); // batch only fits 1 int
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     List<MaterializedField> removedFields = mutator.getRemovedFields();
 
@@ -302,18 +298,11 @@ public class JSONRecordReaderTest extends ExecTest {
   @Test
   @Ignore // until repeated map
   public void testNestedFieldInSameBatch(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_3.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()), null);
 
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     jr.setup(mutator);
     assertEquals(2, jr.next());
@@ -332,18 +321,11 @@ public class JSONRecordReaderTest extends ExecTest {
   @Test
   @Ignore // until repeated map is added.
   public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_4.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()), null);
 
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     jr.setup(mutator);
     assertEquals(2, jr.next());
@@ -365,18 +347,11 @@ public class JSONRecordReaderTest extends ExecTest {
 
   @Test
   public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_5.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()), null);
 
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     jr.setup(mutator);
     assertEquals(9, jr.next());
@@ -398,18 +373,11 @@ public class JSONRecordReaderTest extends ExecTest {
 
   @Test
   public void testJsonArrayandNormalFields(@Injectable final FragmentContext context) throws ExecutionSetupException, IOException {
-    new Expectations() {
-      {
-        context.getAllocator();
-        returns(new TopLevelAllocator());
-      }
-    };
 
     JSONRecordReader jr = new JSONRecordReader(context,
         FileUtils.getResourceAsFile("/scan_json_test_7.json").toURI().toString(),
         FileSystem.getLocal(new Configuration()), null);
 
-    MockOutputMutator mutator = new MockOutputMutator();
     List<ValueVector> addFields = mutator.getAddFields();
     jr.setup(mutator);
     assertEquals(2, jr.next());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java
new file mode 100644
index 0000000..d86b5db
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.NullableVarCharVector.Accessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+public class TestAdaptiveAllocation {
+
+  @Test
+  public void test() throws Exception {
+    BufferAllocator allocator = new TopLevelAllocator();
+    MaterializedField field = MaterializedField.create("field", Types.required(MinorType.VARCHAR));
+    VarBinaryVector varBinaryVector = new VarBinaryVector(field, allocator);
+
+    Random rand = new Random();
+//    int valuesToWrite = rand.nextInt(4000) + 1000;
+//    int bytesToWrite = rand.nextInt(100);
+    int valuesToWrite = 100;
+    int bytesToWrite = 1;
+//    System.out.println("value: " + valuesToWrite);
+//    System.out.println("bytes: " + bytesToWrite);
+
+    byte[] value = new byte[bytesToWrite];
+
+    for (int i = 0; i < 10000; i++) {
+      varBinaryVector.allocateNew();
+//      System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity());
+//      System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity());
+      int offset = 0;
+      int j = 0;
+      for (j = 0; j < valuesToWrite; j++) {
+        if (!varBinaryVector.getMutator().setSafe(j - offset, value)) {
+          varBinaryVector.getMutator().setValueCount(j - offset);
+          offset = j;
+          varBinaryVector.allocateNew();
+//          System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity());
+//          System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity());
+        }
+      }
+      varBinaryVector.getMutator().setValueCount(j - offset);
+    }
+    varBinaryVector.allocateNew();
+    System.out.println(varBinaryVector.getValueCapacity());
+    System.out.println(varBinaryVector.getByteCapacity());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
new file mode 100644
index 0000000..4b3aa8a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.NullableVarCharVector.Accessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSplitAndTransfer {
+
+  @Test
+  public void test() throws Exception {
+    BufferAllocator allocator = new TopLevelAllocator();
+    MaterializedField field = MaterializedField.create("field", Types.optional(MinorType.VARCHAR));
+    NullableVarCharVector varCharVector = new NullableVarCharVector(field, allocator);
+    varCharVector.allocateNew(10000, 1000);
+
+    String[] compareArray = new String[500];
+
+    for (int i = 0; i < 500; i += 3) {
+      String s = String.format("%010d", i);
+      varCharVector.getMutator().set(i, s.getBytes());
+      compareArray[i] = s;
+    }
+    varCharVector.getMutator().setValueCount(500);
+
+    TransferPair tp = varCharVector.getTransferPair();
+    NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo();
+    Accessor accessor = newVarCharVector.getAccessor();
+    int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}};
+
+    for (int[] startLength : startLengths) {
+      int start = startLength[0];
+      int length = startLength[1];
+      tp.splitAndTransfer(start, length);
+      newVarCharVector.getMutator().setValueCount(length);
+      for (int i = 0; i < length; i++) {
+        boolean expectedSet = ((start + i) % 3) == 0;
+        if (expectedSet) {
+          byte[] expectedValue = compareArray[start + i].getBytes();
+          Assert.assertFalse(accessor.isNull(i));
+//          System.out.println(new String(accessor.get(i)));
+          Assert.assertArrayEquals(expectedValue, accessor.get(i));
+        } else {
+          Assert.assertTrue(accessor.isNull(i));
+        }
+      }
+      newVarCharVector.clear();
+    }
+
+    varCharVector.clear();
+    allocator.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/resources/drill-oom-xsort.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/drill-oom-xsort.conf b/exec/java-exec/src/test/resources/drill-oom-xsort.conf
new file mode 100644
index 0000000..c617a29
--- /dev/null
+++ b/exec/java-exec/src/test/resources/drill-oom-xsort.conf
@@ -0,0 +1,18 @@
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
+drill.exec: {
+  memory: {
+    fragment: {
+      max: 50000000,
+      initial: 2000000
+    },
+    operator: {
+      max: 30000000,
+      initial: 2000000
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/resources/project/test1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/project/test1.json b/exec/java-exec/src/test/resources/project/test1.json
index 2a7c935..3a84fd0 100644
--- a/exec/java-exec/src/test/resources/project/test1.json
+++ b/exec/java-exec/src/test/resources/project/test1.json
@@ -14,8 +14,9 @@
             entries:[
             	{records: 100, types: [
             	  {name: "blue", type: "INT", mode: "REQUIRED"},
-            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
-            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	  {name: "red", type: "BIGINT", mode: "OPTIONAL"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"},
+            	  {name: "orange", type: "VARCHAR", mode: "OPTIONAL"}
             	]}
             ]
         },
@@ -25,7 +26,9 @@
             pop:"project",
             exprs: [
               { ref: "col1", expr:"red + 1" },
-              { ref: "col2", expr:"red + 2" }
+              { ref: "col2", expr:"red + 2" },
+              { ref: "col3", expr:"orange"},
+              { ref: "col4", expr:"orange"}
             ]
         },
         {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/exec/java-exec/src/test/resources/xsort/oom_sort_test.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/oom_sort_test.json b/exec/java-exec/src/test/resources/xsort/oom_sort_test.json
new file mode 100644
index 0000000..af5bc43
--- /dev/null
+++ b/exec/java-exec/src/test/resources/xsort/oom_sort_test.json
@@ -0,0 +1,57 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+                {records: 10000000, types: [
+                  {name: "green", type: "BIGINT", mode: "REQUIRED"}
+                ]}
+            ]
+        },
+        {
+                    @id: 2,
+                    pop: "project",
+                    child: 1,
+                    exprs: [
+                      { ref: "blue", expr: "randomBigInt(100000)" }
+                    ]
+                },
+        {
+            @id: 3,
+            pop: "union-exchange",
+            child: 2,
+            maxAllocation: 1000000
+        },
+        {
+            @id:4,
+            child: 3,
+            pop:"external-sort",
+            orderings: [
+              {expr: "blue", order : "DESC"}
+            ],
+            initialAllocation: 1000000,
+            maxAllocation: 30000000
+        },
+        {
+            @id:5,
+            child: 4,
+            pop:"selection-vector-remover",
+            maxAllocation: 1000000
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen",
+            maxAllocation: 1000000
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7225348..7ef07ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -258,8 +258,8 @@
           <artifactId>maven-surefire-plugin</artifactId>
           <version>2.17</version>
           <configuration>
-            <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=6096M </argLine>
-            <forkCount>8</forkCount>
+            <argLine>-Xms1g -Xmx2g -XX:MaxDirectMemorySize=10096M </argLine>
+            <forkCount>1</forkCount>
             <reuseForks>true</reuseForks>
             <additionalClasspathElements>
               <additionalClasspathElement>./exec/jdbc/src/test/resources/storage-plugins.json</additionalClasspathElement>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index 78343e6..37e8a18 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -1267,6 +1267,16 @@ public final class BitData {
      * <code>optional bool isLastBatch = 5;</code>
      */
     boolean getIsLastBatch();
+
+    // optional bool isOutOfMemory = 6 [default = false];
+    /**
+     * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+     */
+    boolean hasIsOutOfMemory();
+    /**
+     * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+     */
+    boolean getIsOutOfMemory();
   }
   /**
    * Protobuf type {@code exec.bit.data.FragmentRecordBatch}
@@ -1360,6 +1370,11 @@ public final class BitData {
               isLastBatch_ = input.readBool();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              isOutOfMemory_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1492,12 +1507,29 @@ public final class BitData {
       return isLastBatch_;
     }
 
+    // optional bool isOutOfMemory = 6 [default = false];
+    public static final int ISOUTOFMEMORY_FIELD_NUMBER = 6;
+    private boolean isOutOfMemory_;
+    /**
+     * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+     */
+    public boolean hasIsOutOfMemory() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    /**
+     * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+     */
+    public boolean getIsOutOfMemory() {
+      return isOutOfMemory_;
+    }
+
     private void initFields() {
       handle_ = org.apache.drill.exec.proto.ExecProtos.FragmentHandle.getDefaultInstance();
       sendingMajorFragmentId_ = 0;
       sendingMinorFragmentId_ = 0;
       def_ = org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.getDefaultInstance();
       isLastBatch_ = false;
+      isOutOfMemory_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1526,6 +1558,9 @@ public final class BitData {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeBool(5, isLastBatch_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(6, isOutOfMemory_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1555,6 +1590,10 @@ public final class BitData {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(5, isLastBatch_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(6, isOutOfMemory_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1691,6 +1730,8 @@ public final class BitData {
         bitField0_ = (bitField0_ & ~0x00000008);
         isLastBatch_ = false;
         bitField0_ = (bitField0_ & ~0x00000010);
+        isOutOfMemory_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -1747,6 +1788,10 @@ public final class BitData {
           to_bitField0_ |= 0x00000010;
         }
         result.isLastBatch_ = isLastBatch_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.isOutOfMemory_ = isOutOfMemory_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1778,6 +1823,9 @@ public final class BitData {
         if (other.hasIsLastBatch()) {
           setIsLastBatch(other.getIsLastBatch());
         }
+        if (other.hasIsOutOfMemory()) {
+          setIsOutOfMemory(other.getIsOutOfMemory());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2138,6 +2186,39 @@ public final class BitData {
         return this;
       }
 
+      // optional bool isOutOfMemory = 6 [default = false];
+      private boolean isOutOfMemory_ ;
+      /**
+       * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+       */
+      public boolean hasIsOutOfMemory() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+       */
+      public boolean getIsOutOfMemory() {
+        return isOutOfMemory_;
+      }
+      /**
+       * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+       */
+      public Builder setIsOutOfMemory(boolean value) {
+        bitField0_ |= 0x00000020;
+        isOutOfMemory_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool isOutOfMemory = 6 [default = false];</code>
+       */
+      public Builder clearIsOutOfMemory() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        isOutOfMemory_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.bit.data.FragmentRecordBatch)
     }
 
@@ -2179,15 +2260,16 @@ public final class BitData {
       "\013rpc_version\030\001 \001(\005\0222\n\007channel\030\002 \001(\0162\027.ex" +
       "ec.shared.RpcChannel:\010BIT_DATA\022(\n\006handle" +
       "\030\003 \001(\0132\030.exec.bit.FragmentHandle\")\n\022BitS" +
-      "erverHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\304\001\n\023" +
+      "erverHandshake\022\023\n\013rpc_version\030\001 \001(\005\"\342\001\n\023" +
       "FragmentRecordBatch\022(\n\006handle\030\001 \001(\0132\030.ex" +
       "ec.bit.FragmentHandle\022!\n\031sending_major_f" +
       "ragment_id\030\002 \001(\005\022!\n\031sending_minor_fragme",
       "nt_id\030\003 \001(\005\022(\n\003def\030\004 \001(\0132\033.exec.shared.R" +
-      "ecordBatchDef\022\023\n\013isLastBatch\030\005 \001(\010*D\n\007Rp" +
-      "cType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE" +
-      "\020\002\022\024\n\020REQ_RECORD_BATCH\020\003B(\n\033org.apache.d" +
-      "rill.exec.protoB\007BitDataH\001"
+      "ecordBatchDef\022\023\n\013isLastBatch\030\005 \001(\010\022\034\n\ris" +
+      "OutOfMemory\030\006 \001(\010:\005false*D\n\007RpcType\022\r\n\tH" +
+      "ANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_" +
+      "RECORD_BATCH\020\003B(\n\033org.apache.drill.exec." +
+      "protoB\007BitDataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -2211,7 +2293,7 @@ public final class BitData {
           internal_static_exec_bit_data_FragmentRecordBatch_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_FragmentRecordBatch_descriptor,
-              new java.lang.String[] { "Handle", "SendingMajorFragmentId", "SendingMinorFragmentId", "Def", "IsLastBatch", });
+              new java.lang.String[] { "Handle", "SendingMajorFragmentId", "SendingMinorFragmentId", "Def", "IsLastBatch", "IsOutOfMemory", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a2355d42/protocol/src/main/protobuf/BitData.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto
index 5356398..de8e9e7 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -31,4 +31,5 @@ message FragmentRecordBatch{
   optional int32 sending_minor_fragment_id = 3;
   optional exec.shared.RecordBatchDef def = 4;
   optional bool isLastBatch = 5;
+  optional bool isOutOfMemory = 6 [ default = false ];
 }