You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/01/23 02:56:33 UTC

[1/3] drill git commit: DRILL-1960: Automatic reallocation

Repository: drill
Updated Branches:
  refs/heads/master 839ae249b -> a22b47243


http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 49ad390..a6294d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -51,9 +51,7 @@ public abstract class ProjectorTemplate implements Projector {
     case TWO_BYTE:
       final int count = recordCount;
       for (int i = 0; i < count; i++, firstOutputIndex++) {
-        if (!doEval(vector2.getIndex(i), firstOutputIndex)) {
-          return i;
-        }
+        doEval(vector2.getIndex(i), firstOutputIndex);
       }
       return recordCount;
 
@@ -61,9 +59,7 @@ public abstract class ProjectorTemplate implements Projector {
       final int countN = recordCount;
       int i;
       for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
-        if (!doEval(i, firstOutputIndex)) {
-          break;
-        }
+        doEval(i, firstOutputIndex);
       }
       if (i < startIndex + recordCount || startIndex > 0) {
         for (TransferPair t : transfers) {
@@ -98,6 +94,6 @@ public abstract class ProjectorTemplate implements Projector {
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
-  public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 5cc308a..d2b94c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.physical.impl.svremover;
 
 import javax.inject.Named;
 
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.AllocationHelper;
 
 
 public abstract class CopierTemplate2 implements Copier{
@@ -42,21 +45,24 @@ public abstract class CopierTemplate2 implements Copier{
   @Override
   public int copyRecords(int index, int recordCount){
     for(VectorWrapper<?> out : outgoing){
-      out.getValueVector().allocateNew();
+      MajorType type = out.getField().getType();
+      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+        out.getValueVector().allocateNew();
+      } else {
+        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
+      }
     }
 
     int outgoingPosition = 0;
 
     for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
-      if (!doEval(sv2.getIndex(svIndex), outgoingPosition)) {
-        break;
-      }
+      doEval(sv2.getIndex(svIndex), outgoingPosition);
     }
     return outgoingPosition;
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
-  public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index c42332d..57c2e36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.physical.impl.svremover;
 
 import javax.inject.Named;
 
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
 
 public abstract class CopierTemplate4 implements Copier{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
@@ -43,21 +46,24 @@ public abstract class CopierTemplate4 implements Copier{
   @Override
   public int copyRecords(int index, int recordCount){
     for(VectorWrapper<?> out : outgoing){
-      out.getValueVector().allocateNew();
+      MajorType type = out.getField().getType();
+      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+        out.getValueVector().allocateNew();
+      } else {
+        AllocationHelper.allocate(out.getValueVector(), recordCount, 1);
+      }
     }
 
     int outgoingPosition = 0;
     for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
       int deRefIndex = sv4.get(svIndex);
-      if (!doEval(deRefIndex, outgoingPosition)) {
-        break;
-      }
+      doEval(deRefIndex, outgoingPosition);
     }
     return outgoingPosition;
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
-  public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
index a3e7940..26d23f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
@@ -220,9 +220,7 @@ public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<W
     cg.setMappingSet(EVAL);
     for (LogicalExpression ex : valueExprs) {
       ClassGenerator.HoldingContainer hc = cg.addExpr(ex);
-      cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
-    cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE);
   }
 
   private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null);
@@ -232,9 +230,7 @@ public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<W
     cg.setMappingSet(WINDOW_VALUES);
     for (int i = 0; i < valueExprs.length; i++) {
       ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]);
-      cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
-    cg.getEvalBlock()._return(JExpr.TRUE);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
index b4e3fed..e2c7e9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
@@ -243,14 +243,10 @@ public abstract class StreamingWindowFrameTemplate implements StreamingWindowFra
   }
 
   private final boolean outputToBatch(int inIndex) {
-    boolean success = outputRecordValues(outputCount)
-        && outputWindowValues(inIndex, outputCount);
+    outputRecordValues(outputCount);
+    outputWindowValues(inIndex, outputCount);
 
-    if (success) {
-      outputCount++;
-    }
-
-    return success;
+    return true;
   }
 
   @Override
@@ -278,8 +274,8 @@ public abstract class StreamingWindowFrameTemplate implements StreamingWindowFra
    */
   public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int index2);
   public abstract void addRecord(@Named("index") int index);
-  public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
-  public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void outputRecordValues(@Named("outIndex") int outIndex);
+  public abstract void outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
   public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
 
   public abstract boolean resetValues();

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index c57ec28..dd3d4b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -78,10 +78,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
       int compoundIndex = vector4.get(0);
       int batch = compoundIndex >>> 16;
       assert batch < batchGroups.size() : String.format("batch: %d batchGroups: %d", batch, batchGroups.size());
-      if (!doCopy(compoundIndex, outgoingIndex)) {
-        setValueCount(outgoingIndex);
-        return outgoingIndex;
-      }
+      doCopy(compoundIndex, outgoingIndex);
       int nextIndex = batchGroups.get(batch).getNextIndex();
       if (nextIndex < 0) {
         vector4.set(0, vector4.get(--queueSize));
@@ -174,6 +171,6 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-  public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index 9645be9..3585a8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -23,5 +23,5 @@ public interface TransferPair {
   public void transfer();
   public void splitAndTransfer(int startIndex, int length);
   public ValueVector getTo();
-  public boolean copyValueSafe(int from, int to);
+  public void copyValueSafe(int from, int to);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 0c4437a..f20d765 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -200,10 +200,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
       currentValueListLength += numLeftoverVals;
     }
     // this should not fail
-    if (!castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
-        currentValueListLength)) {
-      return true;
-    }
+    castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
+        currentValueListLength);
     // This field is being referenced in the superclass determineSize method, so we need to set it here
     // again going to make this the length in BYTES to avoid repetitive multiplication/division
     dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
index b9b808b..a4143d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -50,10 +50,8 @@ final class NullableBitReader extends ColumnReader {
       defLevel = pageReader.definitionLevels.readInteger();
       // if the value is defined
       if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
-        if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
-            pageReader.valueReader.readBoolean() ? 1 : 0 )) {
-          throw new RuntimeException();
-        }
+        ((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
+            pageReader.valueReader.readBoolean() ? 1 : 0 );
       }
       // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index 83f9bde..11bd29f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -176,7 +176,6 @@ public class VarLengthColumnReaders {
 
     @Override
     public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) {
-      boolean success;
       if (index >= varCharVector.getValueCapacity()) {
         return false;
       }
@@ -189,16 +188,16 @@ public class VarLengthColumnReaders {
         holder.buffer=b;
         holder.start=0;
         holder.end=currDictValToWrite.length();
-        success = varCharVector.getMutator().setSafe(index, holder);
+        varCharVector.getMutator().setSafe(index, holder);
       }
       else {
         VarCharHolder holder = new VarCharHolder();
         holder.buffer=bytebuf;
         holder.start=start;
         holder.end=start+length;
-        success = varCharVector.getMutator().setSafe(index, holder);
+        varCharVector.getMutator().setSafe(index, holder);
       }
-      return success;
+      return true;
     }
 
     @Override
@@ -225,19 +224,18 @@ public class VarLengthColumnReaders {
 
     @Override
     public boolean setSafe(int index, DrillBuf value, int start, int length) {
-      boolean success;
       if (index >= vector.getValueCapacity()) {
         return false;
       }
 
       if (usingDictionary) {
         DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
-        success = mutator.setSafe(index, 1, 0, currDictValToWrite.length(), b);
+        mutator.setSafe(index, 1, 0, currDictValToWrite.length(), b);
       }
       else {
-        success = mutator.setSafe(index, 1, start, start+length, value);
+        mutator.setSafe(index, 1, start, start+length, value);
       }
-      return success;
+      return true;
     }
 
     @Override
@@ -260,7 +258,6 @@ public class VarLengthColumnReaders {
 
     @Override
     public boolean setSafe(int index, DrillBuf value, int start, int length) {
-      boolean success;
       if (index >= varBinaryVector.getValueCapacity()) {
         return false;
       }
@@ -273,16 +270,16 @@ public class VarLengthColumnReaders {
         holder.buffer=b;
         holder.start=0;
         holder.end=currDictValToWrite.length();
-        success = varBinaryVector.getMutator().setSafe(index, holder);
+        varBinaryVector.getMutator().setSafe(index, holder);
       }
       else {
         VarBinaryHolder holder = new VarBinaryHolder();
         holder.buffer=value;
         holder.start=start;
         holder.end=start+length;
-        success = varBinaryVector.getMutator().setSafe(index, holder);
+        varBinaryVector.getMutator().setSafe(index, holder);
       }
-      return success;
+      return true;
     }
 
     @Override
@@ -307,7 +304,6 @@ public class VarLengthColumnReaders {
 
     @Override
     public boolean setSafe(int index, DrillBuf value, int start, int length) {
-      boolean success;
       if (index >= nullableVarBinaryVector.getValueCapacity()) {
         return false;
       }
@@ -319,7 +315,7 @@ public class VarLengthColumnReaders {
         holder.start=0;
         holder.end=currDictValToWrite.length();
         holder.isSet=1;
-        success = nullableVarBinaryVector.getMutator().setSafe(index, holder);
+        nullableVarBinaryVector.getMutator().setSafe(index, holder);
       }
       else {
         NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
@@ -327,9 +323,9 @@ public class VarLengthColumnReaders {
         holder.start=start;
         holder.end=start+length;
         holder.isSet=1;
-        success = nullableVarBinaryVector.getMutator().setSafe(index, holder);
+        nullableVarBinaryVector.getMutator().setSafe(index, holder);
       }
-      return  success;
+      return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 7c4b33b..78cf244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -98,10 +98,8 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
     }
 
     // this should not fail
-    if (!variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead,
-        dataTypeLengthInBits)) {
-      return true;
-    }
+    variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead,
+        dataTypeLengthInBits);
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 241fa95..b7ffbf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -164,13 +164,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
         }
 
         for (PojoWriter writer : writers) {
-          if (!writer.writeField(currentPojo, i)) {
-            doCurrent = true;
-            if (i == 0) {
-              throw new IllegalStateException("Got into a position where we can't write data but the batch is empty.");
-            }
-            break outside;
-          };
+          writer.writeField(currentPojo, i);
         }
         i++;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
index 0ffa55c..31748f4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoWriter.java
@@ -21,7 +21,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 
 interface PojoWriter{
-  boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException ;
+  void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException ;
   void init(OutputMutator output) throws SchemaChangeException;
   void allocate();
   void setValueCount(int i);

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
index fee011a..e52384e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
@@ -51,9 +51,9 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       int i = field.getInt(pojo);
-      return vector.getMutator().setSafe(outboundIndex, i);
+      vector.getMutator().setSafe(outboundIndex, i);
     }
 
   }
@@ -68,9 +68,9 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       boolean b = field.getBoolean(pojo);
-      return vector.getMutator().setSafe(outboundIndex, b ? 1 : 0);
+      vector.getMutator().setSafe(outboundIndex, b ? 1 : 0);
     }
 
   }
@@ -85,9 +85,9 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       long l = field.getLong(pojo);
-      return vector.getMutator().setSafe(outboundIndex, l);
+      vector.getMutator().setSafe(outboundIndex, l);
     }
 
   }
@@ -102,10 +102,10 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       double d = field.getDouble(pojo);
 
-      return vector.getMutator().setSafe(outboundIndex, d);
+      vector.getMutator().setSafe(outboundIndex, d);
     }
 
   }
@@ -128,9 +128,9 @@ public class Writers {
     public void cleanup() {
     }
 
-    public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       if (s == null) {
-        return true;
+        return;
       } else {
         h.isSet = 1;
         byte[] bytes = s.getBytes(Charsets.UTF_8);
@@ -140,7 +140,7 @@ public class Writers {
         h.buffer = data;
         h.start = 0;
         h.end = bytes.length;
-        return vector.getMutator().setSafe(outboundIndex, h);
+        vector.getMutator().setSafe(outboundIndex, h);
       }
     }
 
@@ -155,12 +155,12 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       Enum<?> e= ((Enum<?>) field.get(pojo));
       if (e == null) {
-        return true;
+        return;
       }
-      return writeString(e.name(), outboundIndex);
+      writeString(e.name(), outboundIndex);
     }
   }
 
@@ -173,9 +173,9 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       String s = (String) field.get(pojo);
-      return writeString(s, outboundIndex);
+      writeString(s, outboundIndex);
     }
   }
 
@@ -189,12 +189,11 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       Integer i = (Integer) field.get(pojo);
       if (i != null) {
-        return vector.getMutator().setSafe(outboundIndex, i);
+        vector.getMutator().setSafe(outboundIndex, i);
       }
-      return true;
     }
 
   }
@@ -209,12 +208,11 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       Long o = (Long) field.get(pojo);
       if (o != null) {
-        return vector.getMutator().setSafe(outboundIndex, o);
+        vector.getMutator().setSafe(outboundIndex, o);
       }
-      return true;
     }
 
   }
@@ -229,12 +227,11 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       Boolean o = (Boolean) field.get(pojo);
       if (o != null) {
-        return vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
+        vector.getMutator().setSafe(outboundIndex, o ? 1 : 0);
       }
-      return true;
     }
 
   }
@@ -248,12 +245,11 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       Double o = (Double) field.get(pojo);
       if (o != null) {
-        return vector.getMutator().setSafe(outboundIndex, o);
+        vector.getMutator().setSafe(outboundIndex, o);
       }
-      return true;
     }
 
   }
@@ -268,12 +264,11 @@ public class Writers {
     }
 
     @Override
-    public boolean writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
+    public void writeField(Object pojo, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
       Timestamp o = (Timestamp) field.get(pojo);
       if (o != null) {
-        return vector.getMutator().setSafe(outboundIndex, o.getTime());
+        vector.getMutator().setSafe(outboundIndex, o.getTime());
       }
-      return true;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 495e3e2..7c1f888 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -68,7 +68,6 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   private LongWritable key;
   private Text value;
   private int numCols = 0;
-  private boolean redoRecord = false;
 
   public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
     this.fragmentContext = context;
@@ -143,10 +142,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
     int batchSize = 0;
     try {
       int recordCount = 0;
-      while (redoRecord || (batchSize < 200*1000 && reader.next(key, value))) {
-        redoRecord = false;
-
-        // start and end together are used to split fields
+      while (recordCount < Character.MAX_VALUE && batchSize < 200*1000 && reader.next(key, value)) {
         int start;
         int end = -1;
 
@@ -169,21 +165,11 @@ public class DrillTextRecordReader extends AbstractRecordReader {
             }
           }
           if (numCols > 0 && i++ < columnIds.get(p)) {
-            if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, 0)) {
-              redoRecord = true;
-              vector.getMutator().setValueCount(recordCount);
-              logger.debug("text scan batch size {}", batchSize);
-              return recordCount;
-            }
+            vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, 0);
             continue;
           }
           p++;
-          if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1)) {
-            redoRecord = true;
-            vector.getMutator().setValueCount(recordCount);
-            logger.debug("text scan batch size {}", batchSize);
-            return recordCount;
-          }
+          vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1);
           batchSize += end - start;
         }
         recordCount++;

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 51726a3..7c77ca2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -42,4 +42,19 @@ public class AllocationHelper {
     allocatePrecomputedChildCount(v, valueCount, bytesPerValue, repeatedPerTop * valueCount);
   }
 
+  /**
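+   * Allocates exactly valueCount values if v is a fixed-width vector; otherwise falls back to dynamic allocation.
+   * @param v the vector to allocate
+   * @param valueCount number of values to allocate; used only when v is a FixedWidthVector
+   * @return true for a fixed-width vector, otherwise the result of v.allocateNewSafe()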
+   */
+  public static boolean allocateNew(ValueVector v, int valueCount){
+    if (v instanceof  FixedWidthVector) {
+      ((FixedWidthVector) v).allocateNew(valueCount);
+      return true;
+    } else {
+      return v.allocateNewSafe();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index d2523c5..c984666 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -108,6 +108,19 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   /**
+   * Allocates a new buffer with double the capacity, zero-fills it, and copies the existing data into it. The old buffer is then released and replaced by the new one.
+   */
+  public void reAlloc() {
+    allocationValueCount *= 2;
+    DrillBuf newBuf = allocator.buffer(getSizeFromCount(allocationValueCount));
+    newBuf.setZero(0, newBuf.capacity());
+    newBuf.setBytes(0, data, 0, data.capacity());
+    data.release();
+    data = newBuf;
+    valueCapacity = allocationValueCount;
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
@@ -233,8 +246,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     }
 
     @Override
-    public boolean copyValueSafe(int fromIndex, int toIndex) {
-      return to.copyFromSafe(fromIndex, toIndex, BitVector.this);
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, BitVector.this);
     }
   }
 
@@ -335,42 +348,40 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
       set(index, holder.value);
     }
 
-    public boolean setSafe(int index, int value) {
+    public void setSafe(int index, int value) {
       if(index >= getValueCapacity()) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
       set(index, value);
-      return true;
     }
 
-    public boolean setSafe(int index, BitHolder holder) {
+    public void setSafe(int index, BitHolder holder) {
       if(index >= getValueCapacity()) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
       set(index, holder.value);
-      return true;
     }
 
-    public boolean setSafe(int index, NullableBitHolder holder) {
+    public void setSafe(int index, NullableBitHolder holder) {
       if(index >= getValueCapacity()) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
       set(index, holder.value);
-      return true;
     }
 
     public final void setValueCount(int valueCount) {
       int currentValueCapacity = getValueCapacity();
       BitVector.this.valueCount = valueCount;
       int idx = getSizeFromCount(valueCount);
+      while(valueCount > getValueCapacity()) {
+        reAlloc();
+      }
       if (valueCount > 0 && currentValueCapacity > valueCount * 2) {
         incrementAllocationMonitor();
       } else if (allocationMonitor > 0) {
         allocationMonitor = 0;
       }
+      data.readerIndex(data.writerIndex());
       VectorTrimmer.trim(data, idx);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
index 1ed7f37..4b9d357 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.vector;
 
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -34,32 +35,35 @@ public class CopyUtil {
     JExpression inIndex = JExpr.direct("inIndex");
     JExpression outIndex = JExpr.direct("outIndex");
     for(VectorWrapper<?> vv : batch) {
+      String copyMethod;
+      if (!Types.isFixedWidthType(vv.getField().getType()) || Types.isRepeated(vv.getField().getType()) || Types.isComplex(vv.getField().getType())) {
+        copyMethod = "copyFromSafe";
+      } else {
+        copyMethod = "copyFrom";
+      }
       g.rotateBlock();
       JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId));
       JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
 
       if(hyper){
 
-        g.getEvalBlock()._if(
+        g.getEvalBlock().add(
                 outVV
-                        .invoke("copyFromSafe")
+                        .invoke(copyMethod)
                         .arg(
                                 inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
                         .arg(outIndex)
                         .arg(
                                 inVV.component(inIndex.shrz(JExpr.lit(16)))
                         )
-                        .not()
-        )
-                ._then()._return(JExpr.FALSE);
+        );
       }else{
-        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+        g.getEvalBlock().add(outVV.invoke(copyMethod).arg(inIndex).arg(outIndex).arg(inVV));
       }
 
       g.rotateBlock();
       fieldId++;
     }
-    g.getEvalBlock()._return(JExpr.TRUE);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
index aadc563..eaae7ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
@@ -47,7 +47,7 @@ public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector {
   }
   public interface RepeatedMutator extends Mutator {
     public void setValueCounts(int parentValueCount, int childValueCount);
-    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount);
+    public void setRepetitionAtIndexSafe(int index, int repetitionCount);
     public BaseDataValueVector getDataVector();
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
index ad2ba1b..8e097e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
@@ -18,5 +18,5 @@
 package org.apache.drill.exec.vector;
 
 public interface RepeatedMutator extends ValueVector.Mutator {
-  public boolean startNewGroup(int index);
+  public void startNewGroup(int index);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
index ff3ee63..10ddcc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -55,6 +55,6 @@ public interface VariableWidthVector extends ValueVector{
   public int getCurrentSizeInBytes();
 
   public interface VariableWidthMutator extends Mutator {
-    public boolean setValueLengthSafe(int index, int length);
+    public void setValueLengthSafe(int index, int length);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 62a5140..d1801f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -69,18 +69,18 @@ public class MapVector extends AbstractContainerVector {
   transient private MapTransferPair ephPair;
   transient private MapSingleCopier ephPair2;
 
-  public boolean copyFromSafe(int fromIndex, int thisIndex, MapVector from) {
+  public void copyFromSafe(int fromIndex, int thisIndex, MapVector from) {
     if(ephPair == null || ephPair.from != from) {
       ephPair = (MapTransferPair) from.makeTransferPair(this);
     }
-    return ephPair.copyValueSafe(fromIndex, thisIndex);
+    ephPair.copyValueSafe(fromIndex, thisIndex);
   }
 
-  public boolean copyFromSafe(int fromSubIndex, int thisIndex, RepeatedMapVector from) {
+  public void copyFromSafe(int fromSubIndex, int thisIndex, RepeatedMapVector from) {
     if(ephPair2 == null || ephPair2.from != from) {
       ephPair2 = from.makeSingularCopier(this);
     }
-    return ephPair2.copySafe(fromSubIndex, thisIndex);
+    ephPair2.copySafe(fromSubIndex, thisIndex);
   }
 
   @Override
@@ -170,13 +170,10 @@ public class MapVector extends AbstractContainerVector {
     }
 
     @Override
-    public boolean copyValueSafe(int from, int to) {
+    public void copyValueSafe(int from, int to) {
       for (TransferPair p : pairs) {
-        if (!p.copyValueSafe(from, to)) {
-          return false;
-        }
+        p.copyValueSafe(from, to);
       }
-      return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 58eb546..73daf93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -81,11 +81,11 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
 
   transient private RepeatedListTransferPair ephPair;
 
-  public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
+  public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
     if(ephPair == null || ephPair.from != from) {
       ephPair = (RepeatedListTransferPair) from.makeTransferPair(this);
     }
-    return ephPair.copyValueSafe(fromIndex, thisIndex);
+    ephPair.copyValueSafe(fromIndex, thisIndex);
   }
 
   public Mutator getMutator() {
@@ -106,22 +106,22 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     }
   }
 
+  public void reAlloc() {
+    offsets.reAlloc();
+  }
+
   public class Mutator implements ValueVector.Mutator, RepeatedMutator{
 
     public void startNewGroup(int index) {
-      offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
+      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
     }
 
     public int add(int index) {
       int endOffset = index+1;
       int currentChildOffset = offsets.getAccessor().get(endOffset);
       int newChildOffset = currentChildOffset + 1;
-      boolean success = offsets.getMutator().setSafe(endOffset, newChildOffset);
+      offsets.getMutator().setSafe(endOffset, newChildOffset);
       lastSet = index;
-      if (!success) {
-        return -1;
-      }
-
       // this is done at beginning so return the currentChildOffset, not the new offset.
       return currentChildOffset;
 
@@ -154,8 +154,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     }
 
     @Override
-    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
-      return false;  //To change body of implemented methods use File | Settings | File Templates.
+    public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
     }
 
     @Override
@@ -304,21 +303,16 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     }
 
     @Override
-    public boolean copyValueSafe(int from, int to) {
+    public void copyValueSafe(int from, int to) {
       RepeatedListHolder holder = new RepeatedListHolder();
       accessor.get(from, holder);
       int newIndex = this.to.offsets.getAccessor().get(to);
       //todo: make this a bulk copy.
       for (int i = holder.start; i < holder.end; i++, newIndex++) {
-        if (!vectorTransfer.copyValueSafe(i, newIndex)) {
-          return false;
-        }
-      }
-      if (!this.to.offsets.getMutator().setSafe(to + 1, newIndex)) {
-        return false;
+        vectorTransfer.copyValueSafe(i, newIndex);
       }
+      this.to.offsets.getMutator().setSafe(to + 1, newIndex);
       this.to.lastSet++;
-      return true;
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 43a3881..86a3f64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -57,7 +57,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
   public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
 
-  private final UInt4Vector offsets;   // offsets to start of each record (considering record indices are 0-indexed)
+  final UInt4Vector offsets;   // offsets to start of each record (considering record indices are 0-indexed)
   private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
   private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
   private final Mutator mutator = new Mutator();
@@ -80,6 +80,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     accessor.reset();
   }
 
+  public void reAlloc() {
+    offsets.reAlloc();
+  }
+
   public Iterator<String> fieldNameIterator() {
     return getChildFieldNames().iterator();
   }
@@ -147,13 +151,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
       }
     }
 
-    public boolean copySafe(int fromSubIndex, int toIndex) {
+    public void copySafe(int fromSubIndex, int toIndex) {
       for (TransferPair p : pairs) {
-        if (!p.copyValueSafe(fromSubIndex, toIndex)) {
-          return false;
-        }
+        p.copyValueSafe(fromSubIndex, toIndex);
       }
-      return true;
     }
   }
 
@@ -225,13 +226,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     }
 
     @Override
-    public boolean copyValueSafe(int from, int to) {
+    public void copyValueSafe(int from, int to) {
       for (TransferPair p : pairs) {
-        if (!p.copyValueSafe(from, to)) {
-          return false;
-        }
+        p.copyValueSafe(from, to);
       }
-      return true;
     }
 
     @Override
@@ -295,26 +293,18 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     }
 
     @Override
-    public boolean copyValueSafe(int srcIndex, int destIndex) {
+    public void copyValueSafe(int srcIndex, int destIndex) {
       RepeatedMapHolder holder = new RepeatedMapHolder();
       from.getAccessor().get(srcIndex, holder);
-      if(destIndex >= to.getValueCapacity()){
-        return false;
-      }
       to.populateEmpties(destIndex+1);
       int newIndex = to.offsets.getAccessor().get(destIndex);
       //todo: make these bulk copies
       for (int i = holder.start; i < holder.end; i++, newIndex++) {
         for (TransferPair p : pairs) {
-          if (!p.copyValueSafe(i, newIndex)) {
-            return false;
-          }
+          p.copyValueSafe(i, newIndex);
         }
       }
-      if (!to.offsets.getMutator().setSafe(destIndex+1, newIndex)) {
-        return false;
-      }
-      return true;
+      to.offsets.getMutator().setSafe(destIndex+1, newIndex);
     }
 
     @Override
@@ -349,11 +339,11 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
   transient private RepeatedMapTransferPair ephPair;
 
-  public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from) {
+  public void copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from) {
     if (ephPair == null || ephPair.from != from) {
       ephPair = (RepeatedMapTransferPair) from.makeTransferPair(this);
     }
-    return ephPair.copyValueSafe(fromIndex, thisIndex);
+    ephPair.copyValueSafe(fromIndex, thisIndex);
   }
 
   @Override
@@ -509,15 +499,12 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
     public void startNewGroup(int index) {
       populateEmpties(index+1);
-      offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
+      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
     }
 
     public int add(int index) {
       int prevEnd = offsets.getAccessor().get(index+1);
-      boolean success = offsets.getMutator().setSafe(index+1, prevEnd+1);
-      if (!success) {
-        return -1;
-      }
+      offsets.getMutator().setSafe(index+1, prevEnd+1);
       return prevEnd;
     }
 
@@ -547,8 +534,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     }
 
     @Override
-    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
-      return false;  //To change body of implemented methods use File | Settings | File Templates.
+    public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
index ae2f779..c51dfda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
@@ -52,7 +52,7 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{
       return;
     }
     RepeatedListWriter impl = (RepeatedListWriter) writer;
-    impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container));
+    impl.container.copyFromSafe(idx(), impl.idx(), container);
   }
 
   @Override
@@ -61,7 +61,7 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{
       return;
     }
     RepeatedListWriter impl = (RepeatedListWriter) writer.list(name);
-    impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container));
+    impl.container.copyFromSafe(idx(), impl.idx(), container);
   }
 
   private int currentOffset;

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
index 3171d8a..9136277 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -153,7 +153,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
       return;
     }
     RepeatedMapWriter impl = (RepeatedMapWriter) writer;
-    impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+    impl.container.copyFromSafe(idx(), impl.idx(), vector);
   }
 
   public void copyAsValueSingle(MapWriter writer) {
@@ -161,7 +161,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
       return;
     }
     SingleMapWriter impl = (SingleMapWriter) writer;
-    impl.inform(impl.container.copyFromSafe(currentOffset, impl.idx(), vector));
+    impl.container.copyFromSafe(currentOffset, impl.idx(), vector);
   }
 
   @Override
@@ -170,7 +170,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
       return;
     }
     RepeatedMapWriter impl = (RepeatedMapWriter) writer.map(name);
-    impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+    impl.container.copyFromSafe(idx(), impl.idx(), vector);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
index 76f9e2f..5c8f688 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
@@ -95,7 +95,7 @@ public class SingleMapReaderImpl extends AbstractFieldReader{
   public void copyAsValue(MapWriter writer){
     if (writer.ok()) {
       SingleMapWriter impl = (SingleMapWriter) writer;
-      impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+      impl.container.copyFromSafe(idx(), impl.idx(), vector);
     }
   }
 
@@ -103,7 +103,7 @@ public class SingleMapReaderImpl extends AbstractFieldReader{
   public void copyAsField(String name, MapWriter writer){
     if (writer.ok()) {
       SingleMapWriter impl = (SingleMapWriter) writer.map(name);
-      impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+      impl.container.copyFromSafe(idx(), impl.idx(), vector);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
index 7c04477..a9d2ef8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.List;
@@ -43,6 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class TestWindowFrame extends PopUnitTestBase {
 
   @Test
+  @Ignore
   public void testWindowFrameWithOneKeyCount() throws Throwable {
     try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
          Drillbit bit = new Drillbit(CONFIG, serviceSet);
@@ -93,6 +95,7 @@ public class TestWindowFrame extends PopUnitTestBase {
   }
 
   @Test
+  @Ignore
   public void testWindowFrameWithOneKeyMultipleBatches() throws Throwable {
     try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
          Drillbit bit = new Drillbit(CONFIG, serviceSet);
@@ -159,6 +162,7 @@ public class TestWindowFrame extends PopUnitTestBase {
   }
 
   @Test
+  @Ignore
   public void testWindowFrameWithTwoKeys() throws Throwable {
     try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
          Drillbit bit = new Drillbit(CONFIG, serviceSet);

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java
deleted file mode 100644
index ebc4df7..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.junit.Test;
-
-public class TestAdaptiveAllocation {
-
-  @Test
-  public void test() throws Exception {
-    BufferAllocator allocator = new TopLevelAllocator();
-    MaterializedField field = MaterializedField.create("field", Types.required(MinorType.VARCHAR));
-    NullableVarBinaryVector vector1 = new NullableVarBinaryVector(field, allocator);
-    NullableVarCharVector vector2 = new NullableVarCharVector(field, allocator);
-    NullableBigIntVector vector3 = new NullableBigIntVector(field, allocator);
-
-    Random rand = new Random();
-//    int valuesToWrite = rand.nextInt(4000) + 1000;
-//    int bytesToWrite = rand.nextInt(100);
-    int valuesToWrite = 8000;
-    int bytesToWrite1 = 2;
-    int bytesToWrite2 = 200;
-//    System.out.println("value: " + valuesToWrite);
-//    System.out.println("bytes: " + bytesToWrite);
-
-    byte[] value1 = new byte[bytesToWrite1];
-    byte[] value2 = new byte[bytesToWrite2];
-
-    NullableVarBinaryVector copyVector1 = new NullableVarBinaryVector(field, allocator);
-    NullableVarCharVector copyVector2 = new NullableVarCharVector(field, allocator);
-    NullableBigIntVector copyVector3 = new NullableBigIntVector(field, allocator);
-
-    copyVector1.allocateNew();
-    copyVector2.allocateNew();
-    copyVector3.allocateNew();
-
-    copyVector1.getMutator().set(0, value1);
-    copyVector2.getMutator().set(0, value2);
-    copyVector3.getMutator().set(0, 100);
-
-    for (int i = 0; i < 10000; i++) {
-      vector1.allocateNew();
-      vector2.allocateNew();
-      vector3.allocateNew();
-//      System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity());
-//      System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity());
-      int offset = 0;
-      int j = 0;
-      int toWrite = (int) valuesToWrite * (int) (2 + rand.nextGaussian()) / 2;
-      for (j = 0; j < toWrite; j += 1) {
-//        if (!(vector1.getMutator().setSafe(j - offset, value1, 0, value1.length) &&
-//        vector2.getMutator().setSafe(j - offset, value2, 0 , value2.length) &&
-//        vector3.getMutator().setSafe(j - offset, 100))) {
-        if (!(vector1.copyFromSafe(0, j - offset, copyVector1) &&
-          vector2.copyFromSafe(0, j - offset, copyVector2) &&
-          vector3.copyFromSafe(0, j - offset, copyVector3))) {
-          vector1.getMutator().setValueCount(j - offset);
-          vector2.getMutator().setValueCount(j - offset);
-          vector3.getMutator().setValueCount(j - offset);
-          offset = j;
-          vector1.clear();
-          vector2.clear();
-          vector3.clear();
-          vector1.allocateNew();
-          vector2.allocateNew();
-          vector3.allocateNew();
-//          System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity());
-//          System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity());
-        }
-      }
-      vector1.getMutator().setValueCount(j - offset);
-      vector2.getMutator().setValueCount(j - offset);
-      vector3.getMutator().setValueCount(j - offset);
-    }
-    vector1.allocateNew();
-    vector2.allocateNew();
-    vector3.allocateNew();
-    assertTrue(vector1.getValueCapacity() > 8000);
-    assertTrue(vector2.getValueCapacity() > 8000);
-    assertTrue(vector3.getValueCapacity() > 8000);
-    assertTrue(vector1.getByteCapacity() > 8000 * 2);
-    assertTrue(vector2.getByteCapacity() > 8000 * 200);
-  }
-}


[2/3] drill git commit: DRILL-1960: Automatic reallocation

Posted by sm...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 2fd5ce1..4b8e357 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -17,12 +17,20 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.inject.Named;
+
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -46,19 +54,19 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ObjectVector;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
-import javax.inject.Named;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 public abstract class HashAggTemplate implements HashAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
 
+  private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+  private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
+
   private static final boolean EXTRA_DEBUG_1 = false;
   private static final boolean EXTRA_DEBUG_2 = false;
   private static final String TOO_BIG_ERROR =
@@ -112,6 +120,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
+
   public class BatchHolder {
 
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
@@ -144,7 +153,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
               HashTable.BATCH_SIZE);
         } else if (vector instanceof ObjectVector) {
-          ((ObjectVector)vector).allocateNew(HashTable.BATCH_SIZE);
+          ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
         } else {
           vector.allocateNew();
         }
@@ -165,33 +174,17 @@ public abstract class HashAggTemplate implements HashAggregator {
       setupInterior(incoming, outgoing, aggrValuesContainer);
     }
 
-    private boolean outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
+    private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
       outStartIdxHolder.value = batchOutputCount;
       outNumRecordsHolder.value = 0;
-      boolean status = true;
-
-      // Output records starting from 'batchOutputCount' in current batch until there are no more records
-      // or output vectors have no space left. In destination vectors, start filling records from 0th position.
-      while(batchOutputCount <= maxOccupiedIdx) {
-        if (outputRecordValues(batchOutputCount, outNumRecordsHolder.value)) {
-          if (EXTRA_DEBUG_2) {
-            logger.debug("Outputting values from input index {} to output index: {}",
-                batchOutputCount, outNumRecordsHolder.value);
-          }
-          batchOutputCount++;
-          outNumRecordsHolder.value++;
-        } else {
-          status = false;
-          break;
+      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
+        outputRecordValues(i, batchOutputCount);
+        if (EXTRA_DEBUG_2) {
+          logger.debug("Outputting values to output index: {}", batchOutputCount);
         }
+        batchOutputCount++;
+        outNumRecordsHolder.value++;
       }
-      // It's not a failure if only some records were output (at least 1) .. since out-of-memory
-      // conditions may prevent all records from being output; the caller has the responsibility to
-      // allocate more memory and continue outputting more records
-      if (!status && outNumRecordsHolder.value > 0) {
-        status = true;
-      }
-      return status;
     }
 
     private void clear() {
@@ -218,8 +211,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     @RuntimeOverridden
-    public boolean outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
-      return true;
+    public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
     }
   }
 
@@ -303,8 +295,7 @@ public abstract class HashAggTemplate implements HashAggregator {
           if (EXTRA_DEBUG_2) {
             logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
           }
-          boolean success = checkGroupAndAggrValues(currentIndex);
-          assert success : "HashAgg couldn't copy values.";
+          checkGroupAndAggrValues(currentIndex);
         }
 
         if (EXTRA_DEBUG_1) {
@@ -323,57 +314,56 @@ public abstract class HashAggTemplate implements HashAggregator {
               logger.debug("Received IterOutcome of {}", out);
             }
             switch (out) {
-            case NOT_YET:
-              this.outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-
-            case OK_NEW_SCHEMA:
-              if (EXTRA_DEBUG_1) {
-                logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-              }
-              newSchema = true;
-              this.cleanup();
-              // TODO: new schema case needs to be handled appropriately
-              return AggOutcome.UPDATE_AGGREGATOR;
-
-            case OK:
-              resetIndex();
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                boolean success = checkGroupAndAggrValues(currentIndex);
-                assert success : "HashAgg couldn't copy values.";
-                incIndex();
+              case NOT_YET:
+                this.outcome = out;
+                return AggOutcome.RETURN_OUTCOME;
 
+              case OK_NEW_SCHEMA:
                 if (EXTRA_DEBUG_1) {
-                  logger.debug("Continuing outside loop");
+                  logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
+                }
+                newSchema = true;
+                this.cleanup();
+                // TODO: new schema case needs to be handled appropriately
+                return AggOutcome.UPDATE_AGGREGATOR;
+
+              case OK:
+                resetIndex();
+                if (incoming.getRecordCount() == 0) {
+                  continue;
+                } else {
+                  checkGroupAndAggrValues(currentIndex);
+                  incIndex();
+
+                  if (EXTRA_DEBUG_1) {
+                    logger.debug("Continuing outside loop");
+                  }
+                  continue outside;
                 }
-                continue outside;
-              }
 
-            case NONE:
-              // outcome = out;
+              case NONE:
+                // outcome = out;
 
-              buildComplete = true;
+                buildComplete = true;
 
-              updateStats(htable);
+                updateStats(htable);
 
-              // output the first batch; remaining batches will be output
-              // in response to each next() call by a downstream operator
+                // output the first batch; remaining batches will be output
+                // in response to each next() call by a downstream operator
 
-              outputCurrentBatch();
+                outputCurrentBatch();
 
-              // cleanup incoming batch since output of aggregation does not need
-              // any references to the incoming
+                // cleanup incoming batch since output of aggregation does not need
+                // any references to the incoming
 
-              incoming.cleanup();
-              // return setOkAndReturn();
-              return AggOutcome.RETURN_OUTCOME;
+                incoming.cleanup();
+                // return setOkAndReturn();
+                return AggOutcome.RETURN_OUTCOME;
 
-            case STOP:
-            default:
-              outcome = out;
-              return AggOutcome.CLEANUP_AND_RETURN;
+              case STOP:
+              default:
+                outcome = out;
+                return AggOutcome.CLEANUP_AND_RETURN;
             }
           }
 
@@ -388,7 +378,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
-  private void allocateOutgoing() {
+  private void allocateOutgoing(int records) {
     // Skip the keys and only allocate for outputting the workspace values
     // (keys will be output through splitAndTransfer)
     Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
@@ -397,7 +387,12 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     while (outgoingIter.hasNext()) {
       ValueVector vv = outgoingIter.next().getValueVector();
-      vv.allocateNew();
+      MajorType type = vv.getField().getType();
+      if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+        vv.allocateNew();
+      } else {
+        AllocationHelper.allocate(vv, records, 1);
+      }
     }
   }
 
@@ -483,72 +478,40 @@ public abstract class HashAggTemplate implements HashAggregator {
       return outcome;
     }
 
-    allocateOutgoing();
+    allocateOutgoing(numPendingOutput);
 
-    boolean outputKeysStatus = true;
-    boolean outputValuesStatus = true;
-
-    outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+    batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
     int numOutputRecords = outNumRecordsHolder.value;
 
     if (EXTRA_DEBUG_1) {
-      logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value,
-          outNumRecordsHolder.value);
-    }
-    if (outputValuesStatus) {
-      outputKeysStatus =
-          this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+      logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
     }
+    this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
 
-    if (outputKeysStatus && outputValuesStatus) {
-
-      // set the value count for outgoing batch value vectors
-      for (VectorWrapper<?> v : outgoing) {
-        v.getValueVector().getMutator().setValueCount(numOutputRecords);
-      }
+    // set the value count for outgoing batch value vectors
+    for (VectorWrapper<?> v : outgoing) {
+      v.getValueVector().getMutator().setValueCount(numOutputRecords);
+    }
 
-      outputCount += numOutputRecords;
+    outputCount += numOutputRecords;
 
-      if (first) {
-        this.outcome = IterOutcome.OK_NEW_SCHEMA;
-      } else {
-        this.outcome = IterOutcome.OK;
-      }
+    if (first) {
+      this.outcome = IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      this.outcome = IterOutcome.OK;
+    }
 
-      logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
+    logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
 
-      lastBatchOutputCount = numOutputRecords;
-      // If there are no more records to output, go to the next batch. If there are any records left refer to the
-      // same BatchHolder. Remaining records will be outputted in next outputCurrentBatch() call(s).
-      if (batchHolders.get(outBatchIndex).getNumPendingOutput() == 0) {
-        outBatchIndex++;
-      }
-      if (outBatchIndex == batchHolders.size()) {
-        allFlushed = true;
+    lastBatchOutputCount = numOutputRecords;
+    outBatchIndex++;
+    if (outBatchIndex == batchHolders.size()) {
+      allFlushed = true;
 
-        logger.debug("HashAggregate: All batches flushed.");
+      logger.debug("HashAggregate: All batches flushed.");
 
-        // cleanup my internal state since there is nothing more to return
-        this.cleanup();
-      }
-    } else {
-      if (!outputKeysStatus) {
-        logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);
-        for (VectorWrapper<?> v : outContainer) {
-          logger.debug("At the time of failure, size of valuevector in outContainer = {}.",
-              v.getValueVector().getValueCapacity());
-        }
-        context.fail(new Exception("Failed to output keys for current batch !"));
-      }
-      if (!outputValuesStatus) {
-        logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
-        for (VectorWrapper<?> v : outContainer) {
-          logger.debug("At the time of failure, size of valuevector in outContainer = {}.",
-              v.getValueVector().getValueCapacity());
-        }
-        context.fail(new Exception("Failed to output values for current batch !"));
-      }
-      this.outcome = IterOutcome.STOP;
+      // cleanup my internal state since there is nothing more to return
+      this.cleanup();
     }
 
     return this.outcome;
@@ -569,72 +532,45 @@ public abstract class HashAggTemplate implements HashAggregator {
   // Check if a group is present in the hash table; if not, insert it in the hash table.
   // The htIdxHolder contains the index of the group in the hash table container; this same
   // index is also used for the aggregation values maintained by the hash aggregate.
-  private boolean checkGroupAndAggrValues(int incomingRowIdx) {
+  private void checkGroupAndAggrValues(int incomingRowIdx) {
     if (incomingRowIdx < 0) {
       throw new IllegalArgumentException("Invalid incoming row index.");
     }
 
     /** for debugging
-    Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
-    BigIntVector vv0 = null;
-    BigIntHolder holder = null;
+     Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
+     BigIntVector vv0 = null;
+     BigIntHolder holder = null;
 
-    if (tmp != null) {
-      vv0 = ((BigIntVector) tmp);
-      holder = new BigIntHolder();
-      holder.value = vv0.getAccessor().get(incomingRowIdx) ;
-    }
-    */
-
-    HashTable.PutStatus putStatus = htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
-
-    if (putStatus != HashTable.PutStatus.PUT_FAILED) {
-      int currentIdx = htIdxHolder.value;
-
-      // get the batch index and index within the batch
-      if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
-        addBatchHolder();
-      }
-      BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
-      int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
-
-      // Check if we have almost filled up the workspace vectors and add a batch if necessary
-      if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
-        htable.addNewKeyBatch();
-        addBatchHolder();
-        bh.allocatedNextBatch = true;
-      }
+     if (tmp != null) {
+     vv0 = ((BigIntVector) tmp);
+     holder = new BigIntHolder();
+     holder.value = vv0.getAccessor().get(incomingRowIdx) ;
+     }
+     */
 
+    htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
 
-      if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
-        if (EXTRA_DEBUG_2) {
-          logger.debug("Group-by key already present in hash table, updating the aggregate values");
-        }
+    int currentIdx = htIdxHolder.value;
 
-        // debugging
-        //if (holder.value == 100018 || holder.value == 100021) {
-        //  logger.debug("group-by key = {} already present at hash table index = {}", holder.value, currentIdx) ;
-        //}
+    // get the batch index and index within the batch
+    if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
+      addBatchHolder();
+    }
+    BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
+    int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
 
-      } else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
-        if (EXTRA_DEBUG_2) {
-          logger.debug("Group-by key was added to hash table, inserting new aggregate values");
-        }
+    // Check if we have almost filled up the workspace vectors and add a batch if necessary
+    if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
+      htable.addNewKeyBatch();
+      addBatchHolder();
+      bh.allocatedNextBatch = true;
+    }
 
-        // debugging
-        // if (holder.value == 100018 || holder.value == 100021) {
-        //  logger.debug("group-by key = {} added at hash table index = {}", holder.value, currentIdx) ;
-        //}
-      }
 
-      if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
-        numGroupedRecords++;
-        return true;
-      }
+    if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
+      numGroupedRecords++;
     }
-
-    logger.debug("HashAggr Put failed ! incomingRowIdx = {}, hash table size = {}.", incomingRowIdx, htable.size());
-    return false;
   }
 
   private void updateStats(HashTable htable) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 0f7f394..7cc43ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -31,9 +34,6 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 
-import java.io.IOException;
-import java.util.List;
-
 public interface HashAggregator {
 
   public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ef85a36..860627d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -284,9 +284,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     cg.setMappingSet(EVAL);
     for (LogicalExpression ex : valueExprs) {
       HoldingContainer hc = cg.addExpr(ex);
-      cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
-    cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
 
   private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
@@ -295,9 +293,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     cg.setMappingSet(RECORD_KEYS);
     for (int i =0; i < keyExprs.length; i++) {
       HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
-      cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
-    cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
 
   private final GeneratorMapping PREVIOUS_KEYS_OUT = GeneratorMapping.create("setupInterior", "outputRecordKeysPrev", null, null);
@@ -317,10 +313,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       HoldingContainer innerExpression = cg.addExpr(keyExprs[i], false);
       cg.setMappingSet(RECORD_KEYS_PREV_OUT);
       HoldingContainer outerExpression = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), false);
-      cg.getBlock(BlockType.EVAL)._if(outerExpression.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
 
     }
-    cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
 
   private void getIndex(ClassGenerator<StreamingAggregator> g) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 556b260..14e6aff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -120,9 +120,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
       // pick up a remainder batch if we have one.
       if (remainderBatch != null) {
-        if (!outputToBatch( previousIndex )) {
-          return tooBigFailure();
-        }
+        outputToBatch( previousIndex );
         remainderBatch.clear();
         remainderBatch = null;
         return setOkAndReturn();
@@ -136,9 +134,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         if (EXTRA_DEBUG) {
           logger.debug("Attempting to output remainder.");
         }
-        if (!outputToBatch( previousIndex)) {
-          return tooBigFailure();
-        }
+        outputToBatch( previousIndex);
       }
 
       if (newSchema) {
@@ -171,26 +167,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
             if (EXTRA_DEBUG) {
               logger.debug("Values were different, outputting previous batch.");
             }
-            if (outputToBatch(previousIndex)) {
-              if (EXTRA_DEBUG) {
-                logger.debug("Output successful.");
-              }
-              addRecordInc(currentIndex);
-            } else {
-              if (EXTRA_DEBUG) {
-                logger.debug("Output failed.");
-              }
-              if (outputCount == 0) {
-                return tooBigFailure();
-              }
-
-              // mark the pending output but move forward for the next cycle.
-              pendingOutput = true;
-              previousIndex = currentIndex;
-              incIndex();
-              return setOkAndReturn();
-
+            outputToBatch(previousIndex);
+            if (EXTRA_DEBUG) {
+              logger.debug("Output successful.");
             }
+            addRecordInc(currentIndex);
           }
           previousIndex = currentIndex;
         }
@@ -215,9 +196,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               if (first && addedRecordCount == 0) {
                 return setOkAndReturn();
               } else if(addedRecordCount > 0) {
-                if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
-                  remainderBatch = previous;
-                }
+                outputToBatchPrev( previous, previousIndex, outputCount);
                 if (EXTRA_DEBUG) {
                   logger.debug("Received no more batches, returning.");
                 }
@@ -239,9 +218,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
               }
               if (addedRecordCount > 0) {
-                if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
-                  remainderBatch = previous;
-                }
+                outputToBatchPrev( previous, previousIndex, outputCount);
                 if (EXTRA_DEBUG) {
                   logger.debug("Wrote out end of previous batch, returning.");
                 }
@@ -272,10 +249,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   }
                   previousIndex = currentIndex;
                   if (addedRecordCount > 0) {
-                    if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
-                      remainderBatch = previous;
-                      return setOkAndReturn();
-                    }
+                    outputToBatchPrev( previous, previousIndex, outputCount);
                     continue outside;
                   }
                 }
@@ -329,21 +303,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     return AggOutcome.RETURN_OUTCOME;
   }
 
-  private final boolean outputToBatch(int inIndex) {
+  private final void outputToBatch(int inIndex) {
 
-    if (!outputRecordKeys(inIndex, outputCount)) {
-      if(EXTRA_DEBUG) {
-        logger.debug("Failure while outputting keys {}", outputCount);
-      }
-      return false;
-    }
+    outputRecordKeys(inIndex, outputCount);
 
-    if (!outputRecordValues(outputCount)) {
-      if (EXTRA_DEBUG) {
-        logger.debug("Failure while outputting values {}", outputCount);
-      }
-      return false;
-    }
+    outputRecordValues(outputCount);
 
     if (EXTRA_DEBUG) {
       logger.debug("{} values output successfully", outputCount);
@@ -351,20 +315,15 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-    return true;
   }
 
-  private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
-    boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
-        && outputRecordValues(outIndex) //
-        && resetValues();
-    if (success) {
-      resetValues();
-      outputCount++;
-      addedRecordCount = 0;
-    }
-
-    return success;
+  private final void outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
+    outputRecordKeysPrev(b1, inIndex, outIndex);
+    outputRecordValues(outIndex);
+    resetValues();
+    resetValues();
+    outputCount++;
+    addedRecordCount = 0;
   }
 
   private void addRecordInc(int index) {
@@ -383,9 +342,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
   public abstract void addRecord(@Named("index") int index);
-  public abstract boolean outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-  public abstract boolean outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
-  public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+  public abstract void outputRecordKeys(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void outputRecordKeysPrev(@Named("previous") InternalBatch previous, @Named("previousIndex") int previousIndex, @Named("outIndex") int outIndex);
+  public abstract void outputRecordValues(@Named("outIndex") int outIndex);
   public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
   public abstract boolean resetValues();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index b5cfdca..fd6a3e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -17,13 +17,19 @@
  */
 package org.apache.drill.exec.physical.impl.common;
 
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
@@ -45,11 +51,11 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.resolver.TypeCastRules;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JExpr;
 
 
 public class ChainedHashTable {
@@ -64,8 +70,8 @@ public class ChainedHashTable {
           null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping GET_HASH_BUILD =
-      GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, null /* reset */,
-          null /* cleanup */);
+      GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
+          null /* reset */, null /* cleanup */);
 
   private static final GeneratorMapping GET_HASH_PROBE =
       GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */,
@@ -119,7 +125,7 @@ public class ChainedHashTable {
   private final boolean areNullsEqual;
 
   public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
-      RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, boolean areNullsEqual) {
+                          RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, boolean areNullsEqual) {
 
     this.htConfig = htConfig;
     this.context = context;
@@ -150,8 +156,7 @@ public class ChainedHashTable {
 
     int i = 0;
     for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
-      final LogicalExpression expr =
-          ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
+      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
       if (collector.hasErrors()) {
         throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
       }
@@ -171,9 +176,7 @@ public class ChainedHashTable {
     if (isProbe) {
       i = 0;
       for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
-        final LogicalExpression expr =
-            ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector,
-                context.getFunctionRegistry());
+        final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
         if (collector.hasErrors()) {
           throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
         }
@@ -218,9 +221,9 @@ public class ChainedHashTable {
   }
 
 
-  private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping,
-      MappingSet htableMapping, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) throws
-      SchemaChangeException {
+  private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
+                                       LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds)
+      throws SchemaChangeException {
     cg.setMappingSet(incomingMapping);
 
     if (keyExprs == null || keyExprs.length == 0) {
@@ -261,37 +264,31 @@ public class ChainedHashTable {
   }
 
   private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs,
-      TypedFieldId[] htKeyFieldIds) throws SchemaChangeException {
+                             TypedFieldId[] htKeyFieldIds) throws SchemaChangeException {
 
     cg.setMappingSet(SetValueMapping);
 
     int i = 0;
     for (LogicalExpression expr : keyExprs) {
-      ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, true);
+      boolean useSetSafe = !Types.isFixedWidthType(expr.getMajorType()) || Types.isRepeated(expr.getMajorType());
+      ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, useSetSafe);
 
-      HoldingContainer hc = cg.addExpr(vvwExpr, false); // this will write to the htContainer at htRowIdx
-      cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+      cg.addExpr(vvwExpr, false); // this will write to the htContainer at htRowIdx
     }
-
-    cg.getEvalBlock()._return(JExpr.TRUE);
   }
 
-  private void setupOutputRecordKeys(ClassGenerator<HashTable> cg, TypedFieldId[] htKeyFieldIds,
-      TypedFieldId[] outKeyFieldIds) {
+  private void setupOutputRecordKeys(ClassGenerator<HashTable> cg, TypedFieldId[] htKeyFieldIds, TypedFieldId[] outKeyFieldIds) {
 
     cg.setMappingSet(OutputRecordKeysMapping);
 
     if (outKeyFieldIds != null) {
       for (int i = 0; i < outKeyFieldIds.length; i++) {
         ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
-        ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, true);
-        HoldingContainer hc = cg.addExpr(vvwExpr);
-        cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+        boolean useSetSafe = !Types.isFixedWidthType(vvrExpr.getMajorType()) || Types.isRepeated(vvrExpr.getMajorType());
+        ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(outKeyFieldIds[i], vvrExpr, useSetSafe);
+        cg.addExpr(vvwExpr);
       }
 
-      cg.getEvalBlock()._return(JExpr.TRUE);
-    } else {
-      cg.getEvalBlock()._return(JExpr.FALSE);
     }
   }
 
@@ -320,22 +317,18 @@ public class ChainedHashTable {
 
         if (result == null) {
           throw new DrillRuntimeException(String.format("Join conditions cannot be compared failing build " +
-              "expression:" + " %s failing probe expression: %s", buildExpr.getMajorType().toString(),
+                  "expression:" + " %s failing probe expression: %s", buildExpr.getMajorType().toString(),
               probeExpr.getMajorType().toString()));
         } else if (result != buildType) {
           // Add a cast expression on top of the build expression
-          LogicalExpression castExpr =
-              ExpressionTreeMaterializer.addCastExpression(buildExpr, probeExpr.getMajorType(),
-                  context.getFunctionRegistry(), errorCollector);
+          LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(buildExpr, probeExpr.getMajorType(), context.getFunctionRegistry(), errorCollector);
           // Store the newly casted expression
           keyExprsBuild[i] =
               ExpressionTreeMaterializer.materialize(castExpr, incomingBuild, errorCollector,
                   context.getFunctionRegistry());
         } else if (result != probeType) {
           // Add a cast expression on top of the probe expression
-          LogicalExpression castExpr =
-              ExpressionTreeMaterializer.addCastExpression(probeExpr, buildExpr.getMajorType(),
-                  context.getFunctionRegistry(), errorCollector);
+          LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(probeExpr, buildExpr.getMajorType(), context.getFunctionRegistry(), errorCollector);
           // store the newly casted expression
           keyExprsProbe[i] =
               ExpressionTreeMaterializer.materialize(castExpr, incomingProbe, errorCollector,
@@ -346,7 +339,7 @@ public class ChainedHashTable {
   }
 
   private void setupGetHash(ClassGenerator<HashTable> cg, MappingSet incomingMapping, LogicalExpression[] keyExprs,
-      boolean isProbe) throws SchemaChangeException {
+                            boolean isProbe) throws SchemaChangeException {
 
     cg.setMappingSet(incomingMapping);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 1ec74bf..ef7dadf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -60,7 +60,7 @@ public interface HashTable {
 
   public void updateBatches();
 
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
+  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
 
   public int containsKey(int incomingRowIdx, boolean isProbe);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index ba980d7..0908e50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -17,7 +17,13 @@
  */
 package org.apache.drill.exec.physical.impl.common;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.inject.Named;
+
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -29,16 +35,13 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
-import javax.inject.Named;
-import java.util.ArrayList;
-import java.util.Iterator;
-
 public abstract class HashTableTemplate implements HashTable {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class);
@@ -116,6 +119,7 @@ public abstract class HashTableTemplate implements HashTable {
     private int batchIndex = 0;
 
     private BatchHolder(int idx) {
+
       this.batchIndex = idx;
 
       htContainer = new VectorContainer();
@@ -188,13 +192,10 @@ public abstract class HashTableTemplate implements HashTable {
 
     // Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
     // container at the specified index
-    private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch,
-        int lastEntryIdxWithinBatch) {
+    private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
       int currentIdxWithinBatch = currentIdx & BATCH_MASK;
 
-      if (!setValue(incomingRowIdx, currentIdxWithinBatch)) {
-        return false;
-      }
+      setValue(incomingRowIdx, currentIdxWithinBatch);
 
       // the previous entry in this hash chain should now point to the entry in this currentIdx
       if (lastEntryBatch != null) {
@@ -212,8 +213,6 @@ public abstract class HashTableTemplate implements HashTable {
         logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.",
             incomingRowIdx, currentIdx, hashValue);
       }
-
-      return true;
     }
 
     private void updateLinks(int lastEntryIdxWithinBatch, int currentIdx) {
@@ -222,8 +221,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) {
 
-      logger.debug("Rehashing entries within the batch: {}; batchStartIdx = {}, total numBuckets in hash table = {}" +
-          ".", batchIndex, batchStartIdx, numbuckets);
+      logger.debug("Rehashing entries within the batch: {}; batchStartIdx = {}, total numBuckets in hash table = {}.", batchIndex, batchStartIdx, numbuckets);
 
       int size = links.getAccessor().getValueCount();
       IntVector newLinks = allocMetadataVector(size, EMPTY_SLOT);
@@ -411,14 +409,13 @@ public abstract class HashTableTemplate implements HashTable {
     }
 
     @RuntimeOverridden
-    protected boolean setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
-      return false;
+    protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
     }
 
     @RuntimeOverridden
-    protected boolean outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
-      return false;
+    protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
     }
+
   } // class BatchHolder
 
 
@@ -530,25 +527,13 @@ public abstract class HashTableTemplate implements HashTable {
     return rounded;
   }
 
-  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
-    HashTable.PutStatus putStatus = put(incomingRowIdx, htIdxHolder);
-    int count = retryCount;
-    int numBatchHolders;
-    while (putStatus == PutStatus.PUT_FAILED && count > 0) {
-      logger.debug("Put into hash table failed .. Retrying with new batch holder...");
-      numBatchHolders = batchHolders.size();
-      this.addBatchHolder();
-      freeIndex = numBatchHolders * BATCH_SIZE;
-      putStatus = put(incomingRowIdx, htIdxHolder);
-      count--;
-    }
-    return putStatus;
+  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
+    put(incomingRowIdx, htIdxHolder);
   }
 
   private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
 
     int hash = getHashBuild(incomingRowIdx);
-    hash = Math.abs(hash);
     int i = getBucketIndex(hash, numBuckets());
     int startIdx = startIndices.getAccessor().get(i);
     int currentIdx;
@@ -569,14 +554,11 @@ public abstract class HashTableTemplate implements HashTable {
             incomingRowIdx, currentIdx);
       }
 
-      if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
-        // update the start index array
-        boolean status = startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
-        assert status : "Unable to set start indices in the hash table.";
-        htIdxHolder.value = currentIdx;
-        return PutStatus.KEY_ADDED;
-      }
-      return PutStatus.PUT_FAILED;
+      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
+      // update the start index array
+      startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
+      htIdxHolder.value = currentIdx;
+      return PutStatus.KEY_ADDED;
     }
 
     currentIdx = startIdx;
@@ -610,49 +592,38 @@ public abstract class HashTableTemplate implements HashTable {
       addBatchIfNeeded(currentIdx);
 
       if (EXTRA_DEBUG) {
-        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.",
-            incomingRowIdx, currentIdx);
+        logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
       }
 
-      if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
-        htIdxHolder.value = currentIdx;
-        return PutStatus.KEY_ADDED;
-      } else {
-        return PutStatus.PUT_FAILED;
-      }
+      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
+      htIdxHolder.value = currentIdx;
+      return PutStatus.KEY_ADDED;
     }
 
     return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
   }
 
-  private boolean insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch,
-      int lastEntryIdx) {
+  private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
 
     addBatchIfNeeded(currentIdx);
 
     BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
 
-    if (bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx)) {
-      numEntries++;
+    bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx);
+    numEntries++;
 
       /* Resize hash table if needed and transfer the metadata
        * Resize only after inserting the current entry into the hash table
        * Otherwise our calculated lastEntryBatch and lastEntryIdx
        * becomes invalid after resize.
        */
-      resizeAndRehashIfNeeded();
-
-      return true;
-    }
-
-    return false;
+    resizeAndRehashIfNeeded();
   }
 
   // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
   @Override
   public int containsKey(int incomingRowIdx, boolean isProbe) {
     int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
-    hash = Math.abs(hash);
     int i = getBucketIndex(hash, numBuckets());
 
     int currentIdx = startIndices.getAccessor().get(i);
@@ -764,7 +735,6 @@ public abstract class HashTableTemplate implements HashTable {
 
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
     assert batchIdx < batchHolders.size();
-
     if (!batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords)) {
       return false;
     }
@@ -788,8 +758,7 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   // These methods will be code-generated in the context of the outer class
-  protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild,
-      @Named("incomingProbe") RecordBatch incomingProbe);
+  protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
 
   protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index e82dd29..02c154f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -351,7 +351,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
         ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
         HoldingContainer hc = cg.addExpr(write);
 
-        cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
         logger.debug("Added eval for project expression.");
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f370dc7..4af0292 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -17,9 +17,9 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.NamedExpression;
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -56,10 +57,15 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.eigenbase.rel.JoinRelType;
 
-import java.io.IOException;
-import java.util.List;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
 
 public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+
+  public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
+  public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
+
   // Probe side record batch
   private final RecordBatch left;
 
@@ -107,18 +113,18 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       GeneratorMapping.create("doSetup"/* setup method */, "projectBuildRecord" /* eval method */, null /* reset */,
           null /* cleanup */);
   // Generator mapping for the build side : constant
-  private static final GeneratorMapping PROJECT_BUILD_CONSTANT =
-      GeneratorMapping.create("doSetup"/* setup method */, "doSetup" /* eval method */, null /* reset */,
-          null /* cleanup */);
+  private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */,
+      "doSetup" /* eval method */,
+      null /* reset */, null /* cleanup */);
 
   // Generator mapping for the probe side : scalar
   private static final GeneratorMapping PROJECT_PROBE =
       GeneratorMapping.create("doSetup" /* setup method */, "projectProbeRecord" /* eval method */, null /* reset */,
           null /* cleanup */);
   // Generator mapping for the probe side : constant
-  private static final GeneratorMapping PROJECT_PROBE_CONSTANT =
-      GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, null /* reset */,
-          null /* cleanup */);
+  private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */,
+      "doSetup" /* eval method */,
+      null /* reset */, null /* cleanup */);
 
 
   // Mapping set for the build side
@@ -127,9 +133,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
           "outgoing" /* write container */, PROJECT_BUILD_CONSTANT, PROJECT_BUILD);
 
   // Mapping set for the probe side
-  private final MappingSet projectProbeMapping =
-      new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */, "probeBatch" /* read container */,
-          "outgoing" /* write container */, PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
+  private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
+      "probeBatch" /* read container */,
+      "outgoing" /* write container */,
+      PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
+
+  // indicates if we have previously returned an output batch
+  boolean firstOutputBatch = true;
 
   IterOutcome leftUpstream = IterOutcome.NONE;
   IterOutcome rightUpstream = IterOutcome.NONE;
@@ -156,6 +166,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     return outputRecords;
   }
 
+
   @Override
   protected void buildSchema() throws SchemaChangeException {
     leftUpstream = next(left);
@@ -341,9 +352,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       case OK:
         int currentRecordCount = right.getRecordCount();
 
-        /* For every new build batch, we store some state in the helper context
-         * Add new state to the helper context
-         */
+                    /* For every new build batch, we store some state in the helper context
+                     * Add new state to the helper context
+                     */
         hjHelper.addNewBatch(currentRecordCount);
 
         // Holder contains the global index where the key is hashed into using the hash table
@@ -352,21 +363,19 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         // For every record in the build batch , hash the key columns
         for (int i = 0; i < currentRecordCount; i++) {
 
-          HashTable.PutStatus status = hashTable.put(i, htIndex, 1 /* retry count */);
+          hashTable.put(i, htIndex, 1 /* retry count */);
 
-          if (status != HashTable.PutStatus.PUT_FAILED) {
-            /* Use the global index returned by the hash table, to store
-             * the current record index and batch index. This will be used
-             * later when we probe and find a match.
-             */
-            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
-          }
+                        /* Use the global index returned by the hash table, to store
+                         * the current record index and batch index. This will be used
+                         * later when we probe and find a match.
+                         */
+          hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
         }
 
-        /* Completed hashing all records in this batch. Transfer the batch
-         * to the hyper vector container. Will be used when we want to retrieve
-         * records that have matching keys on the probe side.
-         */
+                    /* Completed hashing all records in this batch. Transfer the batch
+                     * to the hyper vector container. Will be used when we want to retrieve
+                     * records that have matching keys on the probe side.
+                     */
         RecordBatchData nextBatch = new RecordBatchData(right);
         if (hyperContainer == null) {
           hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
@@ -386,8 +395,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
   public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
 
 
-    final CodeGenerator<HashJoinProbe> cg =
-        CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
     ClassGenerator<HashJoinProbe> g = cg.getRoot();
 
     // Generate the code to project build side records
@@ -411,20 +419,18 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         }
 
         // Add the vector to our output container
-        //                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(),
-        // outputType), context.getAllocator());
         container.addOrGet(MaterializedField.create(field.getPath(), outputType));
 
         JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId));
         JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
-        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-            .arg(outIndex).arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
+        g.getEvalBlock().add(outVV.invoke("copyFromSafe")
+            .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+            .arg(outIndex)
+            .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
 
         fieldId++;
       }
     }
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.TRUE);
 
     // Generate the code to project probe side records
     g.setMappingSet(projectProbeMapping);
@@ -432,6 +438,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     int outputFieldId = fieldId;
     fieldId = 0;
     JExpression probeIndex = JExpr.direct("probeIndex");
+    int recordCount = 0;
 
     if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
       for (VectorWrapper<?> vv : left) {
@@ -453,15 +460,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
         JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));
 
-        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()
-            ._return(JExpr.FALSE);
+        g.getEvalBlock().add(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV));
 
         fieldId++;
         outputFieldId++;
       }
+      recordCount = left.getRecordCount();
     }
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.TRUE);
 
     HashJoinProbe hj = context.getImplementationClass(cg);
 
@@ -518,4 +523,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     right.cleanup();
     left.cleanup();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 7599f9e..42c7010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -49,6 +49,6 @@ public interface HashJoinProbe {
                                           JoinRelType joinRelType);
   public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
   public abstract int  probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
-  public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
-  public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
+  public abstract void projectBuildRecord(int buildIndex, int outIndex);
+  public abstract void projectProbeRecord(int probeIndex, int outIndex);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c58f9a3..dcf73b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -98,18 +98,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   }
 
   public void executeProjectRightPhase() {
-    boolean success = true;
     while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
-      success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
-      if (success) {
-        recordsProcessed++;
-        outputRecords++;
-      } else {
-        if (outputRecords == 0) {
-          throw new IllegalStateException("Too big to fail.");
-        }
-        break;
-      }
+      projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
+      recordsProcessed++;
+      outputRecords++;
     }
   }
 
@@ -178,64 +170,38 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
              */
             hjHelper.setRecordMatched(currentCompositeIdx);
 
-            boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
-                &&  projectProbeRecord(recordsProcessed, outputRecords);
-            if (!success) {
-              // we failed to project.  redo this record.
-              getNextRecord = false;
-              return;
+            projectBuildRecord(currentCompositeIdx, outputRecords);
+            projectProbeRecord(recordsProcessed, outputRecords);
+            outputRecords++;
+            /* Projected single row from the build side with matching key but there
+             * may be more rows with the same key. Check if that's the case
+             */
+            currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+            if (currentCompositeIdx == -1) {
+              /* We only had one row in the build side that matched the current key
+               * from the probe side. Drain the next row in the probe side.
+               */
+              recordsProcessed++;
             } else {
-              outputRecords++;
-
-              /* Projected single row from the build side with matching key but there
-               * may be more rows with the same key. Check if that's the case
+              /* There is more than one row with the same key on the build side
+               * don't drain more records from the probe side till we have projected
+               * all the rows with this key
                */
-              currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-              if (currentCompositeIdx == -1) {
-                /* We only had one row in the build side that matched the current key
-                 * from the probe side. Drain the next row in the probe side.
-                 */
-                recordsProcessed++;
-              } else {
-                /* There is more than one row with the same key on the build side
-                 * don't drain more records from the probe side till we have projected
-                 * all the rows with this key
-                 */
-                getNextRecord = false;
-              }
+              getNextRecord = false;
             }
-
         } else { // No matching key
 
             // If we have a left outer join, project the keys
             if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-              boolean success = projectProbeRecord(recordsProcessed, outputRecords);
-              if (!success) {
-                if (outputRecords == 0) {
-                  throw new IllegalStateException("Record larger than single batch.");
-                } else {
-                  // we've output some records but failed to output this one.  return and wait for next call.
-                  return;
-                }
-              }
-              assert success;
+              projectProbeRecord(recordsProcessed, outputRecords);
               outputRecords++;
             }
             recordsProcessed++;
           }
       } else {
         hjHelper.setRecordMatched(currentCompositeIdx);
-        boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
-            && projectProbeRecord(recordsProcessed, outputRecords);
-        if (!success) {
-          if (outputRecords == 0) {
-            throw new IllegalStateException("Record larger than single batch.");
-          } else {
-            // we've output some records but failed to output this one.  return and wait for next call.
-            return;
-          }
-        }
-        assert success;
+        projectBuildRecord(currentCompositeIdx, outputRecords);
+        projectProbeRecord(recordsProcessed, outputRecords);
         outputRecords++;
 
         currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
@@ -276,8 +242,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
 
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
                                @Named("outgoing") RecordBatch outgoing);
-  public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+  public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
 
-  public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+  public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index c1dffc1..48a0996 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -75,6 +75,8 @@ import org.eigenbase.rel.JoinRelType;
  */
 public abstract class JoinTemplate implements JoinWorker {
 
+  private static final int OUTPUT_BATCH_SIZE = 32*1024;
+
   @Override
   public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException {
     doSetup(context, status, outgoing);
@@ -86,7 +88,7 @@ public abstract class JoinTemplate implements JoinWorker {
    * @return  true of join succeeded; false if the worker needs to be regenerated
    */
   public final boolean doJoin(final JoinStatus status) {
-    while (true) {
+    for (int i = 0; i < OUTPUT_BATCH_SIZE; i++) {
       // for each record
 
       // validate input iterators (advancing to the next record batch if necessary)
@@ -94,9 +96,7 @@ public abstract class JoinTemplate implements JoinWorker {
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
           // we've hit the end of the right record batch; copy any remaining values from the left batch
           while (status.isLeftPositionAllowed()) {
-            if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
-              return false;
-            }
+            doCopyLeft(status.getLeftPosition(), status.getOutPosition());
 
             status.incOutputPos();
             status.advanceLeft();
@@ -114,9 +114,7 @@ public abstract class JoinTemplate implements JoinWorker {
       case -1:
         // left key < right key
         if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
-          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
-            return false;
-          }
+          doCopyLeft(status.getLeftPosition(), status.getOutPosition());
           status.incOutputPos();
         }
         status.advanceLeft();
@@ -142,13 +140,9 @@ public abstract class JoinTemplate implements JoinWorker {
         int initialRightPosition = status.getRightPosition();
         do {
           // copy all equal right keys to the output record batch
-          if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
-            return false;
-          }
+          doCopyLeft(status.getLeftPosition(), status.getOutPosition());
 
-          if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) {
-            return false;
-          }
+          doCopyRight(status.getRightPosition(), status.getOutPosition());
 
           status.incOutputPos();
 
@@ -197,6 +191,7 @@ public abstract class JoinTemplate implements JoinWorker {
         throw new IllegalStateException();
       }
     }
+    return false;
   }
 
   // Generated Methods
@@ -213,8 +208,8 @@ public abstract class JoinTemplate implements JoinWorker {
    * @param outIndex position of the output record batch
    * @return Whether or not the data was copied.
    */
-  public abstract boolean doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
-  public abstract boolean doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
+  public abstract void doCopyLeft(@Named("leftIndex") int leftIndex, @Named("outIndex") int outIndex);
+  public abstract void doCopyRight(@Named("rightIndex") int rightIndex, @Named("outIndex") int outIndex);
 
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index d0f9d7d..8a6e1f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -376,16 +376,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
                                                          new TypedFieldId(outputType,vectorId));
         // todo: check result of copyFromSafe and grow allocation
-        cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+        cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
                                      .arg(copyLeftMapping.getValueReadIndex())
                                      .arg(copyLeftMapping.getValueWriteIndex())
-                                     .arg(vvIn).eq(JExpr.FALSE))
-            ._then()
-            ._return(JExpr.FALSE);
+                                     .arg(vvIn));
         ++vectorId;
       }
     }
-    cg.getEvalBlock()._return(JExpr.lit(true));
 
     // generate copyRight()
     ///////////////////////
@@ -406,16 +403,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
                                                          new TypedFieldId(outputType,vectorId));
         // todo: check result of copyFromSafe and grow allocation
-        cg.getEvalBlock()._if(vvOut.invoke("copyFromSafe")
+        cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
                                    .arg(copyRightMappping.getValueReadIndex())
                                    .arg(copyRightMappping.getValueWriteIndex())
-                                   .arg(vvIn).eq(JExpr.FALSE))
-            ._then()
-            ._return(JExpr.FALSE);
+                                   .arg(vvIn));
         ++vectorId;
       }
     }
-    cg.getEvalBlock()._return(JExpr.lit(true));
 
     JoinWorker w = context.getImplementationClass(cg);
     w.setupJoin(context, status, this.container);
@@ -469,7 +463,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     }
 
     for (VectorWrapper w : container) {
-      AllocationHelper.allocate(w.getValueVector(), 5000, 50);
+      AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
     }
 
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
index 2885c52..f2a95b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
@@ -34,7 +34,7 @@ public interface MergingReceiverGeneratorBase {
   public abstract int doEval(int leftIndex,
                                 int rightIndex);
 
-  public abstract boolean doCopy(int inIndex, int outIndex);
+  public abstract void doCopy(int inIndex, int outIndex);
 
   public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION =
       new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
index c29ef75..537ae74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
@@ -35,5 +35,5 @@ public abstract class MergingReceiverTemplate implements MergingReceiverGenerato
   public abstract int doEval(@Named("leftIndex") int leftIndex,
                                 @Named("rightIndex") int rightIndex);
 
-  public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index acbb755..d78ba8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -71,7 +71,9 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.CopyUtil;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.eigenbase.rel.RelFieldCollation.Direction;
 
@@ -90,6 +92,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
   private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
   private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+  private static final int OUTGOING_BATCH_SIZE = 32 * 1024;
 
   private RecordBatchLoader[] batchLoaders;
   private RawFragmentBatchProvider[] fragProviders;
@@ -266,9 +269,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
         // allocate a new value vector
         ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
-        outgoingVector.allocateNew();
         ++vectorCount;
       }
+      allocateOutgoing();
 
 
       schema = bldr.build();
@@ -559,7 +562,14 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   private void allocateOutgoing() {
-    outgoingContainer.allocateNew();
+    for (VectorWrapper w : outgoingContainer) {
+      ValueVector v = w.getValueVector();
+      if (v instanceof FixedWidthVector) {
+        AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1);
+      } else {
+        v.allocateNewSafe();
+      }
+    }
   }
 
 //  private boolean isOutgoingFull() {
@@ -648,12 +658,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
    */
   private boolean copyRecordToOutgoingBatch(Node node) {
     int inIndex = (node.batchId << 16) + node.valueIndex;
-    if (!merger.doCopy(inIndex, outgoingPosition)) {
+    merger.doCopy(inIndex, outgoingPosition);
+    outgoingPosition++;
+    if (outgoingPosition == OUTGOING_BATCH_SIZE) {
       return false;
-    } else {
-      outgoingPosition++;
-      return true;
     }
+    return true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index f5068b4..3c4e9e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -66,9 +66,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
     int counter = 0;
     for (int i = 0; i < countN; i++, firstOutputIndex++) {
       int partition = getPartition(i);
-      if (!partitionValues.getMutator().setSafe(i, partition)) {
-        throw new RuntimeException();
-      }
+      partitionValues.getMutator().setSafe(i, partition);
       counter++;
     }
     for(TransferPair t : transfers){

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 20f6195..4292c09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -148,15 +148,7 @@ public abstract class PartitionerTemplate implements Partitioner {
       case NONE:
         for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId));
-          if (!outgoingBatch.copy(recordId)) {
-            logger.trace(REWRITE_MSG, recordId);
-            outgoingBatch.flush();
-            if (!outgoingBatch.copy(recordId)) {
-              String msg = composeTooBigMsg(recordId, incoming);
-              logger.debug(msg);
-              throw new IOException(msg);
-            }
-          }
+          outgoingBatch.copy(recordId);
         }
         break;
 
@@ -164,15 +156,7 @@ public abstract class PartitionerTemplate implements Partitioner {
         for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           int svIndex = sv2.getIndex(recordId);
           OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
-          if (!outgoingBatch.copy(svIndex)) {
-            logger.trace(REWRITE_MSG, svIndex);
-            outgoingBatch.flush();
-            if (!outgoingBatch.copy(svIndex)) {
-              String msg = composeTooBigMsg(recordId, incoming);
-              logger.debug(msg);
-              throw new IOException(msg);
-            }
-          }
+          outgoingBatch.copy(svIndex);
         }
         break;
 
@@ -180,15 +164,7 @@ public abstract class PartitionerTemplate implements Partitioner {
         for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           int svIndex = sv4.get(recordId);
           OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
-          if (!outgoingBatch.copy(svIndex)) {
-            logger.trace(REWRITE_MSG, svIndex);
-            outgoingBatch.flush();
-            if (!outgoingBatch.copy(svIndex)) {
-              String msg = composeTooBigMsg(recordId, incoming);
-              logger.debug(msg);
-              throw new IOException(msg);
-            }
-          }
+          outgoingBatch.copy(svIndex);
         }
         break;
 
@@ -252,16 +228,13 @@ public abstract class PartitionerTemplate implements Partitioner {
       this.statusHandler = statusHandler;
     }
 
-    protected boolean copy(int inIndex) throws IOException {
-      if (doEval(inIndex, recordCount)) {
-        recordCount++;
-        totalRecords++;
-        if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
-          flush();
-        }
-        return true;
+    protected void copy(int inIndex) throws IOException {
+      doEval(inIndex, recordCount);
+      recordCount++;
+      totalRecords++;
+      if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
+        flush();
       }
-      return false;
     }
 
     @Override
@@ -273,7 +246,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {};
 
     @RuntimeOverridden
-    protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; };
+    protected void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { };
 
     public void flush() throws IOException {
       if (dropAll) {

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 539d028..8f7812f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -64,6 +64,8 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
@@ -225,10 +227,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean doAlloc() {
     //Allocate vv in the allocationVectors.
     for (ValueVector v : this.allocationVectors) {
-      //AllocationHelper.allocate(v, remainingRecordCount, 250);
-      if (!v.allocateNewSafe()) {
-        return false;
-      }
+      AllocationHelper.allocateNew(v, incoming.getRecordCount());
     }
 
     //Allocate vv for complexWriters.
@@ -363,8 +362,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
               TypedFieldId fid = container.getValueVectorId(outputField.getPath());
               ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
               HoldingContainer hc = cg.addExpr(write);
-
-              cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
             }
           }
           continue;
@@ -428,18 +425,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
         ValueVector vector = container.addOrGet(outputField, callBack);
         allocationVectors.add(vector);
         TypedFieldId fid = container.getValueVectorId(outputField.getPath());
-        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+        boolean useSetSafe = !(vector instanceof FixedWidthVector);
+        ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
         HoldingContainer hc = cg.addExpr(write);
 
-        cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
         logger.debug("Added eval for project expression.");
       }
     }
 
-    cg.rotateBlock();
-    cg.getEvalBlock()._return(JExpr.TRUE);
-
-
     try {
       this.projector = context.getImplementationClass(cg.getCodeGenerator());
       projector.setup(context, incoming, this, transfers);


[3/3] drill git commit: DRILL-1960: Automatic reallocation

Posted by sm...@apache.org.
DRILL-1960: Automatic reallocation


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a22b4724
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a22b4724
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a22b4724

Branch: refs/heads/master
Commit: a22b47243dbfdc5d956a5a7cf7964a6b9ae1418e
Parents: 839ae24
Author: Steven Phillips <sp...@maprtech.com>
Authored: Fri Jan 16 16:30:21 2015 -0800
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Jan 22 17:02:12 2015 -0800

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     |  10 +-
 .../exec/store/hive/HiveFieldConverter.java     |  66 ++--
 .../drill/exec/store/hive/HiveRecordReader.java |   6 +-
 .../exec/store/hive/HiveTextRecordReader.java   |   8 +-
 .../exec/store/mongo/MongoRecordReader.java     |   9 +-
 .../main/codegen/templates/ComplexReaders.java  |   8 +-
 .../main/codegen/templates/ComplexWriters.java  |  19 +-
 .../codegen/templates/FixedValueVectors.java    | 123 ++++----
 .../src/main/codegen/templates/MapWriters.java  |  10 +-
 .../codegen/templates/NullableValueVectors.java | 131 +++-----
 .../codegen/templates/RepeatedValueVectors.java |  75 ++---
 .../src/main/codegen/templates/TypeHelper.java  |  17 +-
 .../templates/VariableLengthVectors.java        |  86 ++----
 .../drill/exec/expr/EvaluationVisitor.java      |  18 --
 .../drill/exec/expr/fn/DrillAggFuncHolder.java  |  17 +-
 .../expr/fn/DrillComplexWriterFuncHolder.java   |   6 +-
 .../physical/impl/aggregate/HashAggBatch.java   |  25 +-
 .../impl/aggregate/HashAggTemplate.java         | 302 ++++++++-----------
 .../physical/impl/aggregate/HashAggregator.java |   6 +-
 .../impl/aggregate/StreamingAggBatch.java       |   6 -
 .../impl/aggregate/StreamingAggTemplate.java    |  85 ++----
 .../physical/impl/common/ChainedHashTable.java  |  71 ++---
 .../exec/physical/impl/common/HashTable.java    |   2 +-
 .../physical/impl/common/HashTableTemplate.java |  91 ++----
 .../impl/flatten/FlattenRecordBatch.java        |   1 -
 .../exec/physical/impl/join/HashJoinBatch.java  |  88 +++---
 .../exec/physical/impl/join/HashJoinProbe.java  |   4 +-
 .../impl/join/HashJoinProbeTemplate.java        |  82 ++---
 .../exec/physical/impl/join/JoinTemplate.java   |  25 +-
 .../exec/physical/impl/join/MergeJoinBatch.java |  16 +-
 .../MergingReceiverGeneratorBase.java           |   2 +-
 .../mergereceiver/MergingReceiverTemplate.java  |   2 +-
 .../impl/mergereceiver/MergingRecordBatch.java  |  22 +-
 .../OrderedPartitionProjectorTemplate.java      |   4 +-
 .../partitionsender/PartitionerTemplate.java    |  47 +--
 .../impl/project/ProjectRecordBatch.java        |  17 +-
 .../impl/project/ProjectorTemplate.java         |  10 +-
 .../impl/svremover/CopierTemplate2.java         |  16 +-
 .../impl/svremover/CopierTemplate4.java         |  16 +-
 .../window/StreamingWindowFrameRecordBatch.java |   4 -
 .../window/StreamingWindowFrameTemplate.java    |  14 +-
 .../impl/xsort/PriorityQueueCopierTemplate.java |   7 +-
 .../apache/drill/exec/record/TransferPair.java  |   2 +-
 .../columnreaders/FixedWidthRepeatedReader.java |   6 +-
 .../columnreaders/NullableBitReader.java        |   6 +-
 .../columnreaders/VarLengthColumnReaders.java   |  28 +-
 .../columnreaders/VarLengthValuesColumn.java    |   6 +-
 .../drill/exec/store/pojo/PojoRecordReader.java |   8 +-
 .../drill/exec/store/pojo/PojoWriter.java       |   2 +-
 .../apache/drill/exec/store/pojo/Writers.java   |  57 ++--
 .../exec/store/text/DrillTextRecordReader.java  |  20 +-
 .../drill/exec/vector/AllocationHelper.java     |  15 +
 .../org/apache/drill/exec/vector/BitVector.java |  39 ++-
 .../org/apache/drill/exec/vector/CopyUtil.java  |  18 +-
 .../exec/vector/RepeatedFixedWidthVector.java   |   2 +-
 .../drill/exec/vector/RepeatedMutator.java      |   2 +-
 .../drill/exec/vector/VariableWidthVector.java  |   2 +-
 .../drill/exec/vector/complex/MapVector.java    |  15 +-
 .../exec/vector/complex/RepeatedListVector.java |  30 +-
 .../exec/vector/complex/RepeatedMapVector.java  |  48 ++-
 .../complex/impl/RepeatedListReaderImpl.java    |   4 +-
 .../complex/impl/RepeatedMapReaderImpl.java     |   6 +-
 .../complex/impl/SingleMapReaderImpl.java       |   4 +-
 .../physical/impl/window/TestWindowFrame.java   |   4 +
 .../exec/vector/TestAdaptiveAllocation.java     | 108 -------
 65 files changed, 770 insertions(+), 1236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 33bf376..16ccc15 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -204,10 +204,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
       KeyValue[] kvs = result.raw();
       byte[] bytes = result.getBytes().get();
       if (rowKeyVector != null) {
-        if (!rowKeyVector.getMutator().setSafe(rowCount, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) {
-          leftOver = result;
-          break done;
-        }
+        rowKeyVector.getMutator().setSafe(rowCount, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength());
       }
 
       for (KeyValue kv : kvs) {
@@ -221,10 +218,7 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
 
         int valueOffset = kv.getValueOffset();
         int valueLength = kv.getValueLength();
-        if (!v.getMutator().setSafe(rowCount, bytes, valueOffset, valueLength)) {
-          leftOver = result;
-          break done;
-        }
+        v.getMutator().setSafe(rowCount, bytes, valueOffset, valueLength);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java
index 82e038c..658dd79 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveFieldConverter.java
@@ -67,7 +67,7 @@ import com.google.common.collect.Maps;
 
 public abstract class HiveFieldConverter {
 
-  public abstract boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex);
+  public abstract void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex);
 
   private static Map<PrimitiveCategory, Class< ? extends HiveFieldConverter>> primMap = Maps.newHashMap();
 
@@ -129,25 +129,25 @@ public abstract class HiveFieldConverter {
 
   public static class Binary extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final byte[] value = ((BinaryObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableVarBinaryVector) outputVV).getMutator().setSafe(outputIndex, value, 0, value.length);
+      ((NullableVarBinaryVector) outputVV).getMutator().setSafe(outputIndex, value, 0, value.length);
     }
   }
 
   public static class Boolean extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final boolean value = (boolean) ((BooleanObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableBitVector) outputVV).getMutator().setSafe(outputIndex, value ? 1 : 0);
+      ((NullableBitVector) outputVV).getMutator().setSafe(outputIndex, value ? 1 : 0);
     }
   }
 
   public static class Byte extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final byte value = (byte) ((ByteObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableTinyIntVector) outputVV).getMutator().setSafe(outputIndex, value);
+      ((NullableTinyIntVector) outputVV).getMutator().setSafe(outputIndex, value);
     }
   }
 
@@ -160,11 +160,11 @@ public abstract class HiveFieldConverter {
     }
 
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       holder.value = DecimalUtility.getDecimal9FromBigDecimal(
           ((HiveDecimalObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue).bigDecimalValue(),
           holder.scale, holder.precision);
-      return ((NullableDecimal9Vector) outputVV).getMutator().setSafe(outputIndex, holder);
+      ((NullableDecimal9Vector) outputVV).getMutator().setSafe(outputIndex, holder);
     }
   }
 
@@ -177,11 +177,11 @@ public abstract class HiveFieldConverter {
     }
 
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       holder.value = DecimalUtility.getDecimal18FromBigDecimal(
           ((HiveDecimalObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue).bigDecimalValue(),
           holder.scale, holder.precision);
-      return ((NullableDecimal18Vector) outputVV).getMutator().setSafe(outputIndex, holder);
+      ((NullableDecimal18Vector) outputVV).getMutator().setSafe(outputIndex, holder);
     }
   }
 
@@ -196,11 +196,11 @@ public abstract class HiveFieldConverter {
     }
 
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       DecimalUtility.getSparseFromBigDecimal(
           ((HiveDecimalObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue).bigDecimalValue(),
           holder.buffer, holder.start, holder.scale, holder.precision, Decimal28SparseHolder.nDecimalDigits);
-      return ((NullableDecimal28SparseVector) outputVV).getMutator().setSafe(outputIndex, holder);
+      ((NullableDecimal28SparseVector) outputVV).getMutator().setSafe(outputIndex, holder);
     }
   }
 
@@ -215,89 +215,89 @@ public abstract class HiveFieldConverter {
     }
 
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       DecimalUtility.getSparseFromBigDecimal(
           ((HiveDecimalObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue).bigDecimalValue(),
           holder.buffer, holder.start, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits);
-      return ((NullableDecimal38SparseVector) outputVV).getMutator().setSafe(outputIndex, holder);
+      ((NullableDecimal38SparseVector) outputVV).getMutator().setSafe(outputIndex, holder);
     }
   }
 
   public static class Double extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final double value = (double) ((DoubleObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableFloat8Vector) outputVV).getMutator().setSafe(outputIndex, value);
+      ((NullableFloat8Vector) outputVV).getMutator().setSafe(outputIndex, value);
     }
   }
 
   public static class Float extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final float value = (float) ((FloatObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableFloat4Vector) outputVV).getMutator().setSafe(outputIndex, value);
+      ((NullableFloat4Vector) outputVV).getMutator().setSafe(outputIndex, value);
     }
   }
 
   public static class Int extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final int value = (int) ((IntObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableIntVector) outputVV).getMutator().setSafe(outputIndex, value);
+      ((NullableIntVector) outputVV).getMutator().setSafe(outputIndex, value);
     }
   }
 
   public static class Long extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final long value = (long) ((LongObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableBigIntVector) outputVV).getMutator().setSafe(outputIndex, value);
+      ((NullableBigIntVector) outputVV).getMutator().setSafe(outputIndex, value);
     }
   }
 
   public static class Short extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final short value = (short) ((ShortObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
-      return ((NullableSmallIntVector) outputVV).getMutator().setSafe(outputIndex, value);
+      ((NullableSmallIntVector) outputVV).getMutator().setSafe(outputIndex, value);
     }
   }
 
   public static class String extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final Text value = ((StringObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue);
       final byte[] valueBytes = value.getBytes();
       final int len = value.getLength();
-      return ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, len);
+      ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, len);
     }
   }
 
   public static class VarChar extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final Text value = ((HiveVarcharObjectInspector)oi).getPrimitiveWritableObject(hiveFieldValue).getTextValue();
       final byte[] valueBytes = value.getBytes();
       final int valueLen = value.getLength();
-      return ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, valueLen);
+      ((NullableVarCharVector) outputVV).getMutator().setSafe(outputIndex, valueBytes, 0, valueLen);
     }
   }
 
   public static class Timestamp extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final java.sql.Timestamp value = ((TimestampObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
       final DateTime ts = new DateTime(value.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-      return ((NullableTimeStampVector) outputVV).getMutator().setSafe(outputIndex, ts.getMillis());
+      ((NullableTimeStampVector) outputVV).getMutator().setSafe(outputIndex, ts.getMillis());
     }
   }
 
   public static class Date extends HiveFieldConverter {
     @Override
-    public boolean setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
+    public void setSafeValue(ObjectInspector oi, Object hiveFieldValue, ValueVector outputVV, int outputIndex) {
       final java.sql.Date value = ((DateObjectInspector)oi).getPrimitiveJavaObject(hiveFieldValue);
       final DateTime date = new DateTime(value.getTime()).withZoneRetainFields(DateTimeZone.UTC);
-      return ((NullableDateVector) outputVV).getMutator().setSafe(outputIndex, date.getMillis());
+      ((NullableDateVector) outputVV).getMutator().setSafe(outputIndex, date.getMillis());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 625a7b2..bad7a4e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -327,12 +327,8 @@ public class HiveRecordReader extends AbstractRecordReader {
       Object hiveValue = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName));
 
       if (hiveValue != null) {
-        success = selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
+        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
             vectors.get(i), outputRecordIndex);
-
-        if (!success) {
-          return false;
-        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
index 5406048..2deb7c5 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
@@ -66,7 +66,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
     numCols = tableColumns.size();
   }
 
-  public boolean setValue(PrimitiveObjectInspector.PrimitiveCategory pCat, ValueVector vv, int index, byte[] bytes, int start) {
+  public void setValue(PrimitiveObjectInspector.PrimitiveCategory pCat, ValueVector vv, int index, byte[] bytes, int start) {
     switch(pCat) {
       case BINARY:
         throw new UnsupportedOperationException();
@@ -86,7 +86,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
         for (int i = start; (b = bytes[i]) != delimiter; i++) {
           value = (value * 10) + b - 48;
         }
-        return ((NullableIntVector) vv).getMutator().setSafe(index, value);
+        ((NullableIntVector) vv).getMutator().setSafe(index, value);
       }
       case LONG: {
         long value = 0;
@@ -94,7 +94,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
         for (int i = start; (b = bytes[i]) != delimiter; i++) {
           value = (value * 10) + b - 48;
         }
-        return ((NullableBigIntVector) vv).getMutator().setSafe(index, value);
+        ((NullableBigIntVector) vv).getMutator().setSafe(index, value);
       }
       case SHORT:
         throw new UnsupportedOperationException();
@@ -107,7 +107,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
           }
           end = bytes.length;
         }
-        return ((NullableVarCharVector) vv).getMutator().setSafe(index, bytes, start, end - start);
+        ((NullableVarCharVector) vv).getMutator().setSafe(index, bytes, start, end - start);
       }
       case TIMESTAMP:
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 79abe60..4b73600 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -235,13 +235,8 @@ public class MongoRecordReader extends AbstractRecordReader {
       for (; rowCount < TARGET_RECORD_COUNT && cursor.hasNext(); rowCount++) {
         String doc = cursor.next().toString();
         byte[] record = doc.getBytes(Charsets.UTF_8);
-        if (!valueVector.getMutator().setSafe(rowCount, record, 0,
-            record.length)) {
-          logger.warn(errMsg, doc);
-          if (rowCount == 0) {
-            break;
-          }
-        }
+        valueVector.getMutator().setSafe(rowCount, record, 0,
+            record.length);
       }
       valueVector.getMutator().setValueCount(rowCount);
       logger.debug("Took {} ms to get {} records",

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
index 027f61d..9d05934 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
@@ -77,14 +77,14 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader {
   public void copyAsValue(${minor.class?cap_first}Writer writer){
     if (writer.ok()) {
       Repeated${minor.class?cap_first}WriterImpl impl = (Repeated${minor.class?cap_first}WriterImpl) writer;
-      impl.inform(impl.vector.copyFromSafe(idx(), impl.idx(), vector));
+      impl.vector.copyFromSafe(idx(), impl.idx(), vector);
     }
   }
   
   public void copyAsField(String name, MapWriter writer){
     if (writer.ok()) {
       Repeated${minor.class?cap_first}WriterImpl impl = (Repeated${minor.class?cap_first}WriterImpl)  writer.list(name).${lowerName}();
-      impl.inform(impl.vector.copyFromSafe(idx(), impl.idx(), vector));
+      impl.vector.copyFromSafe(idx(), impl.idx(), vector);
     }
   }
   
@@ -113,14 +113,14 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader {
   public void copyAsValue(${minor.class?cap_first}Writer writer){
     if (writer.ok()) {
       ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer;
-      impl.inform(impl.vector.copyFromSafe(idx(), impl.idx(), vector));
+      impl.vector.copyFromSafe(idx(), impl.idx(), vector);
     }
   }
   
   public void copyAsField(String name, MapWriter writer){
     if (writer.ok()) {
       ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer.${lowerName}(name);
-      impl.inform(impl.vector.copyFromSafe(idx(), impl.idx(), vector));
+      impl.vector.copyFromSafe(idx(), impl.idx(), vector);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index 5ba1c64..576fd83 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -86,7 +86,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   public void write(${minor.class?cap_first}Holder h){
     if(ok()){
       // update to inform(addSafe) once available for all repeated vector types for holders.
-      inform(mutator.addSafe(idx(), h));
+      mutator.addSafe(idx(), h);
       vector.setCurrentValueCount(idx());
     }
   }
@@ -94,7 +94,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   public void write(Nullable${minor.class?cap_first}Holder h){
     if(ok()){
       // update to inform(addSafe) once available for all repeated vector types for holders.
-      inform(mutator.addSafe(idx(), h));
+      mutator.addSafe(idx(), h);
       vector.setCurrentValueCount(idx());
     }
   }
@@ -103,7 +103,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
     if(ok()){
       // update to inform(setSafe) once available for all vector types for holders.
-      inform(mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>));
+      mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
       vector.setCurrentValueCount(idx());
     }
   }
@@ -112,7 +112,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   public void setPosition(int idx){
     if (ok()){
       super.setPosition(idx);
-      inform(mutator.startNewGroup(idx));
+      mutator.startNewGroup(idx);
     }
   }
   
@@ -121,16 +121,14 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   
   public void write(${minor.class}Holder h){
     if(ok()){
-      // update to inform(setSafe) once available for all vector types for holders.
-      inform(mutator.setSafe(idx(), h));
+      mutator.setSafe(idx(), h);
       vector.setCurrentValueCount(idx());
     }
   }
   
   public void write(Nullable${minor.class}Holder h){
     if(ok()){
-      // update to inform(setSafe) once available for all vector types for holders.
-      inform(mutator.setSafe(idx(), h));
+      mutator.setSafe(idx(), h);
       vector.setCurrentValueCount(idx());
     }
   }
@@ -138,8 +136,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
   public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
     if(ok()){
-      // update to inform(setSafe) once available for all vector types for holders.
-      inform(mutator.setSafe(idx(), <#if mode == "Nullable">1, </#if><#list fields as field>${field.name}<#if field_has_next>, </#if></#list>));
+      mutator.setSafe(idx(), <#if mode == "Nullable">1, </#if><#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
       vector.setCurrentValueCount(idx());
     }
   }
@@ -147,7 +144,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   <#if mode == "Nullable">
   public void writeNull(){
     if(ok()){
-      inform(mutator.setNull(idx()));
+      mutator.setNull(idx());
       vector.setCurrentValueCount(idx());
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 58e6ccc..1663534 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -96,6 +96,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     this.allocationValueCount = valueCount;
   }
 
+  /**
+   * Allocates a new buffer with double the capacity, copies the existing data into it, and zero-fills the second half. Replaces the vector's buffer with the new buffer and releases the old one.
+   */
+  public void reAlloc() {
+    logger.info("Realloc vector {}. [{}] -> [{}]", field, allocationValueCount * ${type.width}, allocationValueCount * 2 * ${type.width});
+    allocationValueCount *= 2;
+    DrillBuf newBuf = allocator.buffer(allocationValueCount * ${type.width});
+    newBuf.setBytes(0, data, 0, data.capacity());
+    newBuf.setZero(newBuf.capacity() / 2, newBuf.capacity() / 2);
+    data.release();
+    data = newBuf;
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -187,12 +200,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
     
     @Override
-    public boolean copyValueSafe(int fromIndex, int toIndex) {
-      return to.copyFromSafe(fromIndex, toIndex, ${minor.class}Vector.this);
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, ${minor.class}Vector.this);
     }
   }
   
-  protected void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
+  public void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
     <#if (type.width > 8)>
     from.data.getBytes(fromIndex * ${type.width}, data, thisIndex * ${type.width}, ${type.width});
     <#else> <#-- type.width <= 8 -->
@@ -202,13 +215,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     </#if> <#-- type.width -->
   }
   
-  public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
+  public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
     if(thisIndex >= getValueCapacity()) {
-      decrementAllocationMonitor();
-      return false;
+        reAlloc();
     }
     copyFrom(fromIndex, thisIndex, from);
-    return true;
   }
 
   private void decrementAllocationMonitor() {
@@ -551,13 +562,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      data.setBytes(index * ${type.width}, value, 0, ${type.width});
    }
 
-   public boolean setSafe(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
+   public void setSafe(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      data.setBytes(index * ${type.width}, value, 0, ${type.width});
-     return true;
    }
 
    <#if (minor.class == "TimeStampTZ")>
@@ -566,29 +575,27 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      data.setInt(((index * ${type.width}) + ${minor.millisecondsSize}), tzindex);
    }
    
-   protected void set(int index, ${minor.class}Holder holder){
+   public void set(int index, ${minor.class}Holder holder){
      set(index, holder.value, holder.index);
    }
 
-   protected void set(int index, Nullable${minor.class}Holder holder){
+   public void set(int index, Nullable${minor.class}Holder holder){
      set(index, holder.value, holder.index);
    }
 
-   public boolean setSafe(int index, long timestamp, int tzindex){
+   public void setSafe(int index, long timestamp, int tzindex){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, timestamp, tzindex);
-     return true;
    }
    
-   public boolean setSafe(int index, ${minor.class}Holder holder){
-     return setSafe(index, holder.value, holder.index);
+   public void setSafe(int index, ${minor.class}Holder holder){
+     setSafe(index, holder.value, holder.index);
    }
 
-   public boolean setSafe(int index, Nullable${minor.class}Holder holder){
-     return setSafe(index, holder.value, holder.index);
+   public void setSafe(int index, Nullable${minor.class}Holder holder){
+     setSafe(index, holder.value, holder.index);
    }
    
    <#elseif (minor.class == "Interval")>
@@ -607,21 +614,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      set(index, holder.months, holder.days, holder.milliseconds);
    }
 
-   public boolean setSafe(int index, int months, int days, int milliseconds){
+   public void setSafe(int index, int months, int days, int milliseconds){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, months, days, milliseconds);
-     return true;
    }
 
-   public boolean setSafe(int index, Nullable${minor.class}Holder holder){
-     return setSafe(index, holder.months, holder.days, holder.milliseconds);
+   public void setSafe(int index, Nullable${minor.class}Holder holder){
+     setSafe(index, holder.months, holder.days, holder.milliseconds);
    }
    
-   public boolean setSafe(int index, ${minor.class}Holder holder){
-     return setSafe(index, holder.months, holder.days, holder.milliseconds);
+   public void setSafe(int index, ${minor.class}Holder holder){
+     setSafe(index, holder.months, holder.days, holder.milliseconds);
    }
    
    <#elseif (minor.class == "IntervalDay")>
@@ -638,21 +643,19 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      set(index, holder.days, holder.milliseconds);
    }
 
-   public boolean setSafe(int index, int days, int milliseconds){
+   public void setSafe(int index, int days, int milliseconds){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, days, milliseconds);
-     return true;
    }
    
-   public boolean setSafe(int index, ${minor.class}Holder holder){
-     return setSafe(index, holder.days, holder.milliseconds);
+   public void setSafe(int index, ${minor.class}Holder holder){
+     setSafe(index, holder.days, holder.milliseconds);
    }
 
-   public boolean setSafe(int index, Nullable${minor.class}Holder holder){
-     return setSafe(index, holder.days, holder.milliseconds);
+   public void setSafe(int index, Nullable${minor.class}Holder holder){
+     setSafe(index, holder.days, holder.milliseconds);
    }
 
    <#elseif (minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse") || (minor.class == "Decimal28Dense") || (minor.class == "Decimal38Dense")>
@@ -665,20 +668,18 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      set(index, holder.start, holder.buffer);
    }
    
-   public boolean setSafe(int index,  Nullable${minor.class}Holder holder){
-     return setSafe(index, holder.start, holder.buffer);
+   public void setSafe(int index,  Nullable${minor.class}Holder holder){
+     setSafe(index, holder.start, holder.buffer);
    }
-   public boolean setSafe(int index,  ${minor.class}Holder holder){
-     return setSafe(index, holder.start, holder.buffer);
+   public void setSafe(int index,  ${minor.class}Holder holder){
+     setSafe(index, holder.start, holder.buffer);
    }
    
-   public boolean setSafe(int index, int start, DrillBuf buffer){
+   public void setSafe(int index, int start, DrillBuf buffer){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, start, buffer);
-     return true;
    }
    
    public void set(int index, int start, DrillBuf buffer){
@@ -699,20 +700,18 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      data.setBytes(index * ${type.width}, buffer, start, ${type.width});
    }
    
-   public boolean setSafe(int index, ${minor.class}Holder holder){
-     return setSafe(index, holder.start, holder.buffer);
+   public void setSafe(int index, ${minor.class}Holder holder){
+     setSafe(index, holder.start, holder.buffer);
    }
-   public boolean setSafe(int index, Nullable${minor.class}Holder holder){
-     return setSafe(index, holder.start, holder.buffer);
+   public void setSafe(int index, Nullable${minor.class}Holder holder){
+     setSafe(index, holder.start, holder.buffer);
    }
    
-   public boolean setSafe(int index, int start, DrillBuf buffer){     
+   public void setSafe(int index, int start, DrillBuf buffer){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, holder);
-     return true;
    }
 
    public void set(int index, Nullable${minor.class}Holder holder){
@@ -740,39 +739,33 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, value);
    }
 
-   public boolean setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
+   public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, value);
-     return true;
    }
 
    protected void set(int index, ${minor.class}Holder holder){
      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
    }
 
-   public boolean setSafe(int index, ${minor.class}Holder holder){
+   public void setSafe(int index, ${minor.class}Holder holder){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, holder);
-     return true;
    }
 
    protected void set(int index, Nullable${minor.class}Holder holder){
      data.set${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}, holder.value);
    }
 
-   public boolean setSafe(int index, Nullable${minor.class}Holder holder){
+   public void setSafe(int index, Nullable${minor.class}Holder holder){
      if(index >= getValueCapacity()) {
-       decrementAllocationMonitor();
-       return false;
+       reAlloc();
      }
      set(index, holder);
-     return true;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/MapWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index b8bd73e..b8d5365 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -63,7 +63,11 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
   }
 
   public void checkValueCapacity(){
-    inform(container.getValueCapacity() > idx());
+    <#if mode == "Repeated">
+    if (container.getValueCapacity() <= idx()) {
+      container.reAlloc();
+    }
+    </#if>
   }
 
   public MapWriter map(String name){
@@ -146,10 +150,6 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
     }
   }
   public void start(){
-    // check capacity only after we have a non empty container
-    if(container.size() > 0 && ok()) {
-      checkValueCapacity();
-    }
   }
   
   public void end(){

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index b222024..ba7c629 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -54,7 +54,7 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
 
   public ${className}(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    this.bits = new UInt1Vector(null, allocator);
+    this.bits = new UInt1Vector(MaterializedField.create(field + "_bits", Types.required(MinorType.UINT1)), allocator);
     this.values = new ${minor.class}Vector(field, allocator);
     this.accessor = new Accessor();
     this.mutator = new Mutator();
@@ -288,8 +288,8 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     }
 
     @Override
-    public boolean copyValueSafe(int fromIndex, int toIndex) {
-      return to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
     }
   }
   
@@ -311,29 +311,27 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   }
 
   
-  protected void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
+  public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
     if (!from.getAccessor().isNull(fromIndex)) {
       mutator.set(thisIndex, from.getAccessor().get(fromIndex));
     }
   }
 
   
-  public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
+  public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
     <#if type.major == "VarLen">
-    if(!mutator.fillEmpties(thisIndex)) return false;
+    mutator.fillEmpties(thisIndex);
     </#if>
-    boolean success = values.copyFromSafe(fromIndex, thisIndex, from);
-    success = success && bits.getMutator().setSafe(thisIndex, 1);
-    return success;    
+    values.copyFromSafe(fromIndex, thisIndex, from);
+    bits.getMutator().setSafe(thisIndex, 1);
   }
   
-  public boolean copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
+  public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
     <#if type.major == "VarLen">
-    if(!mutator.fillEmpties(thisIndex)) return false;
+    mutator.fillEmpties(thisIndex);
     </#if>
-    boolean b1 = bits.copyFromSafe(fromIndex, thisIndex, from.bits);
-    boolean b2 = values.copyFromSafe(fromIndex, thisIndex, from.values);
-    return b1 && b2;
+    bits.copyFromSafe(fromIndex, thisIndex, from.bits);
+    values.copyFromSafe(fromIndex, thisIndex, from.values);
   }
 
   public long getDataAddr(){
@@ -471,40 +469,36 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     }
 
     <#if type.major == "VarLen">
-    private boolean fillEmpties(int index){
+    private void fillEmpties(int index){
       for (int i = lastSet + 1; i < index; i++) {
-        if(!values.getMutator().setSafe(i, new byte[]{})) return false;
+        values.getMutator().setSafe(i, new byte[]{});
+      }
+      if (index > bits.getValueCapacity()) {
+        bits.reAlloc();
       }
       lastSet = index;
-
-      return true;
     }
 
-    public boolean setValueLengthSafe(int index, int length) {
-      return values.getMutator().setValueLengthSafe(index, length);
+    public void setValueLengthSafe(int index, int length) {
+      values.getMutator().setValueLengthSafe(index, length);
     }
     </#if>
     
-    public boolean setSafe(int index, byte[] value, int start, int length) {
+    public void setSafe(int index, byte[] value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
       <#else>
-      if(!fillEmpties(index)) return false;
+      fillEmpties(index);
 
-      boolean b1 = bits.getMutator().setSafe(index, 1);
-      boolean b2 = values.getMutator().setSafe(index, value, start, length);
-      if(b1 && b2){
-        setCount++;
-        <#if type.major == "VarLen">lastSet = index;</#if>
-        return true;
-      }else{
-        return false;
-      }
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value, start, length);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
 
-    public boolean setNull(int index){
-      return bits.getMutator().setSafe(index, 0);
+    public void setNull(int index){
+      bits.getMutator().setSafe(index, 0);
     }
     
     public void setSkipNull(int index, ${minor.class}Holder holder){
@@ -542,8 +536,6 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
       return outIndex < Nullable${minor.class}Vector.this.getValueCapacity();
     }
 
-    //public boolean setSafe(int index, int isSet<#if type.major == "VarLen" || minor.class == "TimeStampTZ" || minor.class == "Interval" || minor.class == "IntervalDay">Nullable${minor.class}Holder <#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value){
-
     <#assign fields = minor.fields!type.fields />
     public void set(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ){
       <#if type.major == "VarLen">
@@ -556,71 +548,48 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
     
-    public boolean setSafe(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
+    public void setSafe(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
       <#if type.major == "VarLen">
-      if(!fillEmpties(index)) return false;
+      fillEmpties(index);
       </#if>
       
-      boolean b1 = bits.getMutator().setSafe(index, isSet);
-      boolean b2 = values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
-      if(b1 && b2){
-        setCount++;
-        <#if type.major == "VarLen">lastSet = index;</#if>
-        return true;
-      }else{
-        return false;
-      }
-
+      bits.getMutator().setSafe(index, isSet);
+      values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
 
-    public boolean setSafe(int index, Nullable${minor.class}Holder value) {
+    public void setSafe(int index, Nullable${minor.class}Holder value) {
 
       <#if type.major == "VarLen">
-      if(!fillEmpties(index)) return false;
+      fillEmpties(index);
       </#if>
-      boolean b1 = bits.getMutator().setSafe(index, value.isSet);
-      boolean b2 = values.getMutator().setSafe(index, value);
-      if(b1 && b2){
-        setCount++;
-        <#if type.major == "VarLen">lastSet = index;</#if>
-        return true;
-      }else{
-        return false;
-      }
-
+      bits.getMutator().setSafe(index, value.isSet);
+      values.getMutator().setSafe(index, value);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
-    public boolean setSafe(int index, ${minor.class}Holder value) {
+    public void setSafe(int index, ${minor.class}Holder value) {
 
       <#if type.major == "VarLen">
-      if(!fillEmpties(index)) return false;
+      fillEmpties(index);
       </#if>
-      boolean b1 = bits.getMutator().setSafe(index, 1);
-      boolean b2 = values.getMutator().setSafe(index, value);
-      if(b1 && b2){
-        setCount++;
-        <#if type.major == "VarLen">lastSet = index;</#if>
-        return true;
-      }else{
-        return false;
-      }
-
+      bits.getMutator().setSafe(index, 1);
+      values.getMutator().setSafe(index, value);
+      setCount++;
+      <#if type.major == "VarLen">lastSet = index;</#if>
     }
     
     <#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "TimeStampTZ" || minor.class == "Interval" || minor.class == "IntervalDay")>
-      public boolean setSafe(int index, ${minor.javaType!type.javaType} value) {
+      public void setSafe(int index, ${minor.javaType!type.javaType} value) {
         <#if type.major == "VarLen">
-        if(!fillEmpties(index)) return false;
+        fillEmpties(index);
         </#if>
-        boolean b1 = bits.getMutator().setSafe(index, 1);
-        boolean b2 = values.getMutator().setSafe(index, value);
-        if(b1 && b2){
-          setCount++;
-          return true;
-        }else{
-          return false;
-        }
+        bits.getMutator().setSafe(index, 1);
+        values.getMutator().setSafe(index, value);
+        setCount++;
       }
 
     </#if>

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index d261050..572181e 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -152,8 +152,8 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     }
     
     @Override
-    public boolean copyValueSafe(int fromIndex, int toIndex) {
-      return to.copyFromSafe(fromIndex, toIndex, Repeated${minor.class}Vector.this);
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, Repeated${minor.class}Vector.this);
     }
   }
 
@@ -165,15 +165,12 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
       }
     }
 
-    public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+    public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
       int count = v.getAccessor().getCount(inIndex);
-      if(!getMutator().startNewGroup(outIndex)) return false;
+      getMutator().startNewGroup(outIndex);
       for (int i = 0; i < count; i++) {
-        if (!getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i))) {
-          return false;
-        }
+        getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i));
       }
-      return true;
     }
 
   public boolean allocateNewSafe(){
@@ -403,8 +400,8 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     private Mutator(){
     }
 
-    public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
-      return offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount);
+    public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
+      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount);
     }
 
     public BaseDataValueVector getDataVector() {
@@ -418,11 +415,8 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
       offsets.getMutator().setValueCount(parentValueCount == 0 ? 0 : parentValueCount + 1);
     }
 
-    public boolean startNewGroup(int index) {
-      if(getValueCapacity() <= index){
-        return false;
-      }
-      return offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
+    public void startNewGroup(int index) {
+      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
     }
 
     /**
@@ -439,66 +433,53 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     }
 
     <#if type.major == "VarLen">
-    public boolean addSafe(int index, byte[] bytes) {
-      return addSafe(index, bytes, 0, bytes.length);
+    public void addSafe(int index, byte[] bytes) {
+      addSafe(index, bytes, 0, bytes.length);
     }
 
-    public boolean addSafe(int index, byte[] bytes, int start, int length) {
-      if(offsets.getValueCapacity() <= index+1) {
-        return false;
-      }
+    public void addSafe(int index, byte[] bytes, int start, int length) {
       int nextOffset = offsets.getAccessor().get(index+1);
-      boolean b1 = values.getMutator().setSafe(nextOffset, bytes, start, length);
-      boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);
-      return (b1 && b2);
+      values.getMutator().setSafe(nextOffset, bytes, start, length);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
 
     <#else>
 
-    public boolean addSafe(int index, ${minor.javaType!type.javaType} srcValue) {
-      if(offsets.getValueCapacity() <= index+1) return false;
+    public void addSafe(int index, ${minor.javaType!type.javaType} srcValue) {
       int nextOffset = offsets.getAccessor().get(index+1);
-      boolean b1 = values.getMutator().setSafe(nextOffset, srcValue);
-      boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);
-      return (b1 && b2);
+      values.getMutator().setSafe(nextOffset, srcValue);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
         
     </#if>
 
     
-    public boolean setSafe(int index, Repeated${minor.class}Holder h){
+    public void setSafe(int index, Repeated${minor.class}Holder h){
       ${minor.class}Holder ih = new ${minor.class}Holder();
       getMutator().startNewGroup(index);
       for(int i = h.start; i < h.end; i++){
         h.vector.getAccessor().get(i, ih);
-        if(!getMutator().addSafe(index, ih) ) return false;
+        getMutator().addSafe(index, ih);
       }
-      return true;
     }
     
-    public boolean addSafe(int index, ${minor.class}Holder holder){
-      if(offsets.getValueCapacity() <= index+1) return false;
+    public void addSafe(int index, ${minor.class}Holder holder){
       int nextOffset = offsets.getAccessor().get(index+1);
-      boolean b1 = values.getMutator().setSafe(nextOffset, holder);
-      boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);
-      return (b1 && b2);
+      values.getMutator().setSafe(nextOffset, holder);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
     
-    public boolean addSafe(int index, Nullable${minor.class}Holder holder){
-      if(offsets.getValueCapacity() <= index+1) return false;
+    public void addSafe(int index, Nullable${minor.class}Holder holder){
       int nextOffset = offsets.getAccessor().get(index+1);
-      boolean b1 = values.getMutator().setSafe(nextOffset, holder);
-      boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);
-      return (b1 && b2);
+      values.getMutator().setSafe(nextOffset, holder);
+      offsets.getMutator().setSafe(index+1, nextOffset+1);
     }
     
     <#if (fields?size > 1) && !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
-    public boolean addSafe(int arrayIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
-      if(offsets.getValueCapacity() <= arrayIndex+1) return false;
+    public void addSafe(int arrayIndex, <#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
       int nextOffset = offsets.getAccessor().get(arrayIndex+1);
-      boolean b1 = values.getMutator().setSafe(nextOffset, <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
-      boolean b2 = offsets.getMutator().setSafe(arrayIndex+1, nextOffset+1);
-      return (b1 && b2);
+      values.getMutator().setSafe(nextOffset, <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
+      offsets.getMutator().setSafe(arrayIndex+1, nextOffset+1);
     }
     </#if>
     

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/TypeHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index c83c301..ba5372f 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -378,7 +378,7 @@ public class TypeHelper {
     }
   }
 
-  public static boolean setValueSafe(ValueVector vector, int index, ValueHolder holder) {
+  public static void setValueSafe(ValueVector vector, int index, ValueHolder holder) {
     MajorType type = vector.getField().getType();
 
     switch(type.getMinorType()) {
@@ -387,23 +387,20 @@ public class TypeHelper {
       case ${minor.class?upper_case} :
       switch (type.getMode()) {
         case REQUIRED:
-          return ((${minor.class}Vector) vector).getMutator().setSafe(index, (${minor.class}Holder) holder);
+          ((${minor.class}Vector) vector).getMutator().setSafe(index, (${minor.class}Holder) holder);
+          return;
         case OPTIONAL:
           if (((Nullable${minor.class}Holder) holder).isSet == 1) {
-            if (! ((Nullable${minor.class}Vector) vector).getMutator().setSafe(index, (Nullable${minor.class}Holder) holder) ) {
-              return false;
-            }
+            ((Nullable${minor.class}Vector) vector).getMutator().setSafe(index, (Nullable${minor.class}Holder) holder);
           } else {
-            if (!((Nullable${minor.class}Vector) vector).getMutator().isSafe(index)) {
-              return false;
-            }
+            ((Nullable${minor.class}Vector) vector).getMutator().isSafe(index);
           }
-          return true;
+          return;
       }
       </#list>
       </#list>
       case GENERIC_OBJECT:
-        return ((ObjectVector) vector).getMutator().setSafe(index, (ObjectHolder) holder);
+        ((ObjectVector) vector).getMutator().setSafe(index, (ObjectHolder) holder);
       default:
         throw new UnsupportedOperationException(type.getMinorType() + " type is not supported.");
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index b8ffe5d..9c6454e 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -200,17 +200,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
     
     if(data.capacity() < outputStart + len) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
     }
 
-    if (!offsetVector.getMutator().setSafe(thisIndex + 1, outputStart + len)) {
-       decrementAllocationMonitor();
-       return false;
-    }
+    offsetVector.getMutator().setSafe(thisIndex + 1, outputStart + len);
 
     from.data.getBytes(start, data, outputStart, len);
-    offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (thisIndex+1) * ${type.width}, outputStart + len);
+    offsetVector.getMutator().setSafe( (thisIndex+1) * ${type.width}, outputStart + len);
 
     return true;
   }
@@ -240,8 +236,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
     
     @Override
-    public boolean copyValueSafe(int fromIndex, int toIndex) {
-      return to.copyFromSafe(fromIndex, toIndex, ${minor.class}Vector.this);
+    public void copyValueSafe(int fromIndex, int toIndex) {
+      to.copyFromSafe(fromIndex, toIndex, ${minor.class}Vector.this);
     }
   }
 
@@ -285,6 +281,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     offsetVector.zeroVector();
   }
 
+    public void reAlloc() {
+      allocationTotalByteCount *= 2;
+      DrillBuf newBuf = allocator.buffer(allocationTotalByteCount);
+      newBuf.setBytes(0, data, 0, data.capacity());
+      data.release();
+      data = newBuf;
+    }
+
     private void decrementAllocationMonitor() {
       if (allocationMonitor > 0) {
         allocationMonitor = 0;
@@ -403,20 +407,16 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data.setBytes(currentOffset, bytes, 0, bytes.length);
     }
 
-    public boolean setSafe(int index, byte[] bytes) {
+    public void setSafe(int index, byte[] bytes) {
       assert index >= 0;
 
       int currentOffset = offsetVector.getAccessor().get(index);
       if (data.capacity() < currentOffset + bytes.length) {
-        decrementAllocationMonitor();
-        return false;
-      }
-      if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + bytes.length)) {
-        return false;
+        reAlloc();
       }
+      offsetVector.getMutator().setSafe(index + 1, currentOffset + bytes.length);
       offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
       data.setBytes(currentOffset, bytes, 0, bytes.length);
-      return true;
     }
 
     /**
@@ -434,52 +434,42 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data.setBytes(currentOffset, bytes, start, length);
     }
 
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public void setSafe(int index, byte[] bytes, int start, int length) {
       assert index >= 0;
 
       int currentOffset = offsetVector.getAccessor().get(index);
 
       if (data.capacity() < currentOffset + length) {
-        decrementAllocationMonitor();
-        return false;
-      }
-      if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + length)) {
-        return false;
+        reAlloc();
       }
+      offsetVector.getMutator().setSafe(index + 1, currentOffset + length);
       data.setBytes(currentOffset, bytes, start, length);
-      return true;
     }
 
-    public boolean setValueLengthSafe(int index, int length) {
+    public void setValueLengthSafe(int index, int length) {
       int offset = offsetVector.getAccessor().get(index);
       if(data.capacity() < offset + length ) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
-      return offsetVector.getMutator().setSafe(index + 1, offsetVector.getAccessor().get(index) + length);
+      offsetVector.getMutator().setSafe(index + 1, offsetVector.getAccessor().get(index) + length);
     }
 
 
-    public boolean setSafe(int index, int start, int end, DrillBuf buffer){
+    public void setSafe(int index, int start, int end, DrillBuf buffer){
       int len = end - start;
       
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
       
       if(data.capacity() < outputStart + len) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
       
-      if (!offsetVector.getMutator().setSafe( index+1,  outputStart + len)) {
-        return false;
-      }
+      offsetVector.getMutator().setSafe( index+1,  outputStart + len);
       buffer.getBytes(start, data, outputStart, len);
-
-      return true;
     }
     
     
-    public boolean setSafe(int index, Nullable${minor.class}Holder holder){
+    public void setSafe(int index, Nullable${minor.class}Holder holder){
       assert holder.isSet == 1;
 
       int start = holder.start;
@@ -489,21 +479,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
       
       if(data.capacity() < outputStart + len) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
       
       holder.buffer.getBytes(start, data, outputStart, len);
-      if (!offsetVector.getMutator().setSafe( index+1,  outputStart + len)) {
-        return false;
-      }
-
-      // set(index, holder);
-
-      return true;
+      offsetVector.getMutator().setSafe( index+1,  outputStart + len);
     }
     
-    public boolean setSafe(int index, ${minor.class}Holder holder){
+    public void setSafe(int index, ${minor.class}Holder holder){
 
       int start = holder.start;
       int end =   holder.end;
@@ -512,18 +495,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
       
       if(data.capacity() < outputStart + len) {
-        decrementAllocationMonitor();
-        return false;
+        reAlloc();
       }
       
       holder.buffer.getBytes(start, data, outputStart, len);
-      if (!offsetVector.getMutator().setSafe( index+1,  outputStart + len)) {
-        return false;
-      }
-
-      // set(index, holder);
-
-      return true;
+      offsetVector.getMutator().setSafe( index+1,  outputStart + len);
     }
     
     protected void set(int index, int start, int length, DrillBuf buffer){

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 5cf4a35..a5758fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -315,29 +315,11 @@ public class EvaluationVisitor {
       } else {
 
         final JInvocation setMeth = GetSetVectorHelper.write(e.getChild().getMajorType(), vv, inputContainer, outIndex, e.isSafe() ? "setSafe" : "set");
-        final String isSafeMethod = "isSafe";
-
-        if (e.isSafe()) {
-          HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT);
-          block.assign(outputContainer.getValue(), JExpr.lit(1));
-          if (inputContainer.isOptional()) {
-            // block._if(vv.invoke("getMutator").invoke(setMethod).arg(outIndex).not())._then().assign(outputContainer.getValue(),
-            // JExpr.lit(0));
-            JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
-            block = jc._then();
-            jc._else()._if(vv.invoke("getMutator").invoke(isSafeMethod).arg(outIndex).not())._then()
-                .assign(outputContainer.getValue(), JExpr.lit(0));
-          }
-          block._if(setMeth.not())._then().assign(outputContainer.getValue(), JExpr.lit(0));
-          return outputContainer;
-        } else {
           if (inputContainer.isOptional()) {
-            // block.add(vv.invoke("getMutator").invoke(setMethod).arg(outIndex));
             JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
             block = jc._then();
           }
           block.add(setMeth);
-        }
 
       }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
index 6e0b282..add3734 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillAggFuncHolder.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.BlockType;
@@ -229,7 +230,6 @@ class DrillAggFuncHolder extends DrillFuncHolder{
 
     Preconditions.checkNotNull(body);
     sub.directStatement(body);
-    JVar successVar = sub.decl(JType.parse(g.getModel(), "boolean"), "success", JExpr.lit(false));
 
     // reassign workspace variables back.
     for(int i =0; i < workspaceJVars.length; i++){
@@ -241,17 +241,20 @@ class DrillAggFuncHolder extends DrillFuncHolder{
       }
       //Change workspaceVar through workspace vector.
       JInvocation setMeth;
-      if (Types.usesHolderForGet(workspaceVars[i].majorType)) {
-        setMeth = g.getWorkspaceVectors().get(workspaceVars[i]).invoke("getMutator").invoke("setSafe").arg(wsIndexVariable).arg(workspaceJVars[i]);
+      MajorType type = workspaceVars[i].majorType;
+      if (Types.usesHolderForGet(type)) {
+          setMeth = g.getWorkspaceVectors().get(workspaceVars[i]).invoke("getMutator").invoke("setSafe").arg(wsIndexVariable).arg(workspaceJVars[i]);
       }else{
-        setMeth = g.getWorkspaceVectors().get(workspaceVars[i]).invoke("getMutator").invoke("setSafe").arg(wsIndexVariable).arg(workspaceJVars[i].ref("value"));
+        if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) {
+          setMeth = g.getWorkspaceVectors().get(workspaceVars[i]).invoke("getMutator").invoke("setSafe").arg(wsIndexVariable).arg(workspaceJVars[i].ref("value"));
+        } else {
+          setMeth = g.getWorkspaceVectors().get(workspaceVars[i]).invoke("getMutator").invoke("set").arg(wsIndexVariable).arg(workspaceJVars[i].ref("value"));
+        }
       }
 
-      sub.assign(successVar, setMeth);
+      sub.add(setMeth);
 
       JClass drillRunTimeException = g.getModel().ref(DrillRuntimeException.class);
-
-      sub._if(successVar.eq(JExpr.lit(false)))._then()._throw(JExpr._new(drillRunTimeException).arg(JExpr.lit("setsafe() failed; cannot set holder value into the vector")));
     }
 
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
index df56174..9999c36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
@@ -87,11 +87,11 @@ public class DrillComplexWriterFuncHolder extends DrillSimpleFuncHolder{
     addProtectedBlock(g, sub, body, inputVariables, workspaceJVars, false);
 
 
-    JConditional jc = g.getEvalBlock()._if(complexWriter.invoke("ok").not());
+//    JConditional jc = g.getEvalBlock()._if(complexWriter.invoke("ok").not());
 
-    jc._then().add(complexWriter.invoke("reset"));
+//    jc._then().add(complexWriter.invoke("reset"));
     //jc._then().directStatement("System.out.println(\"debug : write ok fail!, inIndex = \" + inIndex);");
-    jc._then()._return(JExpr.FALSE);
+//    jc._then()._return(JExpr.FALSE);
 
     //jc._else().directStatement("System.out.println(\"debug : write successful, inIndex = \" + inIndex);");
 

http://git-wip-us.apache.org/repos/asf/drill/blob/a22b4724/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 35faf22..0e2a017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.physical.impl.aggregate;
 
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -51,7 +51,8 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 
-import java.io.IOException;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JVar;
 
 public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
@@ -76,8 +77,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
           "aggrValuesContainer" /* workspace container */, UPDATE_AGGR_INSIDE, UPDATE_AGGR_OUTSIDE, UPDATE_AGGR_INSIDE);
 
 
-  public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws
-      ExecutionSetupException {
+  public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
     super(popConfig, context);
     this.incoming = incoming;
   }
@@ -266,8 +266,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
             HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */);
 
-    agg.setup(popConfig, htConfig, context, this.stats, oContext.getAllocator(), incoming, this, aggrExprs,
-        cgInner.getWorkspaceTypes(), groupByOutFieldIds, this.container);
+    agg.setup(popConfig, htConfig, context, this.stats,
+        oContext.getAllocator(), incoming, this,
+        aggrExprs,
+        cgInner.getWorkspaceTypes(),
+        groupByOutFieldIds,
+        this.container);
 
     return agg;
   }
@@ -277,10 +281,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
     for (LogicalExpression aggr : aggrExprs) {
       HoldingContainer hc = cg.addExpr(aggr);
-      cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
     }
-
-    cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
   }
 
   private void setupGetIndex(ClassGenerator<HashAggregator> cg) {
@@ -302,9 +303,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       return;
     }
 
-    default:
-      throw new IllegalStateException();
     }
+
   }
 
   @Override
@@ -320,4 +320,5 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   protected void killIncoming(boolean sendUpstream) {
     incoming.kill(sendUpstream);
   }
+
 }