You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by hg...@apache.org on 2015/07/07 01:28:56 UTC

[1/3] drill git commit: DRILL-3445: BufferAllocator.buffer() implementations should throw an OutOfMemoryRuntimeException

Repository: drill
Updated Branches:
  refs/heads/master 0a27a033a -> 48d8a59d1


DRILL-3445: BufferAllocator.buffer() implementations should throw an OutOfMemoryRuntimeException


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/48d8a59d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/48d8a59d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/48d8a59d

Branch: refs/heads/master
Commit: 48d8a59d1b97988c006f85daad0ae2fcb3a9cd06
Parents: 1c9093e
Author: adeneche <ad...@gmail.com>
Authored: Wed Jul 1 12:10:07 2015 -0700
Committer: Hanifi Gunes <hg...@maprtech.com>
Committed: Mon Jul 6 16:28:29 2015 -0700

----------------------------------------------------------------------
 .../codegen/templates/FixedValueVectors.java    | 10 +-----
 .../templates/VariableLengthVectors.java        | 28 ++++-----------
 .../cache/VectorAccessibleSerializable.java     |  6 +---
 .../drill/exec/memory/BufferAllocator.java      |  2 ++
 .../drill/exec/memory/TopLevelAllocator.java    | 23 ++++++-------
 .../physical/impl/filter/FilterTemplate2.java   |  2 +-
 .../physical/impl/xsort/ExternalSortBatch.java  |  4 +--
 .../exec/record/selection/SelectionVector2.java | 16 ++++++---
 .../drill/exec/rpc/ProtobufLengthDecoder.java   |  7 ++--
 .../ParquetDirectByteBufferAllocator.java       |  3 --
 .../org/apache/drill/exec/vector/BitVector.java | 12 ++-----
 .../parquet/hadoop/ColumnChunkIncReadStore.java |  3 ++
 .../apache/drill/TestAllocationException.java   | 36 ++++++--------------
 .../drill/exec/memory/TestAllocators.java       | 34 ++++++------------
 14 files changed, 67 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index e8a4d5f..cadcfd9 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -125,11 +125,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     final int curSize = (int)size;
     clear();
-    final DrillBuf newBuf = allocator.buffer(curSize);
-    if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", size));
-    }
-    data = newBuf;
+    data = allocator.buffer(curSize);
     data.readerIndex(0);
     allocationSizeInBytes = curSize;
   }
@@ -147,10 +143,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
 
     logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", field, allocationSizeInBytes, newAllocationSize);
     final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
-    if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while reallocating buffer to %d bytes", newAllocationSize));
-    }
-
     newBuf.setBytes(0, data, 0, data.capacity());
     newBuf.setZero(newBuf.capacity() / 2, newBuf.capacity() / 2);
     newBuf.writerIndex(data.writerIndex());

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 2c2e6b6..fc08ac6 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -300,34 +300,24 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      * buffers for multiple vectors. If one of the allocations failed we need to
      * clear all the memory that we allocated
      */
-    boolean success = false;
     try {
       final int requestedSize = (int)curAllocationSize;
-      DrillBuf newBuf = allocator.buffer(requestedSize);
-      if (newBuf == null) {
-        return false;
-      }
-      this.data = newBuf;
-      success = offsetVector.allocateNewSafe();
-    } finally {
-      if (!success) {
-        clear();
-      }
+      data = allocator.buffer(requestedSize);
+      offsetVector.allocateNew();
+    } catch (OutOfMemoryRuntimeException e) {
+      clear();
+      return false;
     }
     data.readerIndex(0);
     offsetVector.zeroVector();
-    return success;
+    return true;
   }
 
   public void allocateNew(int totalBytes, int valueCount) {
     clear();
     assert totalBytes >= 0;
     try {
-      final DrillBuf newBuf = allocator.buffer(totalBytes);
-      if (newBuf == null) {
-        throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes));
-      }
-      data = newBuf;
+      data = allocator.buffer(totalBytes);
       offsetVector.allocateNew(valueCount + 1);
     } catch (DrillRuntimeException e) {
       clear();
@@ -345,10 +335,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
 
     final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
-    if(newBuf == null) {
-      throw new OutOfMemoryRuntimeException(
-        String.format("Failure while reallocating buffer of %d bytes", newAllocationSize));
-    }
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();
     data = newBuf;

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 016cd92..a5d2ce8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -26,7 +26,6 @@ import java.util.List;
 
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
@@ -47,7 +46,7 @@ import com.google.common.collect.Lists;
  * from an InputStream and construct a new VectorContainer.
  */
 public class VectorAccessibleSerializable extends AbstractStreamSerializable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
@@ -114,9 +113,6 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
       int dataLength = metaData.getBufferLength();
       MaterializedField field = MaterializedField.create(metaData);
       DrillBuf buf = allocator.buffer(dataLength);
-      if (buf == null) {
-        throw new IOException(new OutOfMemoryException());
-      }
       final ValueVector vector;
       try {
         buf.writeBytes(input, dataLength);

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 811cceb..b01534e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -39,6 +39,7 @@ public interface BufferAllocator extends Closeable {
    * @param size
    *          The size in bytes.
    * @return A new ByteBuf.
+   * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
    */
   public abstract DrillBuf buffer(int size);
 
@@ -49,6 +50,7 @@ public interface BufferAllocator extends Closeable {
    * @param minSize The minimum size in bytes.
    * @param maxSize The maximum size in bytes.
    * @return A new ByteBuf.
+   * @throws OutOfMemoryRuntimeException if buffer cannot be allocated
    */
   public abstract DrillBuf buffer(int minSize, int maxSize);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index b4386a4..05849ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -93,7 +93,7 @@ public class TopLevelAllocator implements BufferAllocator {
       return empty;
     }
     if(!acct.reserve(min)) {
-      return null;
+      throw new OutOfMemoryRuntimeException(createErrorMsg(this, min));
     }
 
     try {
@@ -104,7 +104,7 @@ public class TopLevelAllocator implements BufferAllocator {
     } catch (OutOfMemoryError e) {
       if ("Direct buffer memory".equals(e.getMessage())) {
         acct.release(min);
-        return null;
+        throw new OutOfMemoryRuntimeException(createErrorMsg(this, min), e);
       } else {
         throw e;
       }
@@ -233,25 +233,18 @@ public class TopLevelAllocator implements BufferAllocator {
       return acct.transferIn(b, b.capacity());
     }
 
+
     @Override
     public DrillBuf buffer(int size, int max) {
       if (ENABLE_ACCOUNTING) {
-        try {
-          injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE);
-        } catch (NullPointerException e) {
-          // This is an unusual way to use exception injection. If we inject a NullPointerException into this site
-          // it will actually cause this method to return null, simulating a "normal" failure to allocate memory
-          // this can be useful to check if the caller will properly handle nulls
-          return null;
-        }
+        injector.injectUnchecked(fragmentContext, CHILD_BUFFER_INJECTION_SITE);
       }
 
       if (size == 0) {
         return empty;
       }
       if(!childAcct.reserve(size)) {
-        logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory(), new Exception());
-        return null;
+        throw new OutOfMemoryRuntimeException(createErrorMsg(this, size));
       }
 
       try {
@@ -262,7 +255,7 @@ public class TopLevelAllocator implements BufferAllocator {
       } catch (OutOfMemoryError e) {
         if ("Direct buffer memory".equals(e.getMessage())) {
           childAcct.release(size);
-          return null;
+          throw new OutOfMemoryRuntimeException(createErrorMsg(this, size), e);
         } else {
           throw e;
         }
@@ -401,4 +394,8 @@ public class TopLevelAllocator implements BufferAllocator {
     }
   }
 
+  private static String createErrorMsg(final BufferAllocator allocator, final int size) {
+    return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
+      size, allocator.getAllocatedMemory());
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index cd2fbe9..11d01d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -64,7 +64,7 @@ public abstract class FilterTemplate2 implements Filterer{
     if (recordCount == 0) {
       return;
     }
-    if (! outgoingSelectionVector.allocateNew(recordCount)) {
+    if (! outgoingSelectionVector.allocateNewSafe(recordCount)) {
       throw new OutOfMemoryRuntimeException("Unable to allocate filter batch");
     }
     switch(svMode){

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 6da5582..4bb1572 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -533,7 +533,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException {
     SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
-    if (!sv2.allocateNew(incoming.getRecordCount())) {
+    if (!sv2.allocateNewSafe(incoming.getRecordCount())) {
       try {
         spilledBatchGroups.addFirst(mergeAndSpill(batchGroups));
       } catch (SchemaChangeException e) {
@@ -550,7 +550,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           }
         }
         waitTime *= 2;
-        if (sv2.allocateNew(incoming.getRecordCount())) {
+        if (sv2.allocateNewSafe(incoming.getRecordCount())) {
           break;
         }
         if (waitTime >= 32) {

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 7a7c012..ba8640a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -23,13 +23,14 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.record.DeadBuf;
 
 /**
  * A selection vector that fronts, at most, a
  */
 public class SelectionVector2 implements Closeable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector2.class);
 
   private final BufferAllocator allocator;
   private int recordCount;
@@ -90,15 +91,20 @@ public class SelectionVector2 implements Closeable{
     buffer.setChar(index, value);
   }
 
-  public boolean allocateNew(int size){
-    clear();
-    buffer = allocator.buffer(size * RECORD_SIZE);
-    if (buffer == null) {
+  public boolean allocateNewSafe(int size) {
+    try {
+      allocateNew(size);
+    } catch (OutOfMemoryRuntimeException e) {
       return false;
     }
     return true;
   }
 
+  public void allocateNew(int size) {
+    clear();
+    buffer = allocator.buffer(size * RECORD_SIZE);
+  }
+
   @Override
   public SelectionVector2 clone(){
     SelectionVector2 newSV = new SelectionVector2(allocator);

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
index 4e03f11..3e2adaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.drill.exec.memory.BufferAllocator;
 
 import com.google.protobuf.CodedInputStream;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 
 /**
  * Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder} that avoids bytebuf copy.
@@ -81,8 +82,10 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder {
         } else {
           // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward...
           // TODO: Can we avoid this copy?
-          ByteBuf outBuf = allocator.buffer(length);
-          if (outBuf == null) {
+          ByteBuf outBuf;
+          try {
+            outBuf = allocator.buffer(length);
+          } catch (OutOfMemoryRuntimeException e) {
             logger.warn("Failure allocating buffer on incoming stream due to memory limits.  Current Allocation: {}.", allocator.getAllocatedMemory());
             in.resetReaderIndex();
             outOfMemoryHandler.handle();

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index cf30db6..10c8fd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -49,9 +49,6 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   @Override
   public ByteBuffer allocate(int sz) {
     ByteBuf bb = allocator.buffer(sz);
-    if (bb == null) {
-      throw new OutOfMemoryRuntimeException();
-    }
     ByteBuffer b = bb.nioBuffer(0, sz);
     allocatedBuffers.put(System.identityHashCode(b), bb);
     logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 1d48043..054ef82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -121,11 +121,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     final int curSize = (int)size;
     clear();
-    final DrillBuf newBuf = allocator.buffer(curSize);
-    if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of d% bytes.", curSize));
-    }
-    data = newBuf;
+    data = allocator.buffer(curSize);
     zeroVector();
     allocationSizeInBytes = curSize;
   }
@@ -141,15 +137,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     final int curSize = (int)newAllocationSize;
     final DrillBuf newBuf = allocator.buffer(curSize);
-    if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes.", newAllocationSize));
-    }
-
     newBuf.setZero(0, newBuf.capacity());
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();
     data = newBuf;
-    allocationSizeInBytes =  curSize;
+    allocationSizeInBytes = curSize;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index 6337d4c..d8bf2fd 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -177,6 +178,8 @@ public class ColumnChunkIncReadStore implements PageReadStore {
         }
         in.close();
         return null;
+      } catch (OutOfMemoryRuntimeException e) {
+        throw e; // throw as it is
       } catch (Exception e) {
         throw new DrillRuntimeException("Error reading page." +
           "\nFile path: " + path.toUri().getPath() +

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java b/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
index 051ad4e..10fd5da 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
@@ -20,8 +20,8 @@ package org.apache.drill;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
+import org.apache.drill.exec.testing.Controls;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -36,26 +36,17 @@ public class TestAllocationException extends BaseTestQuery {
 
   private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true";
 
-  private void testWithException(final String fileName) throws Exception {
-    testWithException(fileName, OutOfMemoryRuntimeException.class);
-  }
-
-  private void testWithException(final String fileName, Class<? extends Throwable> exceptionClass) throws Exception{
+  private void testWithException(final String fileName) throws Exception{
     test(SINGLE_MODE);
 
-    CoordinationProtos.DrillbitEndpoint endpoint = bits[0].getContext().getEndpoint();
-
-    String controlsString = "{\"injections\":[{"
-      + "\"address\":\"" + endpoint.getAddress() + "\","
-      + "\"port\":\"" + endpoint.getUserPort() + "\","
-      + "\"type\":\"exception\","
-      + "\"siteClass\":\"" + TopLevelAllocator.class.getName() + "\","
-      + "\"desc\":\"" + TopLevelAllocator.CHILD_BUFFER_INJECTION_SITE + "\","
-      + "\"nSkip\":200,"
-      + "\"nFire\":1,"
-      + "\"exceptionClass\":\"" + exceptionClass.getName() + "\""
-      + "}]}";
-    ControlsInjectionUtil.setControls(client, controlsString);
+    final String controls = Controls.newBuilder()
+      .addException(TopLevelAllocator.class,
+        TopLevelAllocator.CHILD_BUFFER_INJECTION_SITE,
+        OutOfMemoryRuntimeException.class,
+        200,
+        1
+      ).build();
+    ControlsInjectionUtil.setControls(client, controls);
 
     String query = getFile(fileName);
 
@@ -71,12 +62,7 @@ public class TestAllocationException extends BaseTestQuery {
   }
 
   @Test
-  public void testWithNull() throws Exception{
-    testWithException("queries/tpch/01.sql");
-  }
-
-  @Test
   public void testWithOOM() throws Exception{
-    testWithException("queries/tpch/03.sql", NullPointerException.class);
+    testWithException("queries/tpch/01.sql");
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/48d8a59d/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 74ce225..0b8314c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -19,7 +19,8 @@
 package org.apache.drill.exec.memory;
 
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 import io.netty.buffer.DrillBuf;
 
 import java.util.Iterator;
@@ -143,36 +144,23 @@ public class TestAllocators {
     ((AutoCloseable) oContext22).close();
 
     // Fragment 3 asks for more and fails
-    boolean outOfMem=false;
     try {
-      DrillBuf b31b = oContext31.getAllocator().buffer(4400000);
-      if(b31b!=null) {
-        b31b.release();
-      }else{
-        outOfMem=true;
-      }
-    }catch(Exception e){
-      outOfMem=true;
+      oContext31.getAllocator().buffer(4400000);
+      fail("Fragment 3 should fail to allocate buffer");
+    } catch (OutOfMemoryRuntimeException e) {
+      // expected
     }
-    assertEquals(true, (boolean)outOfMem);
 
     // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds
-    outOfMem=false;
     OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false);
-    DrillBuf b32=null;
     try {
-      b32=oContext32.getAllocator().buffer(4400000);
-    }catch(Exception e){
-      outOfMem=true;
-    }finally{
-      if(b32!=null) {
-        b32.release();
-      }else{
-        outOfMem=true;
-      }
+      DrillBuf b32 = oContext32.getAllocator().buffer(4400000);
+      b32.release();
+    } catch (OutOfMemoryRuntimeException e) {
+      fail("Fragment 3 failed to allocate buffer");
+    } finally {
       closeOp(oContext32);
     }
-    assertEquals(false, (boolean)outOfMem);
 
     b11.release();
     closeOp(oContext11);


[3/3] drill git commit: DRILL-2851: handle oversized allocation requests; ensure flatten splits a batch if data is oversized; add unit tests

Posted by hg...@apache.org.
DRILL-2851: handle oversized allocation requests; ensure flatten splits a batch if data is oversized; add unit tests


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b2bbd994
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b2bbd994
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b2bbd994

Branch: refs/heads/master
Commit: b2bbd9941be6b132a83d27c0ae02c935e1dec5dd
Parents: 0a27a03
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Fri Jun 19 22:38:06 2015 -0700
Committer: Hanifi Gunes <hg...@maprtech.com>
Committed: Mon Jul 6 16:28:29 2015 -0700

----------------------------------------------------------------------
 .../src/main/codegen/includes/vv_imports.ftl    |   2 +
 .../codegen/templates/FixedValueVectors.java    |  68 ++-
 .../codegen/templates/NullableValueVectors.java |  16 +-
 .../templates/VariableLengthVectors.java        |  65 ++-
 .../exception/OversizedAllocationException.java |  51 ++
 .../impl/flatten/FlattenRecordBatch.java        |   4 +-
 .../physical/impl/flatten/FlattenTemplate.java  |  70 +--
 .../exec/physical/impl/flatten/Flattener.java   |   2 +-
 .../drill/exec/vector/BaseDataValueVector.java  |   2 +-
 .../drill/exec/vector/BaseValueVector.java      |   8 +-
 .../org/apache/drill/exec/vector/BitVector.java |  57 +-
 .../exec/record/vector/TestValueVector.java     | 522 +++++++++++--------
 12 files changed, 539 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index 92c8007..733e0a5 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -30,6 +30,8 @@ import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.vector.*;
+import org.apache.drill.common.exceptions.*;
+import org.apache.drill.exec.exception.*;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.types.TypeProtos.*;

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 7103a17..e8a4d5f 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -48,7 +48,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
 
-  private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
+  private int allocationSizeInBytes = INITIAL_VALUE_ALLOCATION * ${type.width};
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
@@ -73,8 +73,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   @Override
-  public void setInitialCapacity(int numRecords) {
-    allocationValueCount = numRecords;
+  public void setInitialCapacity(final int valueCount) {
+    final long size = 1L * valueCount * ${type.width};
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+    allocationSizeInBytes = (int)size;
   }
 
   public void allocateNew() {
@@ -84,42 +88,50 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   public boolean allocateNewSafe() {
-    clear();
+    long curAllocationSize = allocationSizeInBytes;
     if (allocationMonitor > 10) {
-      allocationValueCount = Math.max(8, (int) (allocationValueCount / 2));
+      curAllocationSize = Math.max(8, curAllocationSize / 2);
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
-      allocationValueCount = (int) (allocationValueCount * 2);
+      curAllocationSize = allocationSizeInBytes * 2L;
       allocationMonitor = 0;
     }
 
-    DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
-    if(newBuf == null) {
+    try{
+      allocateBytes(curAllocationSize);
+    } catch (DrillRuntimeException ex) {
       return false;
     }
-
-    this.data = newBuf;
-    this.data.readerIndex(0);
     return true;
   }
 
   /**
-   * Allocate a new buffer that supports setting at least the provided number of values.  May actually be sized bigger depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
+   * Allocate a new buffer that supports setting at least the provided number of values. May actually be sized bigger
+   * depending on underlying buffer rounding size. Must be called prior to using the ValueVector.
+   *
+   * Note that the maximum number of values a vector can allocate is Integer.MAX_VALUE / value width.
+   *
    * @param valueCount
    * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate the new buffer
    */
-  public void allocateNew(int valueCount) {
-    clear();
+  public void allocateNew(final int valueCount) {
+    allocateBytes(valueCount * ${type.width});
+  }
 
-    DrillBuf newBuf = allocator.buffer(valueCount * ${type.width});
-    if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(
-        String.format("Failure while allocating buffer of %d bytes",valueCount * ${type.width}));
+  private void allocateBytes(final long size) {
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
 
-    this.data = newBuf;
-    this.data.readerIndex(0);
-    this.allocationValueCount = valueCount;
+    final int curSize = (int)size;
+    clear();
+    final DrillBuf newBuf = allocator.buffer(curSize);
+    if (newBuf == null) {
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", size));
+    }
+    data = newBuf;
+    data.readerIndex(0);
+    allocationSizeInBytes = curSize;
   }
 
 /**
@@ -128,12 +140,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
  * @throws org.apache.drill.exec.memory.OutOfMemoryRuntimeException if it can't allocate the new buffer
  */
   public void reAlloc() {
-    logger.info("Realloc vector {}. [{}] -> [{}]", field, allocationValueCount * ${type.width}, allocationValueCount * 2 * ${type.width});
-    allocationValueCount *= 2;
-    DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
+    final long newAllocationSize = allocationSizeInBytes * 2L;
+    if (newAllocationSize > MAX_ALLOCATION_SIZE)  {
+      throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
+    }
+
+    logger.debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]", field, allocationSizeInBytes, newAllocationSize);
+    final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
     if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(
-      String.format("Failure while reallocating buffer to %d bytes",allocationValueCount * ${type.width}));
+      throw new OutOfMemoryRuntimeException(String.format("Failure while reallocating buffer to %d bytes", newAllocationSize));
     }
 
     newBuf.setBytes(0, data, 0, data.capacity());
@@ -141,6 +156,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     newBuf.writerIndex(data.writerIndex());
     data.release();
     data = newBuf;
+    allocationSizeInBytes = (int)newAllocationSize;
   }
 
   public void zeroVector() {

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 7f83542..7fa0d55 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -131,9 +131,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      */
     boolean success = false;
     try {
-      if(!values.allocateNewSafe()) return false;
-      if(!bits.allocateNewSafe()) return false;
-      success = true;
+      success = values.allocateNewSafe() && bits.allocateNewSafe();
     } finally {
       if (!success) {
         clear();
@@ -142,7 +140,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
-    return true;
+    return success;
   }
 
   @Override
@@ -150,7 +148,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     try {
       values.allocateNew(totalBytes, valueCount);
       bits.allocateNew(valueCount);
-    } catch(OutOfMemoryRuntimeException e){
+    } catch(DrillRuntimeException e) {
       clear();
       throw e;
     }
@@ -196,7 +194,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     try {
       values.allocateNew();
       bits.allocateNew();
-    } catch(OutOfMemoryRuntimeException e) {
+    } catch(DrillRuntimeException e) {
       clear();
       throw e;
     }
@@ -215,9 +213,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      */
     boolean success = false;
     try {
-      if(!values.allocateNewSafe()) return false;
-      if(!bits.allocateNewSafe()) return false;
-      success = true;
+      success = values.allocateNewSafe() && bits.allocateNewSafe();
     } finally {
       if (!success) {
         clear();
@@ -226,7 +222,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
-    return true;
+    return success;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 50ae770..2c2e6b6 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -69,7 +69,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   private final UInt${type.width}Vector.Accessor oAccessor;
 
 
-  private int allocationTotalByteCount = INITIAL_BYTE_COUNT;
+  private int allocationSizeInBytes = INITIAL_BYTE_COUNT;
   private int allocationMonitor = 0;
 
   public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
@@ -264,9 +264,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
-  public void setInitialCapacity(int numRecords) {
-    allocationTotalByteCount = numRecords * DEFAULT_RECORD_BYTE_COUNT;
-    offsetVector.setInitialCapacity(numRecords + 1);
+  public void setInitialCapacity(final int valueCount) {
+    final long size = 1L * valueCount * ${type.width};
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+    allocationSizeInBytes = (int)size;
+    offsetVector.setInitialCapacity(valueCount + 1);
   }
 
   public void allocateNew() {
@@ -277,15 +281,20 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
   @Override
   public boolean allocateNewSafe() {
-    clear();
+    long curAllocationSize = allocationSizeInBytes;
     if (allocationMonitor > 10) {
-      allocationTotalByteCount = Math.max(MIN_BYTE_COUNT, (int) (allocationTotalByteCount / 2));
+      curAllocationSize = Math.max(MIN_BYTE_COUNT, curAllocationSize / 2);
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
-      allocationTotalByteCount = (int) (allocationTotalByteCount * 2);
+      curAllocationSize = curAllocationSize * 2L;
       allocationMonitor = 0;
     }
 
+    if (curAllocationSize > MAX_ALLOCATION_SIZE) {
+      return false;
+    }
+
+    clear();
     /* Boolean to keep track if all the memory allocations were successful
      * Used in the case of composite vectors when we need to allocate multiple
      * buffers for multiple vectors. If one of the allocations failed we need to
@@ -293,15 +302,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      */
     boolean success = false;
     try {
-      DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
+      final int requestedSize = (int)curAllocationSize;
+      DrillBuf newBuf = allocator.buffer(requestedSize);
       if (newBuf == null) {
         return false;
       }
       this.data = newBuf;
-      if (!offsetVector.allocateNewSafe()) {
-        return false;
-      }
-      success = true;
+      success = offsetVector.allocateNewSafe();
     } finally {
       if (!success) {
         clear();
@@ -309,40 +316,44 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
     data.readerIndex(0);
     offsetVector.zeroVector();
-    return true;
+    return success;
   }
 
   public void allocateNew(int totalBytes, int valueCount) {
     clear();
     assert totalBytes >= 0;
     try {
-      DrillBuf newBuf = allocator.buffer(totalBytes);
+      final DrillBuf newBuf = allocator.buffer(totalBytes);
       if (newBuf == null) {
         throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes", totalBytes));
       }
-      this.data = newBuf;
+      data = newBuf;
       offsetVector.allocateNew(valueCount + 1);
-    } catch (OutOfMemoryRuntimeException e) {
+    } catch (DrillRuntimeException e) {
       clear();
       throw e;
     }
     data.readerIndex(0);
-    allocationTotalByteCount = totalBytes;
+    allocationSizeInBytes = totalBytes;
     offsetVector.zeroVector();
   }
 
-    public void reAlloc() {
-      allocationTotalByteCount *= 2;
-      DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
-      if(newBuf == null){
-        throw new OutOfMemoryRuntimeException(
-          String.format("Failure while reallocating buffer of %d bytes", allocationTotalByteCount));
-      }
+  public void reAlloc() {
+    final long newAllocationSize = allocationSizeInBytes*2L;
+    if (newAllocationSize > MAX_ALLOCATION_SIZE)  {
+      throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
+    }
 
-      newBuf.setBytes(0, data, 0, data.capacity());
-      data.release();
-      data = newBuf;
+    final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
+    if(newBuf == null) {
+      throw new OutOfMemoryRuntimeException(
+        String.format("Failure while reallocating buffer of %d bytes", newAllocationSize));
     }
+    newBuf.setBytes(0, data, 0, data.capacity());
+    data.release();
+    data = newBuf;
+    allocationSizeInBytes = (int)newAllocationSize;
+  }
 
   public void decrementAllocationMonitor() {
     if (allocationMonitor > 0) {

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java
new file mode 100644
index 0000000..f5ae70c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/OversizedAllocationException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * An exception that is used to signal that allocation request in bytes is greater than the maximum allowed by
+ * {@link org.apache.drill.exec.memory.BufferAllocator#buffer(int) allocator}.
+ *
+ * <p>Operators should handle this exception to split the batch and later resume the execution on the next
+ * {@link RecordBatch#next() iteration}.</p>
+ *
+ */
+public class OversizedAllocationException extends DrillRuntimeException {
+  public OversizedAllocationException() {
+    super();
+  }
+
+  public OversizedAllocationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public OversizedAllocationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public OversizedAllocationException(String message) {
+    super(message);
+  }
+
+  public OversizedAllocationException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index b8daceb..491ced3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -150,7 +150,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     setFlattenVector();
 
     int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
-    int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0);
+    int outputRecords = flattener.flattenRecords(incomingRecordCount, 0);
     // TODO - change this to be based on the repeated vector length
     if (outputRecords < childCount) {
       setValueCount(outputRecords);
@@ -181,7 +181,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
       return;
     }
 
-    int projRecords = flattener.flattenRecords(remainderIndex, remainingRecordCount, 0);
+    int projRecords = flattener.flattenRecords(remainingRecordCount, 0);
     if (projRecords < remainingRecordCount) {
       setValueCount(projRecords);
       this.recordCount = projRecords;

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index de67b62..a0d82dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import javax.inject.Named;
 
+import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -32,9 +33,11 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class FlattenTemplate implements Flattener {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class);
+  private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);
 
   private static final int OUTPUT_BATCH_SIZE = 4*1024;
 
@@ -48,14 +51,11 @@ public abstract class FlattenTemplate implements Flattener {
 
   // this allows for groups to be written between batches if we run out of space, for cases where we have finished
   // a batch on the boundary it will be set to 0
-  private int childIndexWithinCurrGroup;
-  // calculating the current group size requires reading the start and end out of the offset vector, this only happens
-  // once and is stored here for faster reference
-  private int currGroupSize;
-  private int childIndex;
+  private int innerValueIndex;
+  private int currentInnerValueIndex;
 
   public FlattenTemplate() throws SchemaChangeException {
-    childIndexWithinCurrGroup = -1;
+    innerValueIndex = -1;
   }
 
   @Override
@@ -69,8 +69,7 @@ public abstract class FlattenTemplate implements Flattener {
   }
 
   @Override
-  public final int flattenRecords(int startIndex, final int recordCount, int firstOutputIndex) {
-    startIndex = childIndex;
+  public final int flattenRecords(final int recordCount, final int firstOutputIndex) {
     switch (svMode) {
       case FOUR_BYTE:
         throw new UnsupportedOperationException("Flatten does not support selection vector inputs.");
@@ -79,35 +78,47 @@ public abstract class FlattenTemplate implements Flattener {
         throw new UnsupportedOperationException("Flatten does not support selection vector inputs.");
 
       case NONE:
-        if (childIndexWithinCurrGroup == -1) {
-          childIndexWithinCurrGroup = 0;
+        if (innerValueIndex == -1) {
+          innerValueIndex = 0;
         }
+        final int initialInnerValueIndex = currentInnerValueIndex;
+        // restore state to local stack
+        int valueIndexLocal = valueIndex;
+        int innerValueIndexLocal = innerValueIndex;
+        int currentInnerValueIndexLocal = currentInnerValueIndex;
         outer: {
+          int outputIndex = firstOutputIndex;
           final int valueCount = accessor.getValueCount();
-          for ( ; valueIndex < valueCount; valueIndex++) {
-            currGroupSize = accessor.getInnerValueCountAt(valueIndex);
-            for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
-              if (firstOutputIndex == OUTPUT_BATCH_SIZE) {
+          for ( ; valueIndexLocal < valueCount; valueIndexLocal++) {
+            final int innerValueCount = accessor.getInnerValueCountAt(valueIndexLocal);
+            for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) {
+              if (outputIndex == OUTPUT_BATCH_SIZE) {
                 break outer;
               }
-              doEval(valueIndex, firstOutputIndex);
-              firstOutputIndex++;
-              childIndex++;
+              try {
+                doEval(valueIndexLocal, outputIndex);
+              } catch (OversizedAllocationException ex) {
+                // unable to flatten due to a soft buffer overflow. split the batch here and resume execution.
+                logger.debug("Reached allocation limit. Splitting the batch at input index: {} - inner index: {} - current completed index: {}",
+                    valueIndexLocal, innerValueIndexLocal, currentInnerValueIndexLocal) ;
+                break outer;
+              }
+              outputIndex++;
+              currentInnerValueIndexLocal++;
             }
-            childIndexWithinCurrGroup = 0;
+            innerValueIndexLocal = 0;
           }
         }
-//        System.out.println(String.format("startIndex %d, recordCount %d, firstOutputIndex: %d, currGroupSize: %d, childIndexWithinCurrGroup: %d, groupIndex: %d", startIndex, recordCount, firstOutputIndex, currGroupSize, childIndexWithinCurrGroup, groupIndex));
-//        try{
-////          Thread.sleep(1000);
-//        }catch(Exception e){
-//
-//        }
-
+        // save state to heap
+        valueIndex = valueIndexLocal;
+        innerValueIndex = innerValueIndexLocal;
+        currentInnerValueIndex = currentInnerValueIndexLocal;
+        // transfer the computed range
+        final int delta = currentInnerValueIndexLocal - initialInnerValueIndex;
         for (TransferPair t : transfers) {
-          t.splitAndTransfer(startIndex, childIndex - startIndex);
+          t.splitAndTransfer(initialInnerValueIndex, delta);
         }
-        return childIndex - startIndex;
+        return delta;
 
       default:
         throw new UnsupportedOperationException();
@@ -133,8 +144,7 @@ public abstract class FlattenTemplate implements Flattener {
   @Override
   public void resetGroupIndex() {
     this.valueIndex = 0;
-    this.currGroupSize = 0;
-    this.childIndex = 0;
+    this.currentInnerValueIndex = 0;
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 92cf79d..d691545 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 public interface Flattener {
 
   public abstract void setup(FragmentContext context, RecordBatch incoming,  RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException;
-  public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex);
+  public abstract int flattenRecords(int recordCount, int firstOutputIndex);
   public void setFlattenField(RepeatedValueVector repeatedColumn);
   public RepeatedValueVector getFlattenField();
   public void resetGroupIndex();

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 0e38f3c..579eed6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.MaterializedField;
 
 
-public abstract class BaseDataValueVector extends BaseValueVector{
+public abstract class BaseDataValueVector extends BaseValueVector {
 
   protected DrillBuf data;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 8129668..cc287c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -26,13 +26,17 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class BaseValueVector implements ValueVector {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseValueVector.class);
+
+  public static final int MAX_ALLOCATION_SIZE = Integer.MAX_VALUE;
+  public static final int INITIAL_VALUE_ALLOCATION = 4096;
 
   protected final BufferAllocator allocator;
   protected final MaterializedField field;
-  public static final int INITIAL_VALUE_ALLOCATION = 4096;
 
   protected BaseValueVector(MaterializedField field, BufferAllocator allocator) {
     this.field = Preconditions.checkNotNull(field, "field cannot be null");

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 10bdf07..1d48043 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.vector;
 import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.NullableBitHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -43,7 +44,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   private final Mutator mutator = new Mutator();
 
   private int valueCount;
-  private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
+  private int allocationSizeInBytes = INITIAL_VALUE_ALLOCATION;
   private int allocationMonitor = 0;
 
   public BitVector(MaterializedField field, BufferAllocator allocator) {
@@ -66,7 +67,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
   @Override
   public int getValueCapacity() {
-    return data.capacity() * 8;
+    return (int)Math.min((long)Integer.MAX_VALUE, data.capacity() * 8L);
   }
 
   private int getByteIndex(int index) {
@@ -74,8 +75,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
-  public void setInitialCapacity(int numRecords) {
-    allocationValueCount = numRecords;
+  public void setInitialCapacity(final int valueCount) {
+    allocationSizeInBytes = getSizeFromCount(valueCount);
   }
 
   public void allocateNew() {
@@ -85,24 +86,20 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   public boolean allocateNewSafe() {
-    clear();
+    long curAllocationSize = allocationSizeInBytes;
     if (allocationMonitor > 10) {
-      allocationValueCount = Math.max(8, (int) (allocationValueCount / 2));
+      curAllocationSize = Math.max(8, allocationSizeInBytes / 2);
       allocationMonitor = 0;
     } else if (allocationMonitor < -2) {
-      allocationValueCount = (int) (allocationValueCount * 2);
+      curAllocationSize = allocationSizeInBytes * 2L;
       allocationMonitor = 0;
     }
 
-    clear();
-    int valueSize = getSizeFromCount(allocationValueCount);
-    DrillBuf newBuf = allocator.buffer(valueSize);
-    if (newBuf == null) {
+    try {
+      allocateBytes(curAllocationSize);
+    } catch (OutOfMemoryRuntimeException ex) {
       return false;
     }
-
-    data = newBuf;
-    zeroVector();
     return true;
   }
 
@@ -113,32 +110,46 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
    *          The number of values which can be contained within this vector.
    */
   public void allocateNew(int valueCount) {
+    final int size = getSizeFromCount(valueCount);
+    allocateBytes(size);
+  }
+
+  private void allocateBytes(final long size) {
+    if (size > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+
+    final int curSize = (int)size;
     clear();
-    int valueSize = getSizeFromCount(valueCount);
-    DrillBuf newBuf = allocator.buffer(valueSize);
+    final DrillBuf newBuf = allocator.buffer(curSize);
     if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of d% bytes.", valueSize));
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of d% bytes.", curSize));
     }
-
     data = newBuf;
     zeroVector();
+    allocationSizeInBytes = curSize;
   }
 
   /**
    * Allocate new buffer with double capacity, and copy data into the new buffer. Replace vector's buffer with new buffer, and release old one
    */
   public void reAlloc() {
-    allocationValueCount *= 2;
-    int valueSize = getSizeFromCount(allocationValueCount);
-    DrillBuf newBuf = allocator.buffer(valueSize);
+    final long newAllocationSize = allocationSizeInBytes * 2L;
+    if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+      throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
+    }
+
+    final int curSize = (int)newAllocationSize;
+    final DrillBuf newBuf = allocator.buffer(curSize);
     if (newBuf == null) {
-      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes.", valueSize));
+      throw new OutOfMemoryRuntimeException(String.format("Failure while allocating buffer of %d bytes.", newAllocationSize));
     }
 
     newBuf.setZero(0, newBuf.capacity());
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();
     data = newBuf;
+    allocationSizeInBytes =  curSize;
   }
 
   /**
@@ -154,7 +165,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     clear();
     this.valueCount = valueCount;
     int len = getSizeFromCount(valueCount);
-    data = (DrillBuf) buf.slice(0, len);
+    data = buf.slice(0, len);
     data.retain();
     return len;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/b2bbd994/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 037c8c6..4eeb8f2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertTrue;
 
 import java.nio.charset.Charset;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.exception.OversizedAllocationException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.IntHolder;
@@ -38,15 +40,19 @@ import org.apache.drill.exec.expr.holders.UInt4Holder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableUInt4Vector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestValueVector extends ExecTest {
@@ -56,29 +62,124 @@ public class TestValueVector extends ExecTest {
   private final static byte[] STR2 = new String("BBBBBBBBB2").getBytes(Charset.forName("UTF-8"));
   private final static byte[] STR3 = new String("CCCC3").getBytes(Charset.forName("UTF-8"));
 
-  TopLevelAllocator allocator = new TopLevelAllocator();
+  private TopLevelAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new TopLevelAllocator();
+  }
+
+  @After
+  public void terminate() {
+    allocator.close();
+  }
+
+
+  @Test(expected = OversizedAllocationException.class)
+  public void testFixedVectorReallocation() {
+    final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    final UInt4Vector vector = new UInt4Vector(field, allocator);
+    // edge case 1: buffer size = max value capacity
+    final int expectedValueCapacity = BaseValueVector.MAX_ALLOCATION_SIZE / 4;
+    try {
+      vector.allocateNew(expectedValueCapacity);
+      assertEquals(expectedValueCapacity, vector.getValueCapacity());
+      vector.reAlloc();
+      assertEquals(expectedValueCapacity * 2, vector.getValueCapacity());
+    } finally {
+      vector.close();
+    }
+
+    // common case: value count < max value capacity
+    try {
+      vector.allocateNew(BaseValueVector.MAX_ALLOCATION_SIZE / 8);
+      vector.reAlloc(); // value allocation reaches to MAX_VALUE_ALLOCATION
+      vector.reAlloc(); // this should throw an IOOB
+    } finally {
+      vector.close();
+    }
+  }
+
+  @Test(expected = OversizedAllocationException.class)
+  public void testBitVectorReallocation() {
+    final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    final BitVector vector = new BitVector(field, allocator);
+    // edge case 1: buffer size ~ max value capacity
+    final int expectedValueCapacity = 1 << 29;
+    try {
+      vector.allocateNew(expectedValueCapacity);
+      assertEquals(expectedValueCapacity, vector.getValueCapacity());
+      vector.reAlloc();
+      assertEquals(expectedValueCapacity * 2, vector.getValueCapacity());
+    } finally {
+      vector.close();
+    }
+
+    // common: value count < MAX_VALUE_ALLOCATION
+    try {
+      vector.allocateNew(expectedValueCapacity);
+      for (int i=0; i<3;i++) {
+        vector.reAlloc(); // expand buffer size
+      }
+      assertEquals(Integer.MAX_VALUE, vector.getValueCapacity());
+      vector.reAlloc(); // buffer size ~ max allocation
+      assertEquals(Integer.MAX_VALUE, vector.getValueCapacity());
+      vector.reAlloc(); // overflow
+    } finally {
+      vector.close();
+    }
+  }
+
+
+  @Test(expected = OversizedAllocationException.class)
+  public void testVariableVectorReallocation() {
+    final MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
+    final VarCharVector vector = new VarCharVector(field, allocator);
+    // edge case 1: value count = MAX_VALUE_ALLOCATION
+    final int expectedAllocationInBytes = BaseValueVector.MAX_ALLOCATION_SIZE;
+    final int expectedOffsetSize = 10;
+    try {
+      vector.allocateNew(expectedAllocationInBytes, 10);
+      assertEquals(expectedOffsetSize, vector.getValueCapacity());
+      assertEquals(expectedAllocationInBytes, vector.getBuffer().capacity());
+      vector.reAlloc();
+      assertEquals(expectedOffsetSize * 2, vector.getValueCapacity());
+      assertEquals(expectedAllocationInBytes * 2, vector.getBuffer().capacity());
+    } finally {
+      vector.close();
+    }
+
+    // common: value count < MAX_VALUE_ALLOCATION
+    try {
+      vector.allocateNew(BaseValueVector.MAX_ALLOCATION_SIZE / 2, 0);
+      vector.reAlloc(); // value allocation reaches to MAX_VALUE_ALLOCATION
+      vector.reAlloc(); // this tests if it overflows
+    } finally {
+      vector.close();
+    }
+  }
 
   @Test
   public void testFixedType() {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, UInt4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    UInt4Vector v = new UInt4Vector(field, allocator);
-    UInt4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.setSafe(0, 100);
-    m.setSafe(1, 101);
-    m.setSafe(100, 102);
-    m.setSafe(1022, 103);
-    m.setSafe(1023, 104);
-    assertEquals(100, v.getAccessor().get(0));
-    assertEquals(101, v.getAccessor().get(1));
-    assertEquals(102, v.getAccessor().get(100));
-    assertEquals(103, v.getAccessor().get(1022));
-    assertEquals(104, v.getAccessor().get(1023));
-
+    try (UInt4Vector vector = new UInt4Vector(field, allocator)) {
+      UInt4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.setSafe(0, 100);
+      m.setSafe(1, 101);
+      m.setSafe(100, 102);
+      m.setSafe(1022, 103);
+      m.setSafe(1023, 104);
+      assertEquals(100, vector.getAccessor().get(0));
+      assertEquals(101, vector.getAccessor().get(1));
+      assertEquals(102, vector.getAccessor().get(100));
+      assertEquals(103, vector.getAccessor().get(1022));
+      assertEquals(104, vector.getAccessor().get(1023));
+    }
   }
 
   @Test
@@ -86,29 +187,29 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableVarCharVector v = new NullableVarCharVector(field, allocator);
-    NullableVarCharVector.Mutator m = v.getMutator();
-    v.allocateNew(1024*10, 1024);
+    try (NullableVarCharVector vector = new NullableVarCharVector(field, allocator)) {
+      NullableVarCharVector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024 * 10, 1024);
 
-    m.set(0, STR1);
-    m.set(1, STR2);
-    m.set(2, STR3);
+      m.set(0, STR1);
+      m.set(1, STR2);
+      m.set(2, STR3);
 
-    // Check the sample strings
-    assertArrayEquals(STR1, v.getAccessor().get(0));
-    assertArrayEquals(STR2, v.getAccessor().get(1));
-    assertArrayEquals(STR3, v.getAccessor().get(2));
+      // Check the sample strings
+      assertArrayEquals(STR1, vector.getAccessor().get(0));
+      assertArrayEquals(STR2, vector.getAccessor().get(1));
+      assertArrayEquals(STR3, vector.getAccessor().get(2));
 
-    // Ensure null value throws
-    boolean b = false;
-    try {
-      v.getAccessor().get(3);
-    } catch(IllegalStateException e) {
-      b = true;
-    }finally{
-      assertTrue(b);
+      // Ensure null value throws
+      boolean b = false;
+      try {
+        vector.getAccessor().get(3);
+      } catch (IllegalStateException e) {
+        b = true;
+      } finally {
+        assertTrue(b);
+      }
     }
-
   }
 
 
@@ -117,68 +218,69 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableUInt4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableUInt4Vector v = new NullableUInt4Vector(field, allocator);
-    NullableUInt4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.set(0, 100);
-    m.set(1, 101);
-    m.set(100, 102);
-    m.set(1022, 103);
-    m.set(1023, 104);
-    assertEquals(100, v.getAccessor().get(0));
-    assertEquals(101, v.getAccessor().get(1));
-    assertEquals(102, v.getAccessor().get(100));
-    assertEquals(103, v.getAccessor().get(1022));
-    assertEquals(104, v.getAccessor().get(1023));
-
-    // Ensure null values throw
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(3);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+    try (NullableUInt4Vector vector = new NullableUInt4Vector(field, allocator)) {
+      NullableUInt4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.set(0, 100);
+      m.set(1, 101);
+      m.set(100, 102);
+      m.set(1022, 103);
+      m.set(1023, 104);
+      assertEquals(100, vector.getAccessor().get(0));
+      assertEquals(101, vector.getAccessor().get(1));
+      assertEquals(102, vector.getAccessor().get(100));
+      assertEquals(103, vector.getAccessor().get(1022));
+      assertEquals(104, vector.getAccessor().get(1023));
+
+      // Ensure null values throw
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(3);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
-    }
 
 
-    v.allocateNew(2048);
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(0);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+      vector.allocateNew(2048);
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(0);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
-    }
-
-    m.set(0, 100);
-    m.set(1, 101);
-    m.set(100, 102);
-    m.set(1022, 103);
-    m.set(1023, 104);
-    assertEquals(100, v.getAccessor().get(0));
-    assertEquals(101, v.getAccessor().get(1));
-    assertEquals(102, v.getAccessor().get(100));
-    assertEquals(103, v.getAccessor().get(1022));
-    assertEquals(104, v.getAccessor().get(1023));
 
-    // Ensure null values throw
-
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(3);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+      m.set(0, 100);
+      m.set(1, 101);
+      m.set(100, 102);
+      m.set(1022, 103);
+      m.set(1023, 104);
+      assertEquals(100, vector.getAccessor().get(0));
+      assertEquals(101, vector.getAccessor().get(1));
+      assertEquals(102, vector.getAccessor().get(100));
+      assertEquals(103, vector.getAccessor().get(1022));
+      assertEquals(104, vector.getAccessor().get(1023));
+
+      // Ensure null values throw
+
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(3);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
     }
 
@@ -189,43 +291,44 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableFloat4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
-    NullableFloat4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.set(0, 100.1f);
-    m.set(1, 101.2f);
-    m.set(100, 102.3f);
-    m.set(1022, 103.4f);
-    m.set(1023, 104.5f);
-    assertEquals(100.1f, v.getAccessor().get(0), 0);
-    assertEquals(101.2f, v.getAccessor().get(1), 0);
-    assertEquals(102.3f, v.getAccessor().get(100), 0);
-    assertEquals(103.4f, v.getAccessor().get(1022), 0);
-    assertEquals(104.5f, v.getAccessor().get(1023), 0);
-
-    // Ensure null values throw
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(3);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+    try (NullableFloat4Vector vector = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator)) {
+      NullableFloat4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.set(0, 100.1f);
+      m.set(1, 101.2f);
+      m.set(100, 102.3f);
+      m.set(1022, 103.4f);
+      m.set(1023, 104.5f);
+      assertEquals(100.1f, vector.getAccessor().get(0), 0);
+      assertEquals(101.2f, vector.getAccessor().get(1), 0);
+      assertEquals(102.3f, vector.getAccessor().get(100), 0);
+      assertEquals(103.4f, vector.getAccessor().get(1022), 0);
+      assertEquals(104.5f, vector.getAccessor().get(1023), 0);
+
+      // Ensure null values throw
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(3);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
-    }
 
-    v.allocateNew(2048);
-    {
-      boolean b = false;
-      try {
-        v.getAccessor().get(0);
-      } catch(IllegalStateException e) {
-        b = true;
-      }finally{
-        assertTrue(b);
+      vector.allocateNew(2048);
+      {
+        boolean b = false;
+        try {
+          vector.getAccessor().get(0);
+        } catch (IllegalStateException e) {
+          b = true;
+        } finally {
+          assertTrue(b);
+        }
       }
     }
   }
@@ -235,36 +338,37 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, BitHolder.TYPE);
 
     // Create a new value vector for 1024 integers
-    BitVector v = new BitVector(field, allocator);
-    BitVector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
-
-    // Put and set a few values
-    m.set(0, 1);
-    m.set(1, 0);
-    m.set(100, 0);
-    m.set(1022, 1);
-    assertEquals(1, v.getAccessor().get(0));
-    assertEquals(0, v.getAccessor().get(1));
-    assertEquals(0, v.getAccessor().get(100));
-    assertEquals(1, v.getAccessor().get(1022));
-
-    // test setting the same value twice
-    m.set(0, 1);
-    m.set(0, 1);
-    m.set(1, 0);
-    m.set(1, 0);
-    assertEquals(1, v.getAccessor().get(0));
-    assertEquals(0, v.getAccessor().get(1));
-
-    // test toggling the values
-    m.set(0, 0);
-    m.set(1, 1);
-    assertEquals(0, v.getAccessor().get(0));
-    assertEquals(1, v.getAccessor().get(1));
-
-    // Ensure unallocated space returns 0
-    assertEquals(0, v.getAccessor().get(3));
+    try (BitVector vector = new BitVector(field, allocator)) {
+      BitVector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
+
+      // Put and set a few values
+      m.set(0, 1);
+      m.set(1, 0);
+      m.set(100, 0);
+      m.set(1022, 1);
+      assertEquals(1, vector.getAccessor().get(0));
+      assertEquals(0, vector.getAccessor().get(1));
+      assertEquals(0, vector.getAccessor().get(100));
+      assertEquals(1, vector.getAccessor().get(1022));
+
+      // test setting the same value twice
+      m.set(0, 1);
+      m.set(0, 1);
+      m.set(1, 0);
+      m.set(1, 0);
+      assertEquals(1, vector.getAccessor().get(0));
+      assertEquals(0, vector.getAccessor().get(1));
+
+      // test toggling the values
+      m.set(0, 0);
+      m.set(1, 1);
+      assertEquals(0, vector.getAccessor().get(0));
+      assertEquals(1, vector.getAccessor().get(1));
+
+      // Ensure unallocated space returns 0
+      assertEquals(0, vector.getAccessor().get(3));
+    }
   }
 
 
@@ -273,33 +377,34 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableFloat4Holder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableFloat4Vector v = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator);
-    NullableFloat4Vector.Mutator m = v.getMutator();
-    v.allocateNew(1024);
+    try (NullableFloat4Vector vector = (NullableFloat4Vector) TypeHelper.getNewVector(field, allocator)) {
+      NullableFloat4Vector.Mutator m = vector.getMutator();
+      vector.allocateNew(1024);
 
-    assertEquals(1024, v.getValueCapacity());
+      assertEquals(1024, vector.getValueCapacity());
 
-    // Put values in indexes that fall within the initial allocation
-    m.setSafe(0, 100.1f);
-    m.setSafe(100, 102.3f);
-    m.setSafe(1023, 104.5f);
+      // Put values in indexes that fall within the initial allocation
+      m.setSafe(0, 100.1f);
+      m.setSafe(100, 102.3f);
+      m.setSafe(1023, 104.5f);
 
-    // Now try to put values in space that falls beyond the initial allocation
-    m.setSafe(2000, 105.5f);
+      // Now try to put values in space that falls beyond the initial allocation
+      m.setSafe(2000, 105.5f);
 
-    // Check valueCapacity is more than initial allocation
-    assertEquals(1024*2, v.getValueCapacity());
+      // Check valueCapacity is more than initial allocation
+      assertEquals(1024 * 2, vector.getValueCapacity());
 
-    assertEquals(100.1f, v.getAccessor().get(0), 0);
-    assertEquals(102.3f, v.getAccessor().get(100), 0);
-    assertEquals(104.5f, v.getAccessor().get(1023), 0);
-    assertEquals(105.5f, v.getAccessor().get(2000), 0);
+      assertEquals(100.1f, vector.getAccessor().get(0), 0);
+      assertEquals(102.3f, vector.getAccessor().get(100), 0);
+      assertEquals(104.5f, vector.getAccessor().get(1023), 0);
+      assertEquals(105.5f, vector.getAccessor().get(2000), 0);
 
 
-    // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
-    // as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the
-    // vector
-    m.setValueCount(v.getValueCapacity() + 200);
+      // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
+      // as we don't call setSafe for null values, but we do call setValueCount when all values are inserted into the
+      // vector
+      m.setValueCount(vector.getValueCapacity() + 200);
+    }
   }
 
   @Test
@@ -307,33 +412,34 @@ public class TestValueVector extends ExecTest {
     MaterializedField field = MaterializedField.create(EMPTY_SCHEMA_PATH, NullableVarCharHolder.TYPE);
 
     // Create a new value vector for 1024 integers
-    NullableVarCharVector v = (NullableVarCharVector) TypeHelper.getNewVector(field, allocator);
-    NullableVarCharVector.Mutator m = v.getMutator();
-    v.allocateNew();
+    try (NullableVarCharVector vector = (NullableVarCharVector) TypeHelper.getNewVector(field, allocator)) {
+      NullableVarCharVector.Mutator m = vector.getMutator();
+      vector.allocateNew();
 
-    int initialCapacity = v.getValueCapacity();
+      int initialCapacity = vector.getValueCapacity();
 
-    // Put values in indexes that fall within the initial allocation
-    m.setSafe(0, STR1, 0, STR1.length);
-    m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
+      // Put values in indexes that fall within the initial allocation
+      m.setSafe(0, STR1, 0, STR1.length);
+      m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
 
-    // Now try to put values in space that falls beyond the initial allocation
-    m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
+      // Now try to put values in space that falls beyond the initial allocation
+      m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
 
-    // Check valueCapacity is more than initial allocation
-    assertEquals((initialCapacity+1)*2-1, v.getValueCapacity());
+      // Check valueCapacity is more than initial allocation
+      assertEquals((initialCapacity + 1) * 2 - 1, vector.getValueCapacity());
 
-    assertArrayEquals(STR1, v.getAccessor().get(0));
-    assertArrayEquals(STR2, v.getAccessor().get(initialCapacity-1));
-    assertArrayEquals(STR3, v.getAccessor().get(initialCapacity + 200));
+      assertArrayEquals(STR1, vector.getAccessor().get(0));
+      assertArrayEquals(STR2, vector.getAccessor().get(initialCapacity - 1));
+      assertArrayEquals(STR3, vector.getAccessor().get(initialCapacity + 200));
 
-    // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
-    // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
-    m.setValueCount(v.getValueCapacity() + 200);
+      // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
+      // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
+      m.setValueCount(vector.getValueCapacity() + 200);
+    }
   }
 
   @Test
-  public void testVVInitialCapacity() {
+  public void testVVInitialCapacity() throws Exception {
     final MaterializedField[] fields = new MaterializedField[9];
     final ValueVector[] valueVectors = new ValueVector[9];
 
@@ -357,17 +463,21 @@ public class TestValueVector extends ExecTest {
 
     final int initialCapacity = 1024;
 
-    for(int i=0; i<valueVectors.length; i++) {
-      valueVectors[i] = TypeHelper.getNewVector(fields[i], allocator);
-      valueVectors[i].setInitialCapacity(initialCapacity);
-      valueVectors[i].allocateNew();
-    }
+    try {
+      for (int i = 0; i < valueVectors.length; i++) {
+        valueVectors[i] = TypeHelper.getNewVector(fields[i], allocator);
+        valueVectors[i].setInitialCapacity(initialCapacity);
+        valueVectors[i].allocateNew();
+      }
 
-    for(int i=0; i<valueVectors.length; i++) {
-      final ValueVector vv = valueVectors[i];
-      final int vvCapacity = vv.getValueCapacity();
-      assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
-          initialCapacity, vvCapacity);
+      for (int i = 0; i < valueVectors.length; i++) {
+        final ValueVector vv = valueVectors[i];
+        final int vvCapacity = vv.getValueCapacity();
+        assertEquals(String.format("Incorrect value capacity for %s [%d]", vv.getField(), vvCapacity),
+            initialCapacity, vvCapacity);
+      }
+    } finally {
+      AutoCloseables.close(valueVectors);
     }
   }
 }


[2/3] drill git commit: DRILL-3243: Better error message when we use an alias in OVER clause

Posted by hg...@apache.org.
DRILL-3243: Better error message when we use an alias in OVER clause


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1c9093e0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1c9093e0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1c9093e0

Branch: refs/heads/master
Commit: 1c9093e0f34daaeb1ad6661bb4d4115bc573ed78
Parents: b2bbd99
Author: adeneche <ad...@gmail.com>
Authored: Mon Jun 8 16:00:23 2015 -0700
Committer: Hanifi Gunes <hg...@maprtech.com>
Committed: Mon Jul 6 16:28:29 2015 -0700

----------------------------------------------------------------------
 .../drill/common/exceptions/UserException.java  |  4 ++++
 .../common/exceptions/UserRemoteException.java  |  2 +-
 .../compliant/CompliantTextRecordReader.java    |  3 +++
 .../text/compliant/RepeatedVarCharOutput.java   |  7 +++---
 .../exec/store/text/TestNewTextReader.java      | 23 +++++++++++++++-----
 5 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1c9093e0/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index 13c17bd..b943710 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -616,6 +616,10 @@ public class UserException extends DrillRuntimeException {
     return context.getErrorId();
   }
 
+  public DrillPBError.ErrorType getErrorType() {
+    return errorType;
+  }
+
   public String getErrorLocation() {
     DrillbitEndpoint ep = context.getEndpoint();
     if (ep != null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1c9093e0/common/src/main/java/org/apache/drill/common/exceptions/UserRemoteException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserRemoteException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserRemoteException.java
index 1b3fa42..d13fb49 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserRemoteException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserRemoteException.java
@@ -28,7 +28,7 @@ public class UserRemoteException extends UserException {
   private final DrillPBError error;
 
   public UserRemoteException(DrillPBError error) {
-    super(null, "Drill Remote Exception", null);
+    super(error.getErrorType(), "Drill Remote Exception", null);
     this.error = error;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c9093e0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
index 254e0d8..27b9116 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -112,6 +113,8 @@ public class CompliantTextRecordReader extends AbstractRecordReader {
       reader.start();
     } catch (SchemaChangeException | IOException e) {
       throw new ExecutionSetupException(String.format("Failure while setting up text reader for file %s", split.getPath()), e);
+    } catch (IllegalArgumentException e) {
+      throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1c9093e0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
index 40276f4..91b1a7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -132,13 +132,14 @@ class RepeatedVarCharOutput extends TextOutput {
       if (!isStarQuery) {
         String pathStr;
         for (SchemaPath path : columns) {
-          assert path.getRootSegment().isNamed();
+          assert path.getRootSegment().isNamed() : "root segment should be named";
           pathStr = path.getRootSegment().getPath();
           Preconditions.checkArgument(pathStr.equals(COL_NAME) || (pathStr.equals("*") && path.getRootSegment().getChild() == null),
-              "Selected column(s) must have name 'columns' or must be plain '*'");
+              String.format("Selected column '%s' must have name 'columns' or must be plain '*'", pathStr));
 
           if (path.getRootSegment().getChild() != null) {
-            Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index");
+            Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),
+              String.format("Selected column '%s' must be an array index", pathStr));
             int index = path.getRootSegment().getChild().getArraySegment().getIndex();
             columnIds.add(index);
           }

http://git-wip-us.apache.org/repos/asf/drill/blob/1c9093e0/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
index 76674f9..e63e528 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
@@ -17,17 +17,19 @@
  */
 package org.apache.drill.exec.store.text;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import org.apache.drill.BaseTestQuery;
-import org.junit.Assert;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
 import org.junit.Test;
 
 public class TestNewTextReader extends BaseTestQuery {
 
   @Test
   public void fieldDelimiterWithinQuotes() throws Exception {
-    test("select columns[1] as col1 from cp.`textinput/input1.csv`");
     testBuilder()
         .sqlQuery("select columns[1] as col1 from cp.`textinput/input1.csv`")
         .unOrdered()
@@ -37,14 +39,25 @@ public class TestNewTextReader extends BaseTestQuery {
   }
 
   @Test
-  public void ensureFailureOnNewLineDelimiterWithinQuotes() throws Exception {
+  public void ensureFailureOnNewLineDelimiterWithinQuotes() {
     try {
       test("select columns[1] as col1 from cp.`textinput/input2.csv`");
+      fail("Expected exception not thrown.");
     } catch (Exception e) {
       assertTrue(e.getMessage().contains("Cannot use newline character within quoted string"));
-      return;
     }
-    Assert.fail("Expected exception not thrown.");
   }
 
+  @Test
+  public void ensureColumnNameDisplayedinError() throws Exception {
+    final String COL_NAME = "col1";
+
+    try {
+      test("select max(columns[1]) as %s from cp.`textinput/input1.csv` where %s is not null", COL_NAME, COL_NAME);
+      fail("Query should have failed");
+    } catch(UserRemoteException ex) {
+      assertEquals(ErrorType.DATA_READ, ex.getErrorType());
+      assertTrue("Error message should contain " + COL_NAME, ex.getMessage().contains(COL_NAME));
+    }
+  }
 }