You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2019/11/20 11:40:15 UTC

[drill] 03/04: DRILL-7441: Fix issues with fillEmpties, offset vectors

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5b9380e54d9487b2db21b52dc60a21317b127146
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Thu Nov 7 20:56:13 2019 -0800

    DRILL-7441: Fix issues with fillEmpties, offset vectors
    
    Fixes subtle issues with offset vectors and "fill empties"
    logic.
    
    Drill has an informal standard that if a batch has no rows, then
    offset vectors within that batch should have zero size. Contrast
    this with batches of size 1 that should have offset vectors of
    size 2. Changed to enforce this rule throughout.
    
    Nullable, repeated and variable-width vectors have "fill empties"
    logic that is used in two places: when setting the value count and
    when preparing to write a new value. The current logic is not
    quite right for either case. Added tests and fixed the code to
    properly handle each case.
    
    Revised the batch validator to enforce the rule that offset vectors
    have length 0 for 0-sized batches. The result was much simpler code.
    
    Added tools to easily print a batch, restoring some code that
    was recently lost when the RowSet classes were moved.
    
    Code cleanup in all files touched.
    
    Added logic to "dirty" allocated buffers when testing to ensure
    logic is not sensitive to the "pristine" state of new buffers.
    
    Added logic to the column writers to enforce the zero-size-batch rule
    for offset vectors. Added unit tests for this case.
    
    Fixed the column writers to set the "lastSet" mutator value for
    nullable types since other code relies on this value.
    
    Removed the "setCount" field in nullable vectors: turns out
    it is not actually used.
    
    closes #1896
---
 .../apache/drill/exec/hive/TestHiveStorage.java    |  12 +-
 .../physical/impl/validate/BatchValidator.java     | 174 +++---
 .../drill/exec/vector/complex/fn/JsonReader.java   |   1 +
 .../drill/exec/work/batch/BaseRawBatchBuffer.java  |   1 -
 .../test/java/org/apache/drill/TestBugFixes.java   |  31 +-
 .../test/java/org/apache/drill/TestCTASJson.java   |  20 +-
 .../java/org/apache/drill/TestFrameworkTest.java   |  12 +-
 .../java/org/apache/drill/TestJoinNullable.java    |   1 -
 .../exec/compile/TestLargeFileCompilation.java     |  22 +-
 .../physical/impl/TestNestedDateTimeTimestamp.java |  10 +-
 .../physical/impl/join/TestMergeJoinAdvanced.java  |  22 +-
 .../physical/impl/validate/TestBatchValidator.java |   2 +-
 .../writer/TestCorruptParquetDateCorrection.java   |  23 +-
 .../exec/physical/rowSet/TestMapAccessors.java     |  16 +-
 .../physical/rowSet/TestOffsetVectorWriter.java    |  14 +-
 .../exec/physical/rowSet/TestScalarAccessors.java  | 217 ++++++-
 .../drill/exec/planner/sql/TestDrillSQLWorker.java |   8 +-
 .../org/apache/drill/exec/server/TestOptions.java  |  12 +-
 .../java/org/apache/drill/exec/sql/TestCTAS.java   |  16 +-
 .../java/org/apache/drill/exec/sql/TestCTTAS.java  |  30 +-
 .../complex/fn/TestJsonReaderWithSparseFiles.java  |  44 +-
 .../exec/vector/complex/writer/TestJsonNanInf.java |  20 +-
 .../exec/vector/complex/writer/TestJsonReader.java |  11 +-
 .../TestLimit0VsRegularQueriesMetadata.java        |  12 +-
 .../java/org/apache/drill/test/BaseFixture.java    |  27 +-
 .../java/org/apache/drill/test/BaseTestQuery.java  | 108 ++--
 .../org/apache/drill/test/DrillTestWrapper.java    |   8 +-
 .../org/apache/drill/test/SubOperatorTest.java     |   1 +
 .../apache/drill/exec/memory/BaseAllocator.java    |  14 +-
 .../src/main/codegen/templates/ComplexWriters.java |  14 +-
 .../main/codegen/templates/FixedValueVectors.java  |  22 +-
 .../src/main/codegen/templates/ListWriters.java    |   1 -
 .../src/main/codegen/templates/MapWriters.java     |  24 +-
 .../codegen/templates/NullableValueVectors.java    | 119 ++--
 .../codegen/templates/VariableLengthVectors.java   | 190 ++++---
 .../apache/drill/exec/vector/NullableVector.java   |   8 +-
 .../exec/vector/accessor/impl/VectorPrinter.java   |   9 +
 .../accessor/writer/NullableScalarWriter.java      |   3 +-
 .../accessor/writer/OffsetVectorWriterImpl.java    |  24 +-
 .../vector/complex/AbstractRepeatedMapVector.java  |  32 +-
 .../vector/complex/BaseRepeatedValueVector.java    |  16 +-
 .../exec/vector/complex/EmptyValuePopulator.java   |  42 +-
 .../exec/vector/VariableLengthVectorTest.java      |  52 +-
 .../org/apache/drill/exec/vector/VectorTest.java   | 623 +++++++++++++++++++++
 44 files changed, 1494 insertions(+), 574 deletions(-)

diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index fea795c..ae993e6 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -17,6 +17,11 @@
  */
 package org.apache.drill.exec.hive;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,11 +45,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 
 @Category({SlowTest.class, HiveStorageTest.class})
 public class TestHiveStorage extends HiveTestBase {
@@ -397,7 +397,7 @@ public class TestHiveStorage extends HiveTestBase {
       verifyColumnsMetadata(client.createPreparedStatement(String.format("select * from (%s) t limit 0", query)).get()
           .getPreparedStatement().getColumnsList(), expectedResult);
     } finally {
-      test("alter session reset `%s`", ExecConstants.EARLY_LIMIT0_OPT_KEY);
+      resetSessionOption(ExecConstants.EARLY_LIMIT0_OPT_KEY);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index f7e63e4..5c4f8b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -69,6 +69,30 @@ import org.slf4j.LoggerFactory;
  * Only handles single (non-hyper) vectors at present. Current form is
  * self-contained. Better checks can be done by moving checks inside
  * vectors or by exposing more metadata from vectors.
+ * <p>
+ * Drill is not clear on how to handle a batch of zero records. Offset
+ * vectors normally have one more entry than the record count. If a
+ * batch has 1 record, the offset vector has 2 entries. The entry at
+ * 0 is always 0, the entry at 1 marks the end of the 0th record.
+ * <p>
+ * But, this gets a bit murky. If a batch has one record, and contains
+ * a repeated map, and the map has no entries, then the nested offset
+ * vector usually has 0 entries, not 1.
+ * <p>
+ * Generalizing, sometimes when a batch has zero records, the "top-level"
+ * offset vectors have 1 item, sometimes zero items.
+ * <p>
+ * The simplest solution would be to simply enforce here that all offset
+ * vectors must have n+1 entries, where n is the row count (top-level
+ * vectors) or item count (nested vectors.)
+ * <p>
+ * But, after fighting with the code, this seems an unobtainable goal.
+ * For one thing, deserialization seems to rely on nested offset vectors
+ * having zero entries when the value count is zero.
+ * <p>
+ * Instead, this code assumes that any offset vector, top-level or
+ * nested, will have zero entries if the value count is zero. That is,
+ * an offset vector has either zero entries or n+1 entries.
  */
 
 public class BatchValidator {
@@ -166,22 +190,15 @@ public class BatchValidator {
     /** Check only batch, container counts. */
     COUNTS,
     /** Check vector value counts. */
-    VECTORS,
-    /** Check special handling of zero-sized vectors.
-     *  This is the most strict form of checking, but
-     *  many operators fail this check.
-     */
-    ZERO_SIZE
+    VECTORS
     };
 
   private static final Map<Class<? extends CloseableRecordBatch>, CheckMode> checkRules = buildRules();
 
   private final ErrorReporter errorReporter;
-  private final boolean checkZeroSize;
 
-  public BatchValidator(ErrorReporter errorReporter, boolean checkZeroSize) {
+  public BatchValidator(ErrorReporter errorReporter) {
     this.errorReporter = errorReporter;
-    this.checkZeroSize = checkZeroSize;
   }
 
   /**
@@ -192,20 +209,20 @@ public class BatchValidator {
    */
   private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() {
     Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>();
-    rules.put(OperatorRecordBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(ScanBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(ProjectRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(OperatorRecordBatch.class, CheckMode.VECTORS);
+    rules.put(ScanBatch.class, CheckMode.VECTORS);
+    rules.put(ProjectRecordBatch.class, CheckMode.VECTORS);
     rules.put(FilterRecordBatch.class, CheckMode.VECTORS);
     rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS);
     rules.put(UnnestRecordBatch.class, CheckMode.VECTORS);
     rules.put(HashAggBatch.class, CheckMode.VECTORS);
-    rules.put(RemovingRecordBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(StreamingAggBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(RuntimeFilterRecordBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(FlattenRecordBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(MergeJoinBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(NestedLoopJoinBatch.class, CheckMode.ZERO_SIZE);
-    rules.put(LimitRecordBatch.class, CheckMode.ZERO_SIZE);
+    rules.put(RemovingRecordBatch.class, CheckMode.VECTORS);
+    rules.put(StreamingAggBatch.class, CheckMode.VECTORS);
+    rules.put(RuntimeFilterRecordBatch.class, CheckMode.VECTORS);
+    rules.put(FlattenRecordBatch.class, CheckMode.VECTORS);
+    rules.put(MergeJoinBatch.class, CheckMode.VECTORS);
+    rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS);
+    rules.put(LimitRecordBatch.class, CheckMode.VECTORS);
     return rules;
   }
 
@@ -215,6 +232,11 @@ public class BatchValidator {
   }
 
   public static boolean validate(RecordBatch batch) {
+    // This is a handy place to trace batches as they flow up
+    // the DAG. Works best for single-threaded runs with few records.
+    // System.out.println(batch.getClass().getSimpleName());
+    // RowSetFormatter.print(batch);
+
     CheckMode checkMode = lookup(batch);
 
     // If no rule, don't check this batch.
@@ -286,17 +308,15 @@ public class BatchValidator {
         break;
       }
     }
-    if (checkMode == CheckMode.VECTORS ||
-        checkMode == CheckMode.ZERO_SIZE) {
-      new BatchValidator(reporter, checkMode == CheckMode.ZERO_SIZE)
-          .validateBatch(batch, valueCount);
+    if (checkMode == CheckMode.VECTORS) {
+      new BatchValidator(reporter).validateBatch(batch, valueCount);
     }
     return reporter.errorCount() == 0;
   }
 
   public static boolean validate(VectorAccessible batch) {
     ErrorReporter reporter = errorReporter(batch);
-    new BatchValidator(reporter, false).validateBatch(batch, batch.getRecordCount());
+    new BatchValidator(reporter).validateBatch(batch, batch.getRecordCount());
     return reporter.errorCount() == 0;
   }
 
@@ -317,61 +337,61 @@ public class BatchValidator {
 
   private void validateWrapper(int rowCount, VectorWrapper<? extends ValueVector> w) {
     if (w instanceof SimpleVectorWrapper) {
-      validateVector(rowCount, w.getValueVector(), true);
+      validateVector(rowCount, w.getValueVector());
     }
   }
 
-  private void validateVector(int expectedCount, ValueVector vector, boolean topLevel) {
-    validateVector(vector.getField().getName(), expectedCount, vector, topLevel);
+  private void validateVector(int expectedCount, ValueVector vector) {
+    validateVector(vector.getField().getName(), expectedCount, vector);
   }
 
-  private void validateVector(String name, int expectedCount, ValueVector vector, boolean topLevel) {
+  private void validateVector(String name, int expectedCount, ValueVector vector) {
     int valueCount = vector.getAccessor().getValueCount();
     if (valueCount != expectedCount) {
       error(name, vector,
           String.format("Row count = %d, but value count = %d",
               expectedCount, valueCount));
     }
-    validateVector(name, vector, topLevel);
+    validateVector(name, vector);
   }
 
-  private void validateVector(String name, ValueVector vector, boolean topLevel) {
+  private void validateVector(String name, ValueVector vector) {
     if (vector instanceof BitVector) {
       validateBitVector(name, (BitVector) vector);
     } else if (vector instanceof RepeatedBitVector) {
-      validateRepeatedBitVector(name, (RepeatedBitVector) vector, topLevel);
+      validateRepeatedBitVector(name, (RepeatedBitVector) vector);
     } else if (vector instanceof NullableVector) {
-      validateNullableVector(name, (NullableVector) vector, topLevel);
+      validateNullableVector(name, (NullableVector) vector);
     } else if (vector instanceof VarCharVector) {
-      validateVarCharVector(name, (VarCharVector) vector, topLevel);
+      validateVarCharVector(name, (VarCharVector) vector);
     } else if (vector instanceof VarBinaryVector) {
-      validateVarBinaryVector(name, (VarBinaryVector) vector, topLevel);
+      validateVarBinaryVector(name, (VarBinaryVector) vector);
     } else if (vector instanceof FixedWidthVector ||
                vector instanceof ZeroVector) {
       // Not much to do. The only item to check is the vector
       // count itself, which was already done. There is no inner
       // structure to check.
     } else if (vector instanceof BaseRepeatedValueVector) {
-      validateRepeatedVector(name, (BaseRepeatedValueVector) vector, topLevel);
+      validateRepeatedVector(name, (BaseRepeatedValueVector) vector);
     } else if (vector instanceof RepeatedMapVector) {
-      validateRepeatedMapVector(name, (RepeatedMapVector) vector, topLevel);
+      validateRepeatedMapVector(name, (RepeatedMapVector) vector);
     } else if (vector instanceof MapVector) {
       validateMapVector(name, (MapVector) vector);
     } else if (vector instanceof RepeatedListVector) {
-      validateRepeatedListVector(name, (RepeatedListVector) vector, topLevel);
+      validateRepeatedListVector(name, (RepeatedListVector) vector);
     } else if (vector instanceof DictVector) {
-      validateDictVector(name, (DictVector) vector, topLevel);
+      validateDictVector(name, (DictVector) vector);
     } else if (vector instanceof UnionVector) {
       validateUnionVector(name, (UnionVector) vector);
     } else if (vector instanceof VarDecimalVector) {
-      validateVarDecimalVector(name, (VarDecimalVector) vector, topLevel);
+      validateVarDecimalVector(name, (VarDecimalVector) vector);
     } else {
       logger.debug("Don't know how to validate vector: {}  of class {}",
           name, vector.getClass().getSimpleName());
     }
   }
 
-  private void validateNullableVector(String name, NullableVector vector, boolean topLevel) {
+  private void validateNullableVector(String name, NullableVector vector) {
     int outerCount = vector.getAccessor().getValueCount();
     ValueVector valuesVector = vector.getValuesVector();
     int valueCount = valuesVector.getAccessor().getValueCount();
@@ -381,44 +401,36 @@ public class BatchValidator {
           outerCount, valueCount));
     }
     verifyIsSetVector(vector, (UInt1Vector) vector.getBitsVector());
-    validateVector(name + "-values", valuesVector, topLevel);
+    validateVector(name + "-values", valuesVector);
   }
 
-  private void validateVarCharVector(String name, VarCharVector vector, boolean topLevel) {
+  private void validateVarCharVector(String name, VarCharVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
-    validateVarWidthVector(name, vector, dataLength, topLevel);
+    validateVarWidthVector(name, vector, dataLength);
   }
 
-  private void validateVarBinaryVector(String name, VarBinaryVector vector, boolean topLevel) {
+  private void validateVarBinaryVector(String name, VarBinaryVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
-    validateVarWidthVector(name, vector, dataLength, topLevel);
+    validateVarWidthVector(name, vector, dataLength);
   }
 
-  private void validateVarDecimalVector(String name, VarDecimalVector vector,
-      boolean topLevel) {
+  private void validateVarDecimalVector(String name, VarDecimalVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
-    validateVarWidthVector(name, vector, dataLength, topLevel);
+    validateVarWidthVector(name, vector, dataLength);
   }
 
-  private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength, boolean topLevel) {
+  private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
     int valueCount = vector.getAccessor().getValueCount();
-
-    // Disabled because a large number of operators
-    // set up offset vectors wrongly.
-    if (valueCount == 0 && !checkZeroSize) {
-      return;
-    }
-
     validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
-        valueCount, dataLength, topLevel);
+        valueCount, dataLength);
   }
 
-  private void validateRepeatedVector(String name, BaseRepeatedValueVector vector, boolean topLevel) {
+  private void validateRepeatedVector(String name, BaseRepeatedValueVector vector) {
     ValueVector dataVector = vector.getDataVector();
     int dataLength = dataVector.getAccessor().getValueCount();
     int valueCount = vector.getAccessor().getValueCount();
     int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
-        valueCount, dataLength, topLevel);
+        valueCount, dataLength);
 
     if (dataLength != itemCount) {
       error(name, vector, String.format(
@@ -429,14 +441,14 @@ public class BatchValidator {
     // Special handling of repeated VarChar vectors
     // The nested data vectors are not quite exactly like top-level vectors.
 
-    validateVector(name + "-data", dataVector, false);
+    validateVector(name + "-data", dataVector);
   }
 
-  private void validateRepeatedBitVector(String name, RepeatedBitVector vector, boolean topLevel) {
+  private void validateRepeatedBitVector(String name, RepeatedBitVector vector) {
     int valueCount = vector.getAccessor().getValueCount();
     int maxBitCount = valueCount * 8;
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), valueCount, maxBitCount, topLevel);
+        vector.getOffsetVector(), valueCount, maxBitCount);
     BitVector dataVector = vector.getDataVector();
     if (dataVector.getAccessor().getValueCount() != elementCount) {
       error(name, vector, String.format(
@@ -461,34 +473,34 @@ public class BatchValidator {
   private void validateMapVector(String name, MapVector vector) {
     int valueCount = vector.getAccessor().getValueCount();
     for (ValueVector child: vector) {
-      validateVector(valueCount, child, false);
+      validateVector(valueCount, child);
     }
   }
 
   private void validateRepeatedMapVector(String name,
-      RepeatedMapVector vector, boolean topLevel) {
+      RepeatedMapVector vector) {
     int valueCount = vector.getAccessor().getValueCount();
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
+        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE);
     for (ValueVector child: vector) {
-      validateVector(elementCount, child, false);
+      validateVector(elementCount, child);
     }
   }
 
-  private void validateDictVector(String name, DictVector vector, boolean topLevel) {
+  private void validateDictVector(String name, DictVector vector) {
     int valueCount = vector.getAccessor().getValueCount();
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
-    validateVector(elementCount, vector.getKeys(), false);
-    validateVector(elementCount, vector.getValues(), false);
+        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE);
+    validateVector(elementCount, vector.getKeys());
+    validateVector(elementCount, vector.getValues());
   }
 
   private void validateRepeatedListVector(String name,
-      RepeatedListVector vector, boolean topLevel) {
+      RepeatedListVector vector) {
     int valueCount = vector.getAccessor().getValueCount();
     int elementCount = validateOffsetVector(name + "-offsets",
-        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel);
-    validateVector(elementCount, vector.getDataVector(), false);
+        vector.getOffsetVector(), valueCount, Integer.MAX_VALUE);
+    validateVector(elementCount, vector.getDataVector());
   }
 
   private void validateUnionVector(String name, UnionVector vector) {
@@ -511,30 +523,22 @@ public class BatchValidator {
             type.name()));
       } else {
         validateVector(name + "-type-" + type.name(),
-            valueCount, child, false);
+            valueCount, child);
       }
     }
   }
 
   private int validateOffsetVector(String name, UInt4Vector offsetVector,
-      int valueCount, int maxOffset, boolean topLevel) {
+      int valueCount, int maxOffset) {
+    // VectorPrinter.printOffsets(offsetVector, valueCount + 1);
     UInt4Vector.Accessor accessor = offsetVector.getAccessor();
     int offsetCount = accessor.getValueCount();
-    // TODO: Disabled because a large number of operators
-    // set up offset vectors incorrectly. Either that, or semantics
-    // are ill-defined: some vectors assume an offset vector length
-    // of 0 if the "outer" value count is zero (which, while fiddly, is
-    // a workable definition.)
-//    if (checkZeroSize && topLevel && offsetCount == 0) {
-//      error(name, offsetVector,
-//          "Offset vector has length 0, expected 1+");
-//    }
     if (valueCount == 0 && offsetCount > 1 || valueCount > 0 && offsetCount != valueCount + 1) {
       error(name, offsetVector, String.format(
           "Outer vector has %d values, but offset vector has %d, expected %d",
           valueCount, offsetCount, valueCount + 1));
     }
-    if (valueCount == 0 && (!topLevel || !checkZeroSize)) {
+    if (valueCount == 0) {
       return 0;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 6965bf9..4dd146e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -429,6 +429,7 @@ public class JsonReader extends BaseJsonReader {
         case START_ARRAY:
           writeData(list.list());
           break;
+
         case START_OBJECT:
           if (!writeListDataIfTyped(list)) {
             writeData(list.map(), FieldSelection.ALL_VALID, false);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 5487d95..db25fd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -239,7 +239,6 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
 
     assertAckSent(b);
     return b;
-
   }
 
   private void assertAckSent(RawFragmentBatch batch) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 9717077..c913888 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -17,10 +17,17 @@
  */
 package org.apache.drill;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import static org.junit.Assert.fail;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.test.BaseTestQuery;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -28,14 +35,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.nio.file.Paths;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 @Category(UnlikelyTest.class)
 public class TestBugFixes extends BaseTestQuery {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBugFixes.class);
 
   @BeforeClass
   public static void setupTestFiles() {
@@ -111,21 +112,23 @@ public class TestBugFixes extends BaseTestQuery {
     }
   }
 
-  @Test (expected = UserException.class)
+  @Test
   // Should be "Failure while parsing sql. Node [rel#26:Subset#6.LOGICAL.ANY([]).[]] could not be implemented;".
   // Drill will hit CanNotPlan, until we add code fix to transform the local LHS filter in left outer join properly.
   public void testDRILL1337_LocalLeftFilterLeftOutJoin() throws Exception {
     try {
-      test("select count(*) from cp.`tpch/nation.parquet` n left outer join cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and n.n_nationkey > 10;");
+      test("select count(*) from cp.`tpch/nation.parquet` n left outer join " +
+           "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and n.n_nationkey > 10;");
+      fail();
     } catch (UserException e) {
-      logger.info("***** Test resulted in expected failure: " + e.getMessage());
-      throw e;
+      // Expected;
     }
   }
 
   @Test
   public void testDRILL1337_LocalRightFilterLeftOutJoin() throws Exception {
-    test("select * from cp.`tpch/nation.parquet` n left outer join cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and r.r_name not like '%ASIA' order by r.r_name;");
+    test("select * from cp.`tpch/nation.parquet` n left outer join " +
+         "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and r.r_name not like '%ASIA' order by r.r_name;");
   }
 
   @Test
@@ -297,8 +300,8 @@ public class TestBugFixes extends BaseTestQuery {
           "    SELECT count(1) `a_count` FROM cp.`tpch/nation.parquet`\n" +
           ") `t5` ON TRUE\n");
     } finally {
-      test("ALTER SESSION RESET `planner.enable_nljoin_for_scalar_only`");
-      test("ALTER SESSION RESET `planner.slice_target`");
+      resetSessionOption("planner.enable_nljoin_for_scalar_only");
+      resetSessionOption("planner.slice_target");
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java b/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
index bbcd00b..7594fb7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestCTASJson.java
@@ -45,8 +45,8 @@ public class TestCTASJson extends PlanTestBase {
           .run();
     } finally {
       test("drop table " + testName + "_json");
-      test("alter session reset `store.format` ");
-      test("alter session reset store.json.writer.skip_null_fields ");
+      resetSessionOption("store.format");
+      resetSessionOption("store.json.writer.skip_null_fields");
     }
   }
 
@@ -73,10 +73,9 @@ public class TestCTASJson extends PlanTestBase {
           .run();
     } finally{
       test("drop table " + testName + "_json" );
-      test("alter session reset `store.format` ");
-      test("alter session reset store.json.writer.skip_null_fields ");
+      resetSessionOption("store.format");
+      resetSessionOption("store.json.writer.skip_null_fields");
     }
-
   }
 
   @Test
@@ -103,10 +102,9 @@ public class TestCTASJson extends PlanTestBase {
           .run();
     }finally{
       test("drop table " + testName + "_json" );
-      test("alter session reset `store.format` ");
-      test("alter session reset store.json.writer.skip_null_fields ");
+      resetSessionOption("store.format");
+      resetSessionOption("store.json.writer.skip_null_fields");
     }
-
   }
 
   @Test
@@ -133,10 +131,8 @@ public class TestCTASJson extends PlanTestBase {
           .run();
     } finally {
       test("drop table " + testName + "_json" );
-      test("alter session reset `store.format` ");
-      test("alter session reset store.json.writer.skip_null_fields ");
+      resetSessionOption("store.format");
+      resetSessionOption("store.json.writer.skip_null_fields");
     }
-
   }
-
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
index d55da30..27da7e0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestFrameworkTest.java
@@ -34,17 +34,19 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestBuilder;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+// TODO - update framework to remove any dependency on the Drill engine
+// for reading baseline result sets. Currently using it with the assumption
+// that the csv and json readers are well tested, and handling diverse
+// types in the test framework would require doing some redundant work
+// to enable casting outside of Drill or some better tooling to generate
+// parquet files that have all of the parquet types.
 
-// TODO - update framework to remove any dependency on the Drill engine for reading baseline result sets
-// currently using it with the assumption that the csv and json readers are well tested, and handling diverse
-// types in the test framework would require doing some redundant work to enable casting outside of Drill or
-// some better tooling to generate parquet files that have all of the parquet types
 public class TestFrameworkTest extends BaseTestQuery {
 
   private static String CSV_COLS = " cast(columns[0] as bigint) employee_id, columns[1] as first_name, columns[2] as last_name ";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
index 13f5dd8..c31c77c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestJoinNullable.java
@@ -26,7 +26,6 @@ import org.junit.experimental.categories.Category;
 
 @Category(OperatorTest.class)
 public class TestJoinNullable extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJoinNullable.class);
 
   private static void enableJoin(boolean hj, boolean mj) throws Exception {
     test("alter session set `planner.enable_hashjoin` = %s", hj);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index a2b696a..35e58c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.compile;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestTools;
-import org.apache.drill.exec.ExecConstants;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -223,9 +223,9 @@ public class TestLargeFileCompilation extends BaseTestQuery {
       testNoResult(LARGE_TABLE_WRITER, tableName);
       testNoResult(QUERY_WITH_JOIN, tableName);
     } finally {
-      testNoResult("alter session reset `planner.enable_mergejoin`");
-      testNoResult("alter session reset `planner.enable_nestedloopjoin`");
-      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      resetSessionOption("planner.enable_mergejoin");
+      resetSessionOption("planner.enable_nestedloopjoin");
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
       testNoResult("drop table if exists %s", tableName);
     }
   }
@@ -241,9 +241,9 @@ public class TestLargeFileCompilation extends BaseTestQuery {
       testNoResult(LARGE_TABLE_WRITER, tableName);
       testNoResult(QUERY_WITH_JOIN, tableName);
     } finally {
-      testNoResult("alter session reset `planner.enable_hashjoin`");
-      testNoResult("alter session reset `planner.enable_nestedloopjoin`");
-      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      resetSessionOption("planner.enable_hashjoin");
+      resetSessionOption("planner.enable_nestedloopjoin");
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
       testNoResult("drop table if exists %s", tableName);
     }
   }
@@ -260,10 +260,10 @@ public class TestLargeFileCompilation extends BaseTestQuery {
       testNoResult(LARGE_TABLE_WRITER, tableName);
       testNoResult(QUERY_WITH_JOIN, tableName);
     } finally {
-      testNoResult("alter session reset `planner.enable_nljoin_for_scalar_only`");
-      testNoResult("alter session reset `planner.enable_hashjoin`");
-      testNoResult("alter session reset `planner.enable_mergejoin`");
-      testNoResult("alter session reset `%s`", ClassCompilerSelector.JAVA_COMPILER_OPTION);
+      resetSessionOption("planner.enable_nljoin_for_scalar_only");
+      resetSessionOption("planner.enable_hashjoin");
+      resetSessionOption("planner.enable_mergejoin");
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
       testNoResult("drop table if exists %s", tableName);
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
index 321dd53..8d79ebe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestNestedDateTimeTimestamp.java
@@ -129,8 +129,8 @@ public class TestNestedDateTimeTimestamp extends BaseTestQuery {
       testBuilder().sqlQuery(readQuery).ordered().jsonBaselineFile("baseline_nested_datetime.json").build().run();
     } finally {
       test("drop table " + testName + "_json");
-      test("alter session reset `store.format` ");
-      test("alter session reset store.json.extended_types ");
+      resetSessionOption("store.format");
+      resetSessionOption("store.json.extended_types");
     }
   }
 
@@ -152,8 +152,8 @@ public class TestNestedDateTimeTimestamp extends BaseTestQuery {
       testBuilder().sqlQuery(readQuery).ordered().jsonBaselineFile("datetime.parquet").build().run();
     } finally {
       test("drop table " + testName + "_json");
-      test("alter session reset `store.format` ");
-      test("alter session reset store.json.extended_types ");
+      resetSessionOption("store.format");
+      resetSessionOption("store.json.extended_types");
     }
   }
 
@@ -174,7 +174,7 @@ public class TestNestedDateTimeTimestamp extends BaseTestQuery {
       testBuilder().sqlQuery(readQuery).ordered().jsonBaselineFile("datetime.parquet").build().run();
     } finally {
       test("drop table " + testName + "_parquet");
-      test("alter session reset `store.format` ");
+      resetSessionOption("store.format");
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
index 2ae0aa6..4816fa9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -17,30 +17,30 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Random;
+
+import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.apache.drill.test.TestTools;
-import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.TestTools;
 import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TestRule;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Random;
-
 @Category(OperatorTest.class)
 public class TestMergeJoinAdvanced extends JoinTestBase {
   private static final String LEFT = "merge-join-left.json";
@@ -123,7 +123,7 @@ public class TestMergeJoinAdvanced extends JoinTestBase {
     try {
       test("select * from dfs.`join/j1` j1 left outer join dfs.`join/j2` j2 on (j1.c_varchar = j2.c_varchar)");
     } finally {
-      test("ALTER SESSION RESET ALL");
+      resetAllSessionOptions();
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
index 3b5a133..a30bda9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestBatchValidator.java
@@ -151,7 +151,7 @@ public class TestBatchValidator extends SubOperatorTest {
 
   private static void checkForError(SingleRowSet batch, String expectedError) {
     CapturingReporter cr = new CapturingReporter();
-    new BatchValidator(cr, true).validateBatch(batch.vectorAccessible(), batch.rowCount());
+    new BatchValidator(cr).validateBatch(batch.vectorAccessible(), batch.rowCount());
     assertTrue(cr.errors.size() > 0);
     Pattern p = Pattern.compile(expectedError);
     Matcher m = p.matcher(cr.errors.get(0));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
index d4a0917..f6fb35f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestCorruptParquetDateCorrection.java
@@ -19,23 +19,23 @@ package org.apache.drill.exec.physical.impl.writer;
 
 import static java.lang.String.format;
 
-import org.apache.drill.categories.ParquetTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.LocalDate;
+
 import org.apache.drill.PlanTestBase;
-import org.apache.drill.test.TestBuilder;
+import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.drill.test.TestBuilder;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.LocalDate;
-
 /**
  * Tests for compatibility reading old parquet files after date corruption
  * issue was fixed in DRILL-4203.
@@ -148,7 +148,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
             .go();
       }
     } finally {
-      test("alter session reset all");
+      resetAllSessionOptions();
     }
   }
 
@@ -211,7 +211,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
           .contains("Year out of range"));
       throw e;
     } finally {
-      test("alter session reset all");
+      resetAllSessionOptions();
     }
   }
 
@@ -243,7 +243,7 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
         }
       }
     } finally {
-      test("alter session reset all");
+      resetAllSessionOptions();
     }
   }
 
@@ -284,9 +284,8 @@ public class TestCorruptParquetDateCorrection extends PlanTestBase {
       // read all of the types with the complex reader
       readFilesWithUserDisabledAutoCorrection();
     } finally {
-      test("alter session reset all");
+      resetAllSessionOptions();
     }
-
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
index 581e164..d128fcd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestMapAccessors.java
@@ -30,14 +30,17 @@ import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.TupleReader;
 import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.test.ClientFixture;
 import org.apache.drill.test.ClusterFixture;
@@ -98,10 +101,21 @@ public class TestMapAccessors extends SubOperatorTest {
 
     rootWriter.addRow(20, mapValue(210, "barney"), "bam-bam");
 
+    RowSet result = builder.build();
+    assertEquals(2, result.rowCount());
+
+    // Validate internal structure.
+
+    VectorContainer container = result.container();
+    assertEquals(3, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(1).getValueVector();
+    assertTrue(v instanceof MapVector);
+    MapVector mv = (MapVector) v;
+    assertEquals(2, mv.getAccessor().getValueCount());
+
     // Validate data. Do so using the readers to avoid verifying
     // using the very mechanisms we want to test.
 
-    RowSet result = builder.build();
     RowSetReader rootReader = result.reader();
     final ScalarReader aReader = rootReader.scalar("a");
     final TupleReader mReader = rootReader.tuple("m");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java
index b396234..8fce484 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestOffsetVectorWriter.java
@@ -28,7 +28,6 @@ import org.apache.drill.exec.physical.rowSet.TestFixedWidthWriter.TestIndex;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.vector.UInt4Vector;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriterImpl;
@@ -38,8 +37,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import io.netty.buffer.DrillBuf;
-
 /**
  * The offset vector writer is unique: it follows the same API as
  * the other writers, but has a unique twist because offsets are written
@@ -62,16 +59,7 @@ public class TestOffsetVectorWriter extends SubOperatorTest {
 
   @BeforeClass
   public static void setup() {
-    DrillBuf bufs[] = new DrillBuf[100];
-    for (int i = 0; i < bufs.length; i++) {
-      bufs[i] = fixture.allocator().buffer(ValueVector.MAX_BUFFER_SIZE);
-      for (int j = 0; j < ValueVector.MAX_BUFFER_SIZE; j++) {
-        bufs[i].setByte(j, (byte) (j & 0x7f));
-      }
-    }
-    for (int i = 0; i < bufs.length; i++) {
-      bufs[i].close();
-    }
+    fixture.dirtyMemory(100);
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java
index efe543d..d0aed14 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestScalarAccessors.java
@@ -33,18 +33,29 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
-import org.apache.drill.exec.record.SimpleVectorWrapper;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.DateUtilities;
+import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVector;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+import org.apache.drill.exec.vector.UInt1Vector;
+import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSetUtilities;
@@ -176,6 +187,23 @@ public class TestScalarAccessors extends SubOperatorTest {
         .build();
     assertEquals(3, rs.rowCount());
 
+    // Verify vector state
+
+    VectorContainer container = rs.container();
+    assertEquals(1, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(0).getValueVector();
+    assertTrue(v instanceof NullableVector);
+    NullableVector nv = (NullableVector) v;
+    assertEquals(3, nv.getAccessor().getValueCount());
+    assertEquals(3 * BasicTypeHelper.getSize(Types.required(type)),
+        ((BaseDataValueVector) v).getBuffer().writerIndex());
+
+    // Verify bits vector. (Assumes UInt1 implementation.)
+
+    UInt1Vector bv = (UInt1Vector) nv.getBitsVector();
+    assertEquals(3, bv.getAccessor().getValueCount());
+    assertEquals(3, bv.getBuffer().writerIndex());
+
     RowSetReader reader = rs.reader();
     ScalarReader colReader = reader.scalar(0);
 
@@ -211,6 +239,28 @@ public class TestScalarAccessors extends SubOperatorTest {
         .build();
     assertEquals(2, rs.rowCount());
 
+    // Verify vector state
+
+    VectorContainer container = rs.container();
+    assertEquals(1, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(0).getValueVector();
+    assertTrue(v instanceof RepeatedValueVector);
+    RepeatedValueVector rv = (RepeatedValueVector) v;
+    assertEquals(2, rv.getAccessor().getValueCount());
+
+    // Data vector: 3 values written above.
+
+    ValueVector vv = rv.getDataVector();
+    assertEquals(3, vv.getAccessor().getValueCount());
+    assertEquals(3 * BasicTypeHelper.getSize(Types.required(type)),
+        ((BaseDataValueVector) vv).getBuffer().writerIndex());
+
+    // Offsets vector: one more than row count
+
+    UInt4Vector ov = rv.getOffsetVector();
+    assertEquals(3, ov.getAccessor().getValueCount());
+    assertEquals(3 * 4, ov.getBuffer().writerIndex());
+
     RowSetReader reader = rs.reader();
     ArrayReader arrayReader = reader.array(0);
     ScalarReader colReader = arrayReader.scalar();
@@ -681,9 +731,34 @@ public class TestScalarAccessors extends SubOperatorTest {
         .addRow("abcd")
         .build();
     assertEquals(3, rs.rowCount());
-    SimpleVectorWrapper<?> vw = (SimpleVectorWrapper<?>) rs.container().getValueVector(0);
-    NullableVarCharVector v = (NullableVarCharVector) vw.getValueVector();
-    assertEquals(3, v.getMutator().getLastSet());
+
+    // Verify vector state
+
+    VectorContainer container = rs.container();
+    assertEquals(1, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(0).getValueVector();
+    assertTrue(v instanceof NullableVarCharVector);
+    NullableVarCharVector nvcv = (NullableVarCharVector) v;
+    assertEquals(3, nvcv.getAccessor().getValueCount());
+    assertEquals(2, nvcv.getMutator().getLastSet());
+
+    // Data vector: 3 values written above.
+
+    VarCharVector vv = nvcv.getValuesVector();
+    assertEquals(3, vv.getAccessor().getValueCount());
+
+    // Offsets vector: one more than row count
+
+    UInt4Vector ov = vv.getOffsetVector();
+    assertEquals(4, ov.getAccessor().getValueCount());
+    assertEquals(4 * 4, ov.getBuffer().writerIndex());
+
+    // Last offset and bytes buf length must agree
+
+    int lastIndex = ov.getAccessor().get(3);
+    assertEquals(lastIndex, vv.getBuffer().writerIndex());
+
+    // Verify using the reader
 
     RowSetReader reader = rs.reader();
     ScalarReader colReader = reader.scalar(0);
@@ -712,8 +787,8 @@ public class TestScalarAccessors extends SubOperatorTest {
         .addArray("col", MinorType.VARCHAR)
         .buildSchema();
     SingleRowSet rs = fixture.rowSetBuilder(schema)
-        .addSingleCol(new String[] {})
-        .addSingleCol(new String[] {"fred", "", "wilma"})
+        .addSingleCol(strArray())
+        .addSingleCol(strArray("fred", "", "wilma"))
         .build();
     assertEquals(2, rs.rowCount());
 
@@ -753,6 +828,57 @@ public class TestScalarAccessors extends SubOperatorTest {
   }
 
   /**
+   * Test for the special case for the "inner" offset vector
+   * as explained in the Javadoc for
+   * {@link org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriterImpl}
+   */
+
+  @Test
+  public void testEmptyVarcharArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("col", MinorType.VARCHAR)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    SingleRowSet rs = fixture.rowSetBuilder(schema)
+        .addRow(strArray(), "first")
+        .addRow(strArray(), "second")
+        .addRow(strArray(), "third")
+        .build();
+    assertEquals(3, rs.rowCount());
+
+    // Verify vector state
+
+    VectorContainer container = rs.container();
+    assertEquals(2, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(0).getValueVector();
+    assertTrue(v instanceof RepeatedVarCharVector);
+    RepeatedVarCharVector rvc = (RepeatedVarCharVector) v;
+    assertEquals(3, rvc.getAccessor().getValueCount());
+
+    // Verify outer offsets vector
+
+    UInt4Vector oov = rvc.getOffsetVector();
+    assertEquals(4, oov.getAccessor().getValueCount());
+    assertEquals(4 * 4, oov.getBuffer().writerIndex());
+
+    // Inner vector
+
+    VarCharVector iv = rvc.getDataVector();
+    assertEquals(0, iv.getAccessor().getValueCount());
+    assertEquals(0, iv.getBuffer().writerIndex());
+
+    // Inner offset vector. Has 0 entries, not 1 as would be
+    // expected according to the general rule:
+    // offset vector length = value length + 1
+
+    UInt4Vector iov = iv.getOffsetVector();
+    assertEquals(0, iov.getAccessor().getValueCount());
+    assertEquals(0, iov.getBuffer().writerIndex());
+
+    rs.clear();
+  }
+
+  /**
    * Test the low-level interval-year utilities used by the column accessors.
    */
 
@@ -1658,6 +1784,19 @@ public class TestScalarAccessors extends SubOperatorTest {
         .build();
     assertEquals(6, rs.rowCount());
 
+    // Verify vector state
+
+    VectorContainer container = rs.container();
+    assertEquals(1, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(0).getValueVector();
+    assertTrue(v instanceof BitVector);
+    BitVector bv = (BitVector) v;
+    assertEquals(6, bv.getAccessor().getValueCount());
+    assertEquals(1,
+        ((BaseDataValueVector) v).getBuffer().writerIndex());
+
+    // Verify using a reader
+
     RowSetReader reader = rs.reader();
     ScalarReader colReader = reader.scalar(0);
 
@@ -1684,6 +1823,72 @@ public class TestScalarAccessors extends SubOperatorTest {
     rs.clear();
   }
 
+  @Test
+  public void testNullableBitRW() {
+    // Round-trip a nullable BIT column written from booleans, ints and nulls.
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("col", MinorType.BIT)
+        .buildSchema();
+
+    SingleRowSet rs = fixture.rowSetBuilder(schema)
+        .addSingleCol(true)
+        .addSingleCol(false)
+        .addSingleCol(null)
+        .addSingleCol(1)
+        .addSingleCol(2)
+        .addSingleCol(null)
+        .build();
+    assertEquals(6, rs.rowCount());
+
+    // Verify vector state
+
+    VectorContainer container = rs.container();
+    assertEquals(1, container.getNumberOfColumns());
+    ValueVector v = container.getValueVector(0).getValueVector();
+    assertTrue(v instanceof NullableBitVector);
+    NullableBitVector nv = (NullableBitVector) v;
+    assertEquals(6, nv.getAccessor().getValueCount());
+
+    BitVector dv = nv.getValuesVector();
+    assertEquals(6, dv.getAccessor().getValueCount());
+    assertEquals(1, dv.getBuffer().writerIndex()); // presumably 6 bits packed into one byte
+
+    // Verify bits vector. (Assumes UInt1 implementation.)
+
+    UInt1Vector bv = nv.getBitsVector();
+    assertEquals(6, bv.getAccessor().getValueCount());
+    assertEquals(6, bv.getBuffer().writerIndex());
+
+    // Verify using a reader
+
+    RowSetReader reader = rs.reader();
+    ScalarReader colReader = reader.scalar(0);
+
+    assertTrue(reader.next()); // row 1: written as true
+    assertEquals(true, colReader.getBoolean());
+    assertEquals(1, colReader.getInt());
+    assertFalse(colReader.isNull());
+    assertTrue(reader.next()); // row 2: written as false
+    assertEquals(false, colReader.getBoolean());
+    assertEquals(0, colReader.getInt());
+    assertFalse(colReader.isNull());
+    assertTrue(reader.next()); // row 3: written as null
+    assertTrue(colReader.isNull());
+    assertTrue(reader.next()); // row 4: written as int 1
+    assertEquals(true, colReader.getBoolean());
+    assertEquals(1, colReader.getInt());
+    assertFalse(colReader.isNull());
+    assertTrue(reader.next()); // row 5: written as int 2, expected to read back as true / 1
+    assertEquals(true, colReader.getBoolean());
+    assertEquals(1, colReader.getInt());
+    assertFalse(colReader.isNull());
+    assertTrue(reader.next()); // row 6: written as null
+    assertTrue(colReader.isNull());
+
+    assertFalse(reader.next());
+    rs.clear();
+  }
+
   /**
    * The bit reader/writer are special and use the BitVector directly.
    * Ensure that resize works in this special case.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java
index 941cdfc..346fc26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/sql/TestDrillSQLWorker.java
@@ -21,10 +21,10 @@ import static org.junit.Assert.assertEquals;
 
 import org.apache.calcite.avatica.util.Quoting;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.drill.exec.planner.sql.parser.impl.DrillSqlParseException;
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.categories.SqlTest;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.sql.parser.impl.DrillSqlParseException;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -76,6 +76,8 @@ public class TestDrillSQLWorker extends BaseTestQuery {
       // Mix of different quotes in the one SQL statement is not acceptable
       errorMsgTestHelper("select \"employee_id\", \"full_name\" from cp.`employee.json` limit 1", "Encountered: \"`\"");
     } finally {
+      // Note: can't use the resetSessionOption(), it assumes
+      // default back-ticks which are not in effect here.
       test("ALTER SESSION RESET %s", PlannerSettings.QUOTING_IDENTIFIERS_KEY);
     }
   }
@@ -92,6 +94,8 @@ public class TestDrillSQLWorker extends BaseTestQuery {
           .baselineValues(1L, "Sheri Nowmer")
           .go();
     } finally {
+      // Note: can't use the resetSessionOption(), it assumes
+      // default back-ticks which are not in effect here.
       test("ALTER SESSION RESET %s", PlannerSettings.QUOTING_IDENTIFIERS_KEY);
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
index 2c46db0..2544324 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestOptions.java
@@ -17,17 +17,17 @@
  */
 package org.apache.drill.exec.server;
 
-import org.apache.drill.test.BaseTestQuery;
+import static org.apache.drill.exec.ExecConstants.ENABLE_VERBOSE_ERRORS_KEY;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType.VALIDATION;
+
 import org.apache.drill.categories.OptionsTest;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.UserExceptionMatcher;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.apache.drill.exec.ExecConstants.ENABLE_VERBOSE_ERRORS_KEY;
-import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
-import static org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType.VALIDATION;
-
 @Category(OptionsTest.class)
 public class TestOptions extends BaseTestQuery {
 //  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOptions.class);
@@ -135,7 +135,7 @@ public class TestOptions extends BaseTestQuery {
   }
 
   @Test
-  public void resetAllSessionOptions() throws Exception {
+  public void testResetAllSessionOptions() throws Exception {
     // change options
     test("SET `%s` = %b;", ENABLE_VERBOSE_ERRORS_KEY, true);
     // check changed
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
index 6e9f456..ddbdb25 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java
@@ -17,24 +17,24 @@
  */
 package org.apache.drill.exec.sql;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.test.BaseTestQuery;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
 import org.apache.drill.categories.SqlTest;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.test.BaseTestQuery;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.Map;
-
-import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
-import static org.junit.Assert.assertEquals;
-
 @Category(SqlTest.class)
 public class TestCTAS extends BaseTestQuery {
   @Test // DRILL-2589
@@ -250,7 +250,7 @@ public class TestCTAS extends BaseTestQuery {
       assertEquals("File permission should match",
           storageStrategy.getFilePermission(), fs.listLocatedStatus(tableLocation).next().getPermission());
     } finally {
-      test("alter session reset `%s`", ExecConstants.PERSISTENT_TABLE_UMASK);
+      resetSessionOption(ExecConstants.PERSISTENT_TABLE_UMASK);
       test("drop table if exists %s", tableName);
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
index e616984..869ba3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java
@@ -17,30 +17,30 @@
  */
 package org.apache.drill.exec.sql;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
+import static org.hamcrest.CoreMatchers.containsString;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseTestQuery;
-import org.apache.drill.categories.SqlTest;
-import org.apache.drill.common.exceptions.UserRemoteException;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 
-import java.io.File;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
-import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
-import static org.hamcrest.CoreMatchers.containsString;
-
 @Category(SqlTest.class)
 public class TestCTTAS extends BaseTestQuery {
 
@@ -100,7 +100,7 @@ public class TestCTTAS extends BaseTestQuery {
             .go();
       }
     } finally {
-      test("alter session reset `store.format`");
+      resetSessionOption("store.format");
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
index be04e74..89c7e41 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
@@ -21,12 +21,12 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
-import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.BaseTestQuery;
 import org.junit.Test;
 
 public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
@@ -58,12 +58,12 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
       assertEquals("invalid record count returned", count, loader.getRecordCount());
 
       for (int r = 0; r < values.length; r++) {
-        final Object[] row = values[r];
+        Object[] row = values[r];
         for (int c = 0; c<values[r].length; c++) {
-          final Object expected = row[c];
-          final Object unconverted = loader.getValueAccessorById(ValueVector.class, c)
+          Object expected = row[c];
+          Object unconverted = loader.getValueAccessorById(ValueVector.class, c)
               .getValueVector().getAccessor().getObject(r);
-          final Object actual = converter.convert(unconverted);
+          Object actual = converter.convert(unconverted);
           assertEquals(String.format("row:%d - col:%d - expected:%s[%s] - actual:%s[%s]",
                 r, c, expected,
                 expected == null ? "null" : expected.getClass().getSimpleName(),
@@ -75,16 +75,16 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
     }
   }
 
-  protected void query(final String query, final Function<RecordBatchLoader> testBody) throws Exception {
-    final List<QueryDataBatch> batches = testSqlWithResults(query);
-    final RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator());
+  protected void query(String query, Function<RecordBatchLoader> testBody) throws Exception {
+    List<QueryDataBatch> batches = testSqlWithResults(query);
+    RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator());
     try {
       // first batch at index 0 is empty and used for fast schema return. Load the second one for the tests
-      final QueryDataBatch batch = batches.get(0);
+      QueryDataBatch batch = batches.get(0);
       loader.load(batch.getHeader().getDef(), batch.getData());
       testBody.apply(loader);
     } finally {
-      for (final QueryDataBatch batch:batches) {
+      for (QueryDataBatch batch:batches) {
         batch.release();
       }
       loader.clear();
@@ -92,10 +92,10 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
   }
 
   @Test
-  public void testIfDrillCanReadSparseRecords() throws Exception {
-    final String sql = "select * from cp.`vector/complex/fn/sparse.json`";
-    //XXX: make sure value order matches vector order
-    final Object[][] values = new Object[][] {
+  public void testReadSparseRecords() throws Exception {
+    String sql = "select * from cp.`vector/complex/fn/sparse.json`";
+    // TODO: make sure value order matches vector order
+    Object[][] values = new Object[][] {
         {null, null},
         {1L, null},
         {null, 2L},
@@ -105,10 +105,10 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
   }
 
   @Test
-  public void testIfDrillCanReadSparseNestedRecordsWithoutRaisingException() throws Exception {
-    final String sql = "select * from cp.`vector/complex/fn/nested-with-nulls.json`";
-    //XXX: make sure value order matches vector order
-    final Object[][] values = new Object[][] {
+  public void testReadSparseNestedRecords() throws Exception {
+    String sql = "select * from cp.`vector/complex/fn/nested-with-nulls.json`";
+    // TODO: make sure value order matches vector order
+    Object[][] values = new Object[][] {
         {"[{},{},{},{\"name\":\"doe\"},{}]"},
         {"[]"},
         {"[{\"name\":\"john\",\"id\":10}]"},
@@ -118,10 +118,10 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
   }
 
   @Test
-  public void testIfDrillCanQuerySingleRecordWithEmpties() throws Exception {
-    final String sql = "select * from cp.`vector/complex/fn/single-record-with-empties.json`";
-    //XXX: make sure value order matches vector order
-    final Object[][] values = new Object[][] {
+  public void testSingleRecordWithEmpties() throws Exception {
+    String sql = "select * from cp.`vector/complex/fn/single-record-with-empties.json`";
+    // TODO: make sure value order matches vector order
+    Object[][] values = new Object[][] {
         {"[{},{}]"},
     };
     query(sql, new Verifier(1, values));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java
index b7b3744..8b8581a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonNanInf.java
@@ -17,6 +17,15 @@
  */
 package org.apache.drill.exec.vector.complex.writer;
 
+import static org.apache.drill.test.TestBuilder.mapOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.List;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -30,15 +39,6 @@ import org.apache.drill.test.BaseTestQuery;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.io.File;
-import java.util.List;
-
-import static org.apache.drill.test.TestBuilder.mapOf;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
 public class TestJsonNanInf extends BaseTestQuery {
 
 
@@ -184,7 +184,7 @@ public class TestJsonNanInf extends BaseTestQuery {
       assertThat(e.getMessage(), containsString("Error parsing JSON"));
       throw e;
     } finally {
-      test("alter session reset `%s`", ExecConstants.JSON_READER_NAN_INF_NUMBERS);
+      resetSessionOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS);
       FileUtils.deleteQuietly(file);
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index 18f5fae..79aa1d3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -37,6 +37,7 @@ import java.util.zip.GZIPOutputStream;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.RecordBatchLoader;
@@ -110,7 +111,7 @@ public class TestJsonReader extends BaseTestQuery {
                   "inner_3", mapOf()))
           .go();
     } finally {
-      test("alter session set `store.json.all_text_mode` = false");
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
     }
   }
 
@@ -655,7 +656,7 @@ public class TestJsonReader extends BaseTestQuery {
         .go();
 
     } finally {
-      testNoResult("alter session reset `store.json.all_text_mode`");
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
     }
   }
 
@@ -684,8 +685,8 @@ public class TestJsonReader extends BaseTestQuery {
         .go();
 
     } finally {
-      testNoResult("alter session reset `store.json.all_text_mode`");
-      testNoResult("alter session reset `exec.enable_union_type`");
+      resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE);
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 
@@ -743,7 +744,7 @@ public class TestJsonReader extends BaseTestQuery {
         .go();
 
     } finally {
-      testNoResult("alter session reset `exec.enable_union_type`");
+      resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY);
     }
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestLimit0VsRegularQueriesMetadata.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestLimit0VsRegularQueriesMetadata.java
index 99d5176..e140889 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestLimit0VsRegularQueriesMetadata.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/prepare/TestLimit0VsRegularQueriesMetadata.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.work.prepare;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.util.List;
+
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.junit.Test;
 
-import java.util.List;
-
 public class TestLimit0VsRegularQueriesMetadata extends PreparedStatementTestBase {
 
   @Test
@@ -64,7 +64,7 @@ public class TestLimit0VsRegularQueriesMetadata extends PreparedStatementTestBas
 
       verifyResults(query, expectedMetadata);
     } finally {
-      test("alter session reset `planner.enable_decimal_data_type`");
+      resetSessionOption("planner.enable_decimal_data_type");
     }
   }
 
@@ -308,7 +308,7 @@ public class TestLimit0VsRegularQueriesMetadata extends PreparedStatementTestBas
       verifyMetadata(expectedMetadata, createPrepareStmt(String.format("select * from (%s) t limit 0", query), false, null)
           .getColumnsList());
     } finally {
-      test("alter session reset `%s`", ExecConstants.EARLY_LIMIT0_OPT_KEY);
+      resetSessionOption(ExecConstants.EARLY_LIMIT0_OPT_KEY);
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java
index 85e6bef..e8f51ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseFixture.java
@@ -22,9 +22,11 @@ import java.io.File;
 import org.apache.commons.io.FileUtils;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
-
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
 
+import io.netty.buffer.DrillBuf;
+
 /**
  * Base class for "fixtures." Provides the basics such as the Drill
  * configuration, a memory allocator and so on.
@@ -58,4 +60,45 @@ public class BaseFixture {
 
   public BufferAllocator allocator() { return allocator; }
   public DrillConfig config() { return config; }
+
+  /**
+   * Dirties memory taken from this fixture's allocator by delegating to
+   * the static overload.
+   *
+   * @param blockCount number of buffers to allocate and overwrite
+   * @see #dirtyMemory(BufferAllocator, int)
+   */
+  public void dirtyMemory(int blockCount) {
+    dirtyMemory(allocator, blockCount);
+  }
+
+  /**
+   * Allocates {@code blockCount} buffers of
+   * {@link ValueVector#MAX_BUFFER_SIZE} bytes each, fills every one with the
+   * pattern {@code 0xdeadbeef}, and then releases them all. The intent is
+   * that memory handed to vectors afterwards is not pristine, so tests fail
+   * if the writers don't correctly fill in all values.
+   *
+   * @param allocator allocator from which the buffers are taken
+   * @param blockCount number of buffers to allocate and overwrite
+   */
+  public static void dirtyMemory(BufferAllocator allocator, int blockCount) {
+    DrillBuf[] bufs = new DrillBuf[blockCount];
+    try {
+      for (int i = 0; i < bufs.length; i++) {
+        bufs[i] = allocator.buffer(ValueVector.MAX_BUFFER_SIZE);
+        for (int j = 0; j < ValueVector.MAX_BUFFER_SIZE / 4; j++) {
+          bufs[i].setInt(j * 4, 0xdeadbeef);
+        }
+      }
+    } finally {
+      // Release whatever was allocated, even if an allocation failed part
+      // way through, so no buffers are left outstanding on the allocator.
+      for (DrillBuf buf : bufs) {
+        if (buf != null) {
+          buf.close();
+        }
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
index dad5c46..4418f4d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/BaseTestQuery.java
@@ -70,6 +70,8 @@ import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import org.apache.drill.test.DrillTestWrapper.TestServices;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * deprecated Use {@link ClusterTest} instead.
@@ -78,7 +80,7 @@ import org.junit.BeforeClass;
  */
 //@Deprecated
 public class BaseTestQuery extends ExecTest {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseTestQuery.class);
 
   private static final int MAX_WIDTH_PER_NODE = 2;
 
@@ -138,7 +140,7 @@ public class BaseTestQuery extends ExecTest {
           config = newConfig;
         }
         openClient(properties);
-      } catch(Exception e) {
+      } catch (Exception e) {
         throw new RuntimeException("Failure while updating the test Drillbit cluster.", e);
       }
     }
@@ -155,9 +157,9 @@ public class BaseTestQuery extends ExecTest {
   }
 
   protected static Properties cloneDefaultTestConfigProperties() {
-    final Properties props = new Properties();
+    Properties props = new Properties();
 
-    for(String propName : TEST_CONFIGURATIONS.stringPropertyNames()) {
+    for (String propName : TEST_CONFIGURATIONS.stringPropertyNames()) {
       props.put(propName, TEST_CONFIGURATIONS.getProperty(propName));
     }
 
@@ -182,11 +184,11 @@ public class BaseTestQuery extends ExecTest {
     dirTestWatcher.newDfsTestTmpDir();
 
     bits = new Drillbit[drillbitCount];
-    for(int i = 0; i < drillbitCount; i++) {
+    for (int i = 0; i < drillbitCount; i++) {
       bits[i] = new Drillbit(config, serviceSet, classpathScan);
       bits[i].run();
 
-      final StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
+      StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
       StoragePluginTestUtils.configureFormatPlugins(pluginRegistry);
 
       StoragePluginTestUtils.updateSchemaLocation(StoragePluginTestUtils.DFS_PLUGIN_NAME, pluginRegistry,
@@ -237,8 +239,8 @@ public class BaseTestQuery extends ExecTest {
    * executed after this method call use the new <i>client</i>.
    * @param user
    */
-  public static void updateClient(final String user, final String password) throws Exception {
-    final Properties props = new Properties();
+  public static void updateClient(String user, String password) throws Exception {
+    Properties props = new Properties();
     props.setProperty(DrillProperties.USER, user);
     if (password != null) {
       props.setProperty(DrillProperties.PASSWORD, password);
@@ -271,7 +273,7 @@ public class BaseTestQuery extends ExecTest {
     }
 
     @Override
-    public List<QueryDataBatch> testRunAndReturn(final QueryType type, final Object query) throws Exception {
+    public List<QueryDataBatch> testRunAndReturn(QueryType type, Object query) throws Exception {
       return BaseTestQuery.testRunAndReturn(type, query);
     }
   }
@@ -288,7 +290,7 @@ public class BaseTestQuery extends ExecTest {
     }
 
     if (bits != null) {
-      for(final Drillbit bit : bits) {
+      for (Drillbit bit : bits) {
         if (bit != null) {
           bit.close();
         }
@@ -313,7 +315,7 @@ public class BaseTestQuery extends ExecTest {
   }
 
   protected static void runSQL(String sql) throws Exception {
-    final AwaitableUserResultsListener listener = new AwaitableUserResultsListener(new SilentListener());
+    AwaitableUserResultsListener listener = new AwaitableUserResultsListener(new SilentListener());
     testWithListener(QueryType.SQL, sql, listener);
     listener.await();
   }
@@ -346,7 +348,7 @@ public class BaseTestQuery extends ExecTest {
     return client.executePreparedStatement(handle);
   }
 
-  public static int testRunAndPrint(final QueryType type, final String query) throws Exception {
+  public static int testRunAndPrint(QueryType type, String query) throws Exception {
     return QueryTestUtil.testRunAndLog(client, type, query);
   }
 
@@ -362,7 +364,7 @@ public class BaseTestQuery extends ExecTest {
     String valueStr = ClusterFixture.stringify(value);
     try {
       test("ALTER SESSION SET `%s` = %s", option, valueStr);
-    } catch(final Exception e) {
+    } catch (Exception e) {
       fail(String.format("Failed to set session option `%s` = %s, Error: %s",
           option, valueStr, e.toString()));
     }
@@ -371,18 +373,26 @@ public class BaseTestQuery extends ExecTest {
   public static void resetSessionOption(String option) {
     try {
       test("ALTER SESSION RESET `%s`", option);
-    } catch(final Exception e) {
+    } catch (Exception e) {
       fail(String.format("Failed to reset session option `%s`, Error: %s",
           option, e.toString()));
     }
   }
 
+  public static void resetAllSessionOptions() {
+    try {
+      test("ALTER SESSION RESET ALL");
+    } catch (Exception e) { // include the cause, matching resetSessionOption()
+      fail(String.format("Failed to reset all session options, Error: %s", e.toString()));
+    }
+  }
+
   protected static void testNoResult(int interation, String query, Object... args) throws Exception {
     query = String.format(query, args);
     logger.debug("Running query:\n--------------\n" + query);
     for (int i = 0; i < interation; i++) {
-      final List<QueryDataBatch> results = client.runQuery(QueryType.SQL, query);
-      for (final QueryDataBatch queryDataBatch : results) {
+      List<QueryDataBatch> results = client.runQuery(QueryType.SQL, query);
+      for (QueryDataBatch queryDataBatch : results) {
         queryDataBatch.release();
       }
     }
@@ -392,7 +402,7 @@ public class BaseTestQuery extends ExecTest {
     QueryTestUtil.testRunAndLog(client, String.format(query, args));
   }
 
-  public static void test(final String query) throws Exception {
+  public static void test(String query) throws Exception {
     QueryTestUtil.testRunAndLog(client, query);
   }
 
@@ -414,11 +424,11 @@ public class BaseTestQuery extends ExecTest {
    * @param testSqlQuery Test query
    * @param expectedErrorMsg Expected error message.
    */
-  protected static void errorMsgTestHelper(final String testSqlQuery, final String expectedErrorMsg) throws Exception {
+  protected static void errorMsgTestHelper(String testSqlQuery, String expectedErrorMsg) throws Exception {
     try {
       test(testSqlQuery);
       fail("Expected a UserException when running " + testSqlQuery);
-    } catch (final UserException actualException) {
+    } catch (UserException actualException) {
       try {
         assertThat("message of UserException when running " + testSqlQuery, actualException.getMessage(), containsString(expectedErrorMsg));
       } catch (AssertionError e) {
@@ -434,12 +444,12 @@ public class BaseTestQuery extends ExecTest {
    * the given message.
    * @param testSqlQuery Test query
    */
-  protected static void parseErrorHelper(final String testSqlQuery) throws Exception {
+  protected static void parseErrorHelper(String testSqlQuery) throws Exception {
     errorMsgTestHelper(testSqlQuery, UserBitShared.DrillPBError.ErrorType.PARSE.name());
   }
 
   public static String getFile(String resource) throws IOException{
-    final URL url = Resources.getResource(resource);
+    URL url = Resources.getResource(resource);
     if (url == null) {
       throw new IOException(String.format("Unable to find path %s.", resource));
     }
@@ -452,29 +462,29 @@ public class BaseTestQuery extends ExecTest {
    * @return the file path
    * @throws IOException
    */
-  public static String getPhysicalFileFromResource(final String resource) throws IOException {
-    final File file = File.createTempFile("tempfile", ".txt");
+  public static String getPhysicalFileFromResource(String resource) throws IOException {
+    File file = File.createTempFile("tempfile", ".txt");
     file.deleteOnExit();
-    final PrintWriter printWriter = new PrintWriter(file);
+    PrintWriter printWriter = new PrintWriter(file);
     printWriter.write(BaseTestQuery.getFile(resource));
     printWriter.close();
 
     return file.getPath();
   }
 
-  protected static void setSessionOption(final String option, final boolean value) {
+  protected static void setSessionOption(String option, boolean value) {
     alterSession(option, value);
   }
 
-  protected static void setSessionOption(final String option, final long value) {
+  protected static void setSessionOption(String option, long value) {
     alterSession(option, value);
   }
 
-  protected static void setSessionOption(final String option, final double value) {
+  protected static void setSessionOption(String option, double value) {
     alterSession(option, value);
   }
 
-  protected static void setSessionOption(final String option, final String value) {
+  protected static void setSessionOption(String option, String value) {
     alterSession(option, value);
   }
 
@@ -494,7 +504,7 @@ public class BaseTestQuery extends ExecTest {
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      final int rows = result.getHeader().getRowCount();
+      int rows = result.getHeader().getRowCount();
       if (result.getData() != null) {
         count.addAndGet(rows);
       }
@@ -525,8 +535,8 @@ public class BaseTestQuery extends ExecTest {
 
   protected int logResult(List<QueryDataBatch> results) throws SchemaChangeException {
     int rowCount = 0;
-    final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    for(final QueryDataBatch result : results) {
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    for (QueryDataBatch result : results) {
       rowCount += result.getHeader().getRowCount();
       loader.load(result.getHeader().getDef(), result.getData());
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
@@ -538,17 +548,17 @@ public class BaseTestQuery extends ExecTest {
     return rowCount;
   }
 
-  protected int printResult(final List<QueryDataBatch> results) throws SchemaChangeException {
+  protected int printResult(List<QueryDataBatch> results) throws SchemaChangeException {
     int result = PrintingUtils.printAndThrow(() -> logResult(results));
     return result;
   }
 
   protected static String getResultString(List<QueryDataBatch> results, String delimiter)
       throws SchemaChangeException {
-    final StringBuilder formattedResults = new StringBuilder();
+    StringBuilder formattedResults = new StringBuilder();
     boolean includeHeader = true;
-    final RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    for(final QueryDataBatch result : results) {
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    for (QueryDataBatch result : results) {
       loader.load(result.getHeader().getDef(), result.getData());
       if (loader.getRecordCount() <= 0) {
         continue;
@@ -587,23 +597,21 @@ public class BaseTestQuery extends ExecTest {
     }
 
     @Override public boolean equals(Object o) {
-      boolean result = false;
-
       if (this == o) {
-        result = true;
-      } else if (o instanceof TestResultSet) {
-        TestResultSet that = (TestResultSet) o;
-        assertEquals(this.size(), that.size());
-        for (int i = 0; i < this.rows.size(); i++) {
-          assertEquals(this.rows.get(i).size(), that.rows.get(i).size());
-          for (int j = 0; j < this.rows.get(i).size(); ++j) {
-            assertEquals(this.rows.get(i).get(j), that.rows.get(i).get(j));
-          }
+        return true;
+      }
+      if (! (o instanceof TestResultSet)) {
+        return false;
+      }
+      TestResultSet that = (TestResultSet) o;
+      assertEquals(this.size(), that.size());
+      for (int i = 0; i < this.rows.size(); i++) {
+        assertEquals(this.rows.get(i).size(), that.rows.get(i).size());
+        for (int j = 0; j < this.rows.get(i).size(); ++j) {
+          assertEquals(this.rows.get(i).get(j), that.rows.get(i).get(j));
         }
-        result = true;
       }
-
-      return result;
+      return true;
     }
 
     private void convert(List<QueryDataBatch> batches) throws SchemaChangeException {
@@ -627,4 +635,4 @@ public class BaseTestQuery extends ExecTest {
       }
     }
   }
- }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index db1e023..d943d75 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -513,8 +513,9 @@ public class DrillTestWrapper {
   }
 
   /**
-   * Use this method only if necessary to validate one query against another. If you are just validating against a
-   * baseline file use one of the simpler interfaces that will write the validation query for you.
+   * Use this method only if necessary to validate one query against another. If
+   * you are just validating against a baseline file, use one of the simpler
+   * interfaces that will write the validation query for you.
    *
    * @throws Exception
    */
@@ -544,7 +545,8 @@ public class DrillTestWrapper {
         checkColumnDef(loader.getSchema());
       }
 
-      // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
+      // If baseline data was not provided to the test builder directly, we must
+      // run a query for the baseline; this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
         if (expectedNumRecords != DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
index c93853f..ac06f63 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/SubOperatorTest.java
@@ -31,6 +31,7 @@ public class SubOperatorTest extends DrillTest {
   @BeforeClass
   public static void classSetup() throws Exception {
     fixture = OperatorFixture.standardFixture(dirTestWatcher);
+    fixture.dirtyMemory(10);
   }
 
   @AfterClass
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 408b865..ffa0cc8 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -17,10 +17,6 @@
  */
 package org.apache.drill.exec.memory;
 
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.DrillBuf;
-import io.netty.buffer.UnsafeDirectLittleEndian;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -35,11 +31,16 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.ops.BufferManager;
 import org.apache.drill.exec.util.AssertionUtil;
-
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.DrillBuf;
+import io.netty.buffer.UnsafeDirectLittleEndian;
 
 public abstract class BaseAllocator extends Accountant implements BufferAllocator {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseAllocator.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseAllocator.class);
 
   public static final String DEBUG_ALLOCATOR = "drill.memory.debug.allocator";
 
@@ -465,7 +466,6 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     return new Reservation();
   }
 
-
   @Override
   public synchronized void close() {
     /*
diff --git a/exec/vector/src/main/codegen/templates/ComplexWriters.java b/exec/vector/src/main/codegen/templates/ComplexWriters.java
index 434f618..3539c62 100644
--- a/exec/vector/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/vector/src/main/codegen/templates/ComplexWriters.java
@@ -79,13 +79,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     vector.clear();
   }
 
-  @Override
-  protected int idx() {
-    return super.idx();
-  }
-
   <#if mode == "Repeated">
-
   public void write(${minor.class?cap_first}Holder h) {
     mutator.addSafe(idx(), h);
     vector.getMutator().setValueCount(idx()+1);
@@ -101,8 +95,8 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     mutator.addSafe(idx()<#list fields as field><#if field.include!true>, ${field.name}</#if></#list>);
     vector.getMutator().setValueCount(idx() + 1);
   }
+  
   </#if>
-
   <#if minor.class?contains("VarDecimal")>
   public void writeVarDecimal(BigDecimal value) {
     mutator.addSafe(idx(), value.unscaledValue().toByteArray());
@@ -114,10 +108,8 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     super.setPosition(idx);
     mutator.startNewValue(idx);
   }
-
-
+  
   <#else>
-
   public void write(${minor.class}Holder h) {
     mutator.setSafe(idx(), h);
     vector.getMutator().setValueCount(idx()+1);
@@ -133,8 +125,8 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     mutator.setNull(idx());
     vector.getMutator().setValueCount(idx() + 1);
   }
+  
   </#if>
-
   <#if !(minor.class == "Decimal9" || minor.class == "Decimal18")>
   public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
     mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true>, ${field.name}</#if></#list>);
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index 0278749..a188c40 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -356,7 +356,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   }
 
   public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from) {
-    while(thisIndex >= getValueCapacity()) {
+    while (thisIndex >= getValueCapacity()) {
         reAlloc();
     }
     copyFrom(fromIndex, thisIndex, from);
@@ -616,7 +616,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public void setSafe(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       data.setBytes(index * VALUE_WIDTH, value, 0, VALUE_WIDTH);
@@ -631,7 +631,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public void setSafe(int index, int months, int days, int milliseconds) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, months, days, milliseconds);
@@ -661,7 +661,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public void setSafe(int index, int days, int milliseconds) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, days, milliseconds);
@@ -685,7 +685,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense">
 
     public void setSafe(int index, int start, DrillBuf buffer) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, start, buffer);
@@ -714,7 +714,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public void setSafe(int index, BigDecimal value) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, value);
@@ -757,7 +757,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
      * @param index item to write
      */
     public void setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, value);
@@ -768,7 +768,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public void setSafe(int index, ${minor.class}Holder holder) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, holder);
@@ -779,7 +779,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     }
 
     public void setSafe(int index, Nullable${minor.class}Holder holder) {
-      while(index >= getValueCapacity()) {
+      while (index >= getValueCapacity()) {
         reAlloc();
       }
       set(index, holder);
@@ -817,7 +817,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     public void setValueCount(int valueCount) {
       final int currentValueCapacity = getValueCapacity();
       final int idx = VALUE_WIDTH * valueCount;
-      while(valueCount > getValueCapacity()) {
+      while (valueCount > getValueCapacity()) {
         reAlloc();
       }
       if (valueCount > 0 && currentValueCapacity > valueCount * 2) {
@@ -958,7 +958,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
         return; // NOOP
       }
 
-      while((currentIdx + numElements -1) >= parent.getValueCapacity()) {
+      while ((currentIdx + numElements -1) >= parent.getValueCapacity()) {
         parent.reAlloc();
       }
 
diff --git a/exec/vector/src/main/codegen/templates/ListWriters.java b/exec/vector/src/main/codegen/templates/ListWriters.java
index 866ac90..e2d84cf 100644
--- a/exec/vector/src/main/codegen/templates/ListWriters.java
+++ b/exec/vector/src/main/codegen/templates/ListWriters.java
@@ -38,7 +38,6 @@ package org.apache.drill.exec.vector.complex.impl;
 /*
  * This class is generated using FreeMarker and the ${.template_name} template.
  */
-@SuppressWarnings("unused")
 public class ${mode}ListWriter extends AbstractFieldWriter {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${mode}ListWriter.class);
 
diff --git a/exec/vector/src/main/codegen/templates/MapWriters.java b/exec/vector/src/main/codegen/templates/MapWriters.java
index 64be1bf..4822b24 100644
--- a/exec/vector/src/main/codegen/templates/MapWriters.java
+++ b/exec/vector/src/main/codegen/templates/MapWriters.java
@@ -59,7 +59,7 @@ public class ${className} extends AbstractFieldWriter {
 
   protected final ${containerClass} container;
   <#if mode == "Repeated">protected<#else>private</#if> final Map<String, FieldWriter> fields = new HashMap<>();
-  <#if mode == "Repeated">protected int currentChildIndex = 0;</#if>
+  <#if mode == "Repeated">protected int currentChildIndex;</#if>
 
   private final boolean unionEnabled;
 
@@ -91,15 +91,15 @@ public class ${className} extends AbstractFieldWriter {
   @Override
   public MapWriter map(String name) {
     FieldWriter writer = fields.get(name.toLowerCase());
-    if(writer == null){
-      int vectorCount=container.size();
+    if (writer == null) {
+      int vectorCount = container.size();
         MapVector vector = container.addOrGet(name, MapVector.TYPE, MapVector.class);
-      if(!unionEnabled){
+      if (!unionEnabled) {
         writer = new SingleMapWriter(vector, this);
       } else {
         writer = new PromotableWriter(vector, container);
       }
-      if(vectorCount != container.size()) {
+      if (vectorCount != container.size()) {
         writer.allocate();
       }
       writer.setPosition(${index});
@@ -115,7 +115,7 @@ public class ${className} extends AbstractFieldWriter {
       int vectorCount = container.size();
       UnionVector vector = container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);
       writer = new UnionVectorWriter(vector, this);
-      if(vectorCount != container.size()) {
+      if (vectorCount != container.size()) {
         writer.allocate();
       }
       writer.setPosition(${index});
@@ -134,7 +134,7 @@ public class ${className} extends AbstractFieldWriter {
       writer = new SingleDictWriter(vector, this);
 
       fields.put(name.toLowerCase(), writer);
-      if(vectorCount != container.size()) {
+      if (vectorCount != container.size()) {
         writer.allocate();
       }
       writer.setPosition(${index});
@@ -168,9 +168,9 @@ public class ${className} extends AbstractFieldWriter {
   public ListWriter list(String name) {
     FieldWriter writer = fields.get(name.toLowerCase());
     int vectorCount = container.size();
-    if(writer == null) {
-      if (!unionEnabled){
-        writer = new SingleListWriter(name,container,this);
+    if (writer == null) {
+      if (!unionEnabled) {
+        writer = new SingleListWriter(name, container, this);
       } else{
         writer = new PromotableWriter(container.addOrGet(name, Types.optional(MinorType.LIST), ListVector.class), container);
       }
@@ -231,10 +231,10 @@ public class ${className} extends AbstractFieldWriter {
   public ${minor.class}Writer ${lowerName}(String name) {
   </#if>
     FieldWriter writer = fields.get(name.toLowerCase());
-    if(writer == null) {
+    if (writer == null) {
       ValueVector vector;
       ValueVector currentVector = container.getChild(name);
-      if (unionEnabled){
+      if (unionEnabled) {
         ${vectName}Vector v = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class);
         writer = new PromotableWriter(v, container);
         vector = v;
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index d2bfb87..29cbe39 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -93,9 +93,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
   @Override
   public DrillBuf[] getBuffers(boolean clear) {
-    final DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), values.getBuffers(false), DrillBuf.class);
+    DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), values.getBuffers(false), DrillBuf.class);
     if (clear) {
-      for (final DrillBuf buffer:buffers) {
+      for (DrillBuf buffer:buffers) {
         buffer.retain(1);
       }
       clear();
@@ -123,7 +123,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   }
 
   @Override
-  public int getBufferSizeFor(final int valueCount) {
+  public int getBufferSizeFor(int valueCount) {
     if (valueCount == 0) {
       return 0;
     }
@@ -188,6 +188,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     } finally {
       if (!success) {
         clear();
+        return false;
       }
     }
     bits.zeroVector();
@@ -291,12 +292,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   public void load(SerializedField metadata, DrillBuf buffer) {
     clear();
     // the bits vector is the first child (the order in which the children are added in getMetadataBuilder is significant)
-    final SerializedField bitsField = metadata.getChild(0);
+    SerializedField bitsField = metadata.getChild(0);
     bits.load(bitsField, buffer);
 
-    final int capacity = buffer.capacity();
-    final int bitsLength = bitsField.getBufferLength();
-    final SerializedField valuesField = metadata.getChild(1);
+    int capacity = buffer.capacity();
+    int bitsLength = bitsField.getBufferLength();
+    SerializedField valuesField = metadata.getChild(1);
     values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength));
   }
 
@@ -318,7 +319,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   public void transferTo(Nullable${minor.class}Vector target) {
     bits.transferTo(target.bits);
     values.transferTo(target.values);
-    target.mutator.setCount = mutator.setCount;
     <#if type.major == "VarLen">
     target.mutator.lastSet = mutator.lastSet;
     </#if>
@@ -394,7 +394,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   }
 
   public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from) {
-    final Accessor fromAccessor = from.getAccessor();
+    Accessor fromAccessor = from.getAccessor();
     if (!fromAccessor.isNull(fromIndex)) {
       mutator.set(thisIndex, fromAccessor.get(fromIndex));
     }
@@ -419,6 +419,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type
   @Override
   public void copyEntry(int toIndex, ValueVector from, int fromIndex) {
     <#if type.major == "VarLen">
+
+    // Called by HashJoinBatch for OUTER joins; may skip values,
+    // so must fill empties.
+
     mutator.fillEmpties(toIndex);
     </#if>
 
@@ -430,12 +434,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     Nullable${minor.class}Vector fromVector = (Nullable${minor.class}Vector) from;
-    <#if type.major == "VarLen">
-
-    // This method is to be called only for loading the vector
-    // sequentially, so there should be no empties to fill.
-
-    </#if>
     bits.copyFromSafe(fromIndex, toIndex, fromVector.bits);
     values.copyFromSafe(fromIndex, toIndex, fromVector.values);
   }
@@ -448,13 +446,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     mutator.exchange(other.getMutator());
   }
 
-  @Override
-  public void finalizeLastSet(int count) {
-    <#if type.major = "VarLen">
-    mutator.lastSet = count;
-    </#if>
-  }
-
   <#if type.major != "VarLen">
   @Override
   public void toNullable(ValueVector nullableVector) {
@@ -538,8 +529,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     public void reset() {}
   }
 
-  public final class Mutator extends BaseDataValueVector.BaseMutator implements NullableVectorDefinitionSetter<#if type.major = "VarLen">, VariableWidthVector.VariableWidthMutator</#if> {
-    private int setCount;
+  public final class Mutator extends BaseDataValueVector.BaseMutator
+      implements NullableVectorDefinitionSetter<#if type.major = "VarLen">, VariableWidthVector.VariableWidthMutator</#if>,
+                 NullableVector.Mutator {
     <#if type.major = "VarLen">private int lastSet = -1;</#if>
 
     private Mutator() { }
@@ -573,9 +565,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
      */
 
     public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
-      setCount++;
-      final ${valuesName}.Mutator valuesMutator = values.getMutator();
-      final UInt1Vector.Mutator bitsMutator = bits.getMutator();
+      ${valuesName}.Mutator valuesMutator = values.getMutator();
+      UInt1Vector.Mutator bitsMutator = bits.getMutator();
       <#if type.major == "VarLen">
       valuesMutator.fillEmpties(lastSet, index);
       </#if>
@@ -585,12 +576,24 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     <#if type.major == "VarLen">
-    private void fillEmpties(int index) {
-      final ${valuesName}.Mutator valuesMutator = values.getMutator();
-      valuesMutator.fillEmpties(lastSet, index+1);
+    /**
+     * Fill in missing values up to, but not including, the given
+     * index.
+     * 
+     * @param index the index about to be written, or the total
+     * vector length about to be set
+     */
+    
+    @VisibleForTesting
+    protected void fillEmpties(int index) {
+      values.getMutator().fillEmpties(lastSet, index);
       while (index > bits.getValueCapacity()) {
         bits.reAlloc();
       }
+      
+      // Set lastSet to the given index, which the caller
+      // will write to.
+      
       lastSet = index;
     }
 
@@ -607,7 +610,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value, start, length);
-      setCount++;
       lastSet = index;
     }
 
@@ -618,7 +620,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value, start, length);
-      setCount++;
       lastSet = index;
     }
 
@@ -636,7 +637,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     public void set(int index, Nullable${minor.class}Holder holder) {
-      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       valuesMutator.fillEmpties(lastSet, index);
       </#if>
@@ -646,7 +647,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     }
 
     public void set(int index, ${minor.class}Holder holder) {
-      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       valuesMutator.fillEmpties(lastSet, index);
       </#if>
@@ -661,7 +662,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
     <#assign fields = minor.fields!type.fields />
     public void set(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
-      final ${valuesName}.Mutator valuesMutator = values.getMutator();
+      ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       valuesMutator.fillEmpties(lastSet, index);
       </#if>
@@ -678,7 +679,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       </#if>
       bits.getMutator().setSafe(index, isSet);
       values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
-      setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
@@ -690,7 +690,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       </#if>
       bits.getMutator().setSafe(index, value.isSet);
       values.getMutator().setSafe(index, value);
-      setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
@@ -702,7 +701,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       </#if>
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value);
-      setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
@@ -715,7 +713,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       </#if>
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value);
-      setCount++;
     }
 
     </#if>
@@ -728,7 +725,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       </#if>
       bits.getMutator().set(index, 1);
       values.getMutator().set(index, value);
-      setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
@@ -740,7 +736,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       </#if>
       bits.getMutator().setSafe(index, 1);
       values.getMutator().setSafe(index, value);
-      setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
 
@@ -750,6 +745,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type
       assert valueCount >= 0;
       <#if type.major == "VarLen">
       fillEmpties(valueCount);
+      // fillEmpties assumes we will write to the valueCount
+      // position, but we've actually only written the previous
+      // value.
+      lastSet = valueCount - 1;
       </#if>
       values.getMutator().setValueCount(valueCount);
       bits.getMutator().setValueCount(valueCount);
@@ -769,10 +768,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
       /** {@inheritDoc} */
       @Override
-      public void onNewBulkEntry(final VarLenBulkEntry entry) {
-        final int[] lengths = entry.getValuesLength();
-        final ByteBuffer buffer = bitsMutator.getByteBuffer();
-        final byte[] bufferArray = buffer.array();
+      public void onNewBulkEntry(VarLenBulkEntry entry) {
+        int[] lengths = entry.getValuesLength();
+        ByteBuffer buffer = bitsMutator.getByteBuffer();
+        byte[] bufferArray = buffer.array();
         int remaining = entry.getNumValues();
         int srcPos = 0;
 
@@ -783,17 +782,16 @@ public final class ${className} extends BaseDataValueVector implements <#if type
             bitsMutator.flush();
           }
 
-          final int toCopy      = Math.min(remaining, buffer.remaining());
-          final int startTgtPos = buffer.position();
-          final int maxTgtPos   = startTgtPos + toCopy;
+          int toCopy      = Math.min(remaining, buffer.remaining());
+          int startTgtPos = buffer.position();
+          int maxTgtPos   = startTgtPos + toCopy;
 
           if (entry.hasNulls()) {
             for (int idx = startTgtPos; idx < maxTgtPos; idx++) {
-              final int valLen = lengths[srcPos++];
+              int valLen = lengths[srcPos++];
 
               if (valLen >= 0) {
                 bufferArray[idx] = 1;
-                ++setCount;
               } else {
                 // This is a null entry
                 bufferArray[idx] = 0;
@@ -803,7 +801,6 @@ public final class ${className} extends BaseDataValueVector implements <#if type
             for (int idx = startTgtPos; idx < maxTgtPos; idx++) {
               bufferArray[idx] = 1;
             }
-            setCount += toCopy;
           }
 
           // Update counters
@@ -844,29 +841,35 @@ public final class ${className} extends BaseDataValueVector implements <#if type
 
     @Override
     public void reset() {
-      setCount = 0;
       <#if type.major = "VarLen">lastSet = -1;</#if>
     }
 
     <#if type.major = "VarLen">
     @VisibleForTesting
     public int getLastSet() { return lastSet; }
-    
+
     </#if>
+    @Override
+    public void setSetCount(int n) {
+      <#if type.major = "VarLen">lastSet = n - 1;</#if>
+    }
+    
     // For nullable vectors, exchanging buffers (done elsewhere)
     // requires also exchanging mutator state (done here.)
 
     @Override
     public void exchange(ValueVector.Mutator other) {
-      final Mutator target = (Mutator) other;
-      int temp = setCount;
-      setCount = target.setCount;
-      target.setCount = temp;
+      <#if type.major == "VarLen">
+      Mutator target = (Mutator) other;
+      int temp = lastSet;
+      lastSet = target.lastSet;
+      target.lastSet = temp;
+      </#if>
     }
 
     public void fromNotNullable(${minor.class}Vector srce) {
       clear();
-      final int valueCount = srce.getAccessor().getValueCount();
+      int valueCount = srce.getAccessor().getValueCount();
 
       // Create a new bits vector, all values non-null
 
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 1bb758f..dc26b47 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -96,12 +96,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
-  public int getBufferSizeFor(final int valueCount) {
+  public int getBufferSizeFor(int valueCount) {
     if (valueCount == 0) {
       return 0;
     }
 
-    final int idx = offsetVector.getAccessor().get(valueCount);
+    int idx = offsetVector.getAccessor().get(valueCount);
     return offsetVector.getBufferSizeFor(valueCount + 1) + idx;
   }
 
@@ -131,7 +131,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
    * @return
    */
   public int getVarByteLength(){
-    final int valueCount = getAccessor().getValueCount();
+    int valueCount = getAccessor().getValueCount();
     if(valueCount == 0) {
       return 0;
     }
@@ -150,11 +150,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   @Override
   public void load(SerializedField metadata, DrillBuf buffer) {
     // the bits vector is the first child (the order in which the children are added in getMetadataBuilder is significant)
-    final SerializedField offsetField = metadata.getChild(0);
+    SerializedField offsetField = metadata.getChild(0);
     offsetVector.load(offsetField, buffer);
 
-    final int capacity = buffer.capacity();
-    final int offsetsLength = offsetField.getBufferLength();
+    int capacity = buffer.capacity();
+    int offsetsLength = offsetField.getBufferLength();
     data = buffer.slice(offsetsLength, capacity - offsetsLength);
     data.retain();
   }
@@ -167,10 +167,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
   @Override
   public DrillBuf[] getBuffers(boolean clear) {
-    final DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(false), super.getBuffers(false), DrillBuf.class);
+    DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(false), super.getBuffers(false), DrillBuf.class);
     if (clear) {
       // does not make much sense but we have to retain buffers even when clear is set. refactor this interface.
-      for (final DrillBuf buffer:buffers) {
+      for (DrillBuf buffer:buffers) {
         buffer.retain(1);
       }
       clear();
@@ -212,12 +212,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
   public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) {
     UInt${type.width}Vector.Accessor offsetVectorAccessor = this.offsetVector.getAccessor();
-    final int startPoint = offsetVectorAccessor.get(startIndex);
-    final int sliceLength = offsetVectorAccessor.get(startIndex + length) - startPoint;
+    int startPoint = offsetVectorAccessor.get(startIndex);
+    int sliceLength = offsetVectorAccessor.get(startIndex + length) - startPoint;
     target.clear();
     target.offsetVector.allocateNew(length + 1);
     offsetVectorAccessor = this.offsetVector.getAccessor();
-    final UInt4Vector.Mutator targetOffsetVectorMutator = target.offsetVector.getMutator();
+    UInt4Vector.Mutator targetOffsetVectorMutator = target.offsetVector.getMutator();
     for (int i = 0; i < length + 1; i++) {
       targetOffsetVectorMutator.set(i, offsetVectorAccessor.get(startIndex + i) - startPoint);
     }
@@ -226,30 +226,29 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 }
 
   protected void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
-    final UInt4Vector.Accessor fromOffsetVectorAccessor = from.offsetVector.getAccessor();
-    final int start = fromOffsetVectorAccessor.get(fromIndex);
-    final int end = fromOffsetVectorAccessor.get(fromIndex + 1);
-    final int len = end - start;
+    UInt4Vector.Accessor fromOffsetVectorAccessor = from.offsetVector.getAccessor();
+    int start = fromOffsetVectorAccessor.get(fromIndex);
+    int end = fromOffsetVectorAccessor.get(fromIndex + 1);
+    int len = end - start;
 
-    final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
+    int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
     from.data.getBytes(start, data, outputStart, len);
     offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (thisIndex+1) * ${type.width}, outputStart + len);
   }
 
-  public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
-    final UInt${type.width}Vector.Accessor fromOffsetVectorAccessor = from.offsetVector.getAccessor();
-    final int start = fromOffsetVectorAccessor.get(fromIndex);
-    final int end =   fromOffsetVectorAccessor.get(fromIndex + 1);
-    final int len = end - start;
-    final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
+  public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
+    UInt${type.width}Vector.Accessor fromOffsetVectorAccessor = from.offsetVector.getAccessor();
+    int start = fromOffsetVectorAccessor.get(fromIndex);
+    int end =   fromOffsetVectorAccessor.get(fromIndex + 1);
+    int len = end - start;
+    int outputStart = offsetVector.getAccessor().get(thisIndex);
 
     while(data.capacity() < outputStart + len) {
-        reAlloc();
+      reAlloc();
     }
 
     offsetVector.getMutator().setSafe(thisIndex + 1, outputStart + len);
     from.data.getBytes(start, data, outputStart, len);
-    return true;
   }
 
   @Override
@@ -313,8 +312,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   }
 
   @Override
-  public void setInitialCapacity(final int valueCount) {
-    final long size = 1L * valueCount * ${type.width};
+  public void setInitialCapacity(int valueCount) {
+    long size = 1L * valueCount * ${type.width};
     if (size > MAX_ALLOCATION_SIZE) {
       throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size");
     }
@@ -351,7 +350,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      * clear all the memory that we allocated
      */
     try {
-      final int requestedSize = (int)curAllocationSize;
+      int requestedSize = (int)curAllocationSize;
       data = allocator.buffer(requestedSize);
       allocationSizeInBytes = requestedSize;
       offsetVector.allocateNew();
@@ -407,7 +406,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
   @Override
   public DrillBuf reallocRaw(int newAllocationSize) {
-    final DrillBuf newBuf = allocator.buffer(newAllocationSize);
+    DrillBuf newBuf = allocator.buffer(newAllocationSize);
     newBuf.setBytes(0, data, 0, data.capacity());
     data.release();
     data = newBuf;
@@ -458,17 +457,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
     public byte[] get(int index) {
       assert index >= 0;
-      final int startIdx = oAccessor.get(index);
-      final int length = oAccessor.get(index + 1) - startIdx;
+      int startIdx = oAccessor.get(index);
+      int length = oAccessor.get(index + 1) - startIdx;
       assert length >= 0;
-      final byte[] dst = new byte[length];
+      byte[] dst = new byte[length];
       data.getBytes(startIdx, dst, 0, length);
       return dst;
     }
 
     @Override
     public int getValueLength(int index) {
-      final UInt${type.width}Vector.Accessor offsetVectorAccessor = offsetVector.getAccessor();
+      UInt${type.width}Vector.Accessor offsetVectorAccessor = offsetVector.getAccessor();
       return offsetVectorAccessor.get(index + 1) - offsetVectorAccessor.get(index);
     }
 
@@ -601,7 +600,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      */
     protected void set(int index, byte[] bytes) {
       assert index >= 0;
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
       data.setBytes(currentOffset, bytes, 0, bytes.length);
     }
@@ -609,7 +608,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void setSafe(int index, byte[] bytes) {
       assert index >= 0;
 
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int currentOffset = offsetVector.getAccessor().get(index);
       while (data.capacity() < currentOffset + bytes.length) {
         reAlloc();
       }
@@ -673,7 +672,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
      */
     protected void set(int index, byte[] bytes, int start, int length) {
       assert index >= 0;
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, bytes, start, length);
     }
@@ -681,7 +680,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void setSafe(int index, ByteBuffer bytes, int start, int length) {
       assert index >= 0;
 
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int currentOffset = offsetVector.getAccessor().get(index);
 
       while (data.capacity() < currentOffset + length) {
         reAlloc();
@@ -693,7 +692,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void setSafe(int index, byte[] bytes, int start, int length) {
       assert index >= 0;
 
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int currentOffset = offsetVector.getAccessor().get(index);
 
       while (data.capacity() < currentOffset + length) {
         reAlloc();
@@ -704,7 +703,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
     @Override
     public void setValueLengthSafe(int index, int length) {
-      final int offset = offsetVector.getAccessor().get(index);
+      int offset = offsetVector.getAccessor().get(index);
       while(data.capacity() < offset + length ) {
         reAlloc();
       }
@@ -712,8 +711,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
 
     public void setSafe(int index, int start, int end, DrillBuf buffer) {
-      final int len = end - start;
-      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      int len = end - start;
+      int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
 
       while (data.capacity() < outputStart + len) {
         reAlloc();
@@ -725,11 +724,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     public void setSafe(int index, Nullable${minor.class}Holder holder) {
       assert holder.isSet == 1;
 
-      final int start = holder.start;
-      final int end =   holder.end;
-      final int len = end - start;
+      int start = holder.start;
+      int end =   holder.end;
+      int len = end - start;
 
-      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
 
       while (data.capacity() < outputStart + len) {
         reAlloc();
@@ -751,10 +750,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     </#if>
 
     public void setSafe(int index, ${minor.class}Holder holder) {
-      final int start = holder.start;
-      final int end =   holder.end;
-      final int len = end - start;
-      final int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
+      int start = holder.start;
+      int end =   holder.end;
+      int len = end - start;
+      int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
 
       while(data.capacity() < outputStart + len) {
         reAlloc();
@@ -764,46 +763,59 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     }
 
     /**
-     * Backfill missing offsets from the given last written position to the
-     * given current write position. Used by the "new" size-safe column
-     * writers to allow skipping values. The <tt>set()</tt> and <tt>setSafe()</tt>
-     * <b>do not</b> fill empties. See DRILL-5529.
-     * @param lastWrite the position of the last valid write: the offset
-     * to be copied forward
-     * @param index the current write position filling occurs up to,
-     * but not including, this position
+     * Backfill missing offsets from the given last written position up to, but
+     * not including, the given current write position. Used by nullable
+     * vectors to allow skipping values. The <tt>set()</tt> and
+     * <tt>setSafe()</tt> <b>do not</b> fill empties. See DRILL-5529.
+     * 
+     * @param lastWrite
+     *          the position of the last valid write: the offset to be copied
+     *          forward
+     * @param index
+     *          the current write position filling occurs up to, but not
+     *          including, this position
      */
 
     public void fillEmpties(int lastWrite, int index) {
+
       // If last write was 2, offsets are [0, 3, 6]
       // If next write is 4, offsets must be: [0, 3, 6, 6, 6]
       // Remember the offsets are one more than row count.
 
-      final int fillOffset = offsetVector.getAccessor().get(lastWrite+1);
-      final UInt4Vector.Mutator offsetMutator = offsetVector.getMutator();
-      for (int i = lastWrite; i < index; i++) {
-        offsetMutator.setSafe(i + 1, fillOffset);
+      int startWrite = lastWrite + 1;
+      if (startWrite < index) {
+
+        // Don't access the offset vector if nothing to fill.
+        // This handles the special case of a zero-size batch
+        // in which the 0th position of the offset vector does
+        // not even exist.
+
+        int fillOffset = offsetVector.getAccessor().get(startWrite);
+        UInt4Vector.Mutator offsetMutator = offsetVector.getMutator();
+        for (int i = startWrite; i < index; i++) {
+          offsetMutator.setSafe(i + 1, fillOffset);
+        }
       }
     }
 
     protected void set(int index, int start, int length, DrillBuf buffer){
       assert index >= 0;
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
-      final DrillBuf bb = buffer.slice(start, length);
+      DrillBuf bb = buffer.slice(start, length);
       data.setBytes(currentOffset, bb);
     }
 
     protected void set(int index, Nullable${minor.class}Holder holder){
-      final int length = holder.end - holder.start;
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int length = holder.end - holder.start;
+      int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, holder.buffer, holder.start, length);
     }
 
     protected void set(int index, ${minor.class}Holder holder){
-      final int length = holder.end - holder.start;
-      final int currentOffset = offsetVector.getAccessor().get(index);
+      int length = holder.end - holder.start;
+      int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + length);
       data.setBytes(currentOffset, holder.buffer, holder.start, length);
     }
@@ -811,28 +823,35 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     /**
      * <h4>Notes on Usage</h4>
      * <p>
-     * For {@link VariableWidthVector}s this method can be used in the following cases:
+     * For {@link VariableWidthVector}s this method can be used in the following
+     * cases:
      * <ul>
-     *   <li>Setting the actual number of elements currently contained in the vector.</li>
-     *   <li>Trimming the vector to have fewer elements than it current does.</li>
+     * <li>Setting the actual number of elements currently contained in the
+     * vector.</li>
+     * <li>Trimming the vector to have fewer elements than it currently does.</li>
      * </ul>
      * </p>
      * <h4>Caveats</h4>
      * <p>
-     *   It is important to note that for {@link org.apache.drill.exec.vector.FixedWidthVector}s this method can also be used to expand the vector.
-     *   However, {@link VariableWidthVector} do not support this usage and this method will throw an {@link IndexOutOfBoundsException} if you attempt
-     *   to use it in this way. Expansion of valueCounts is not supported mainly because there is no benefit, since you would still have to rely on the setSafe
-     *   methods to appropriatly expand the data buffer and populate the vector anyway (since by definition we do not know the width of elements). See DRILL-6234 for details.
+     * It is important to note that for
+     * {@link org.apache.drill.exec.vector.FixedWidthVector}s this method can
+     * also be used to expand the vector. However, {@link VariableWidthVector}
+     * do not support this usage and this method will throw an
+     * {@link IndexOutOfBoundsException} if you attempt to use it in this way.
+     * Expansion of valueCounts is not supported mainly because there is no
+     * benefit, since you would still have to rely on the setSafe methods to
+     * appropriately expand the data buffer and populate the vector anyway
+     * (since by definition we do not know the width of elements). See
+     * DRILL-6234 for details.
      * </p>
-     * <h4>Method Documentation</h4>
-     * {@inheritDoc}
+     * <h4>Method Documentation</h4> {@inheritDoc}
      */
     @Override
     public void setValueCount(int valueCount) {
-      final int currentByteCapacity = getByteCapacity();
+      int currentByteCapacity = getByteCapacity();
       // Check if valueCount to be set is zero and current capacity is also zero. If yes then
       // we should not call get to read start index from offset vector at that value count.
-      final int idx = (valueCount == 0 && currentByteCapacity == 0)
+      int idx = (valueCount == 0 && currentByteCapacity == 0)
         ? 0
         : offsetVector.getAccessor().get(valueCount);
       data.writerIndex(idx);
@@ -841,7 +860,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       } else if (allocationMonitor > 0) {
         allocationMonitor = 0;
       }
-      VectorTrimmer.trim(data, idx);
       offsetVector.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1);
     }
 
@@ -850,14 +868,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       boolean even = true;
       <#switch minor.class>
       <#case "Var16Char">
-      final java.nio.charset.Charset charset = Charsets.UTF_16;
+      java.nio.charset.Charset charset = Charsets.UTF_16;
       <#break>
       <#case "VarChar">
       <#default>
-      final java.nio.charset.Charset charset = Charsets.UTF_8;
+      java.nio.charset.Charset charset = Charsets.UTF_8;
       </#switch>
-      final byte[] evenValue = new String("aaaaa").getBytes(charset);
-      final byte[] oddValue = new String("bbbbbbbbbb").getBytes(charset);
+      byte[] evenValue = new String("aaaaa").getBytes(charset);
+      byte[] oddValue = new String("bbbbbbbbbb").getBytes(charset);
       for(int i =0; i < size; i++, even = !even){
         set(i, even ? evenValue : oddValue);
         }
@@ -991,8 +1009,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
     private void setOffsets(int[] lengths, int numValues, boolean hasNulls) {
       // We need to compute source offsets using the current larget offset and the value length array.
-      final ByteBuffer offByteBuff = offsetsMutator.getByteBuffer();
-      final byte[] bufferArray = offByteBuff.array();
+      ByteBuffer offByteBuff = offsetsMutator.getByteBuffer();
+      byte[] bufferArray = offByteBuff.array();
       int remaining = numValues;
       int srcPos = 0;
 
@@ -1001,7 +1019,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
           offsetsMutator.flush();
         }
 
-        final int toCopy  = Math.min(remaining, offByteBuff.remaining() / 4);
+        int toCopy  = Math.min(remaining, offByteBuff.remaining() / 4);
         int tgtPos       = offByteBuff.position();
 
         if (!hasNulls) {
@@ -1011,7 +1029,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
           }
         } else {
           for (int idx = 0; idx < toCopy; idx++, tgtPos += 4, srcPos++) {
-            final int curr_len = lengths[srcPos];
+            int curr_len = lengths[srcPos];
             totalDataLen    += (curr_len >= 0) ? curr_len : 0;
             UInt4Vector.BufferedMutator.writeInt(totalDataLen, bufferArray, tgtPos);
           }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
index eed998a..e4e8cbc 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/NullableVector.java
@@ -23,9 +23,15 @@ import org.apache.drill.exec.record.MaterializedField;
 
 public interface NullableVector extends ValueVector {
 
+  public interface Mutator extends ValueVector.Mutator {
+
+    // Used by the vector accessors to force the last set value.
+
+    void setSetCount(int n);
+  }
+
   MaterializedField bitsField = MaterializedField.create(BITS_VECTOR_NAME, Types.required(MinorType.UINT1));
 
   ValueVector getBitsVector();
   ValueVector getValuesVector();
-  void finalizeLastSet(int count);
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
index 838c749..564e784 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/impl/VectorPrinter.java
@@ -29,6 +29,15 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
 public class VectorPrinter {
 
+  public static void printOffsets(UInt4Vector vector, int maxPrint) {
+    int capacity = vector.getAllocatedSize() / 4;
+    if (capacity == 0) {
+      return;
+    }
+    int length = Math.min(maxPrint, capacity);
+    printOffsets(vector, 0, length);
+  }
+
   public static void printOffsets(UInt4Vector vector, int start, int length) {
     header(vector, start, length);
     for (int i = start, j = 0; j < length; i++, j++) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index 856a44b..5100ba9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -283,7 +283,8 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl {
     // Avoid back-filling null values.
     baseWriter.skipNulls();
     baseWriter.endWrite();
-    nullableVector.finalizeLastSet(writerIndex.vectorIndex());
+    ((NullableVector.Mutator) nullableVector.getMutator())
+        .setSetCount(writerIndex.vectorIndex());
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index 758185a..0b25d61 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -306,14 +306,22 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements
   @Override
   public void setValueCount(int valueCount) {
 
-    // Value count is in row positions, not index
-    // positions. (There are one more index positions
-    // than row positions.)
-
-    int offsetCount = valueCount + 1;
-    mandatoryResize(offsetCount);
-    fillEmpties(valueCount - lastWriteIndex - 1);
-    if (valueCount > 0) {
+    if (valueCount == 0) {
+
+      // Special case: if the total number of values is zero,
+      // then the offset vector should have 0 (not 1) values.
+      // Serialization code relies on this fact.
+
+      vector().getBuffer().writerIndex(0);
+    } else {
+
+      // Value count is in row positions, not index
+      // positions. (There is one more index position
+      // than there are row positions.)
+
+      int offsetCount = valueCount + 1;
+      mandatoryResize(offsetCount);
+      fillEmpties(valueCount - lastWriteIndex - 1);
       vector().getBuffer().writerIndex(offsetCount * VALUE_WIDTH);
     }
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractRepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractRepeatedMapVector.java
index 9e1da48..1fbc2bb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractRepeatedMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractRepeatedMapVector.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.vector.complex;
 
-import io.netty.buffer.DrillBuf;
-
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -29,8 +27,8 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.BasicTypeHelper;
 import org.apache.drill.exec.expr.holders.RepeatedValueHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -42,6 +40,8 @@ import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VectorDescriptor;
 
+import io.netty.buffer.DrillBuf;
+
 public abstract class AbstractRepeatedMapVector extends AbstractMapVector implements RepeatedValueVector {
 
   protected final UInt4Vector offsets; // offsets to start of each record (considering record indices are 0-indexed)
@@ -318,11 +318,11 @@ public abstract class AbstractRepeatedMapVector extends AbstractMapVector implem
     }
   }
 
-  transient private AbstractRepeatedMapTransferPair ephPair;
+  transient private AbstractRepeatedMapTransferPair<?> ephPair;
 
   public void copyFromSafe(int fromIndex, int thisIndex, AbstractRepeatedMapVector from) {
     if (ephPair == null || ephPair.from != from) {
-      ephPair = (AbstractRepeatedMapTransferPair) from.makeTransferPair(this);
+      ephPair = (AbstractRepeatedMapTransferPair<?>) from.makeTransferPair(this);
     }
     ephPair.copyValueSafe(fromIndex, thisIndex);
   }
@@ -430,29 +430,32 @@ public abstract class AbstractRepeatedMapVector extends AbstractMapVector implem
   }
 
   public abstract class Mutator implements RepeatedMutator {
+
     @Override
     public void startNewValue(int index) {
       emptyPopulator.populate(index + 1);
-      offsets.getMutator().setSafe(index + 1, offsets.getAccessor().get(index));
     }
 
     @Override
     public void setValueCount(int topLevelValueCount) {
-      emptyPopulator.populate(topLevelValueCount);
-      offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount + 1);
-      int childValueCount = offsets.getAccessor().get(topLevelValueCount);
+      int childValueCount;
+      if (topLevelValueCount == 0) {
+        childValueCount = 0;
+        offsets.getMutator().setValueCount(0);
+      } else {
+        emptyPopulator.populate(topLevelValueCount);
+        childValueCount = offsets.getAccessor().get(topLevelValueCount);
+      }
       for (ValueVector v : getChildren()) {
         v.getMutator().setValueCount(childValueCount);
       }
     }
 
     @Override
-    public void reset() {
-    }
+    public void reset() { }
 
     @Override
-    public void generateTestData(int values) {
-    }
+    public void generateTestData(int values) { }
 
     public int add(int index) {
       int prevEnd = offsets.getAccessor().get(index + 1);
@@ -461,8 +464,7 @@ public abstract class AbstractRepeatedMapVector extends AbstractMapVector implem
     }
 
     @Override
-    public void exchange(ValueVector.Mutator other) {
-    }
+    public void exchange(ValueVector.Mutator other) { }
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
index 046641b..17a2e73 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java
@@ -333,16 +333,8 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
       while (offsets.getValueCapacity() <= index) {
         offsets.reAlloc();
       }
-      offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
-      setValueCount(index+1);
-    }
-
-    public boolean startNewValueBounded(int index) {
-      if (index >= MAX_ROW_COUNT) {
-        return false;
-      }
-      startNewValue(index);
-      return true;
+      offsets.getMutator().setSafe(index + 1, offsets.getAccessor().get(index));
+      setValueCount(index + 1);
     }
 
     @Override
@@ -354,13 +346,13 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
       // saves a bit of memory, it greatly complicates code that
       // works with vectors because of the special case for zero-length
       // vectors.
-      offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1);
+      offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount + 1);
       final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
       vector.getMutator().setValueCount(childValueCount);
     }
 
     public int getInnerValueCountAt(int index) {
-      return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
+      return offsets.getAccessor().get(index + 1) - offsets.getAccessor().get(index);
     }
   }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java
index c6ad814..511c202 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/EmptyValuePopulator.java
@@ -17,11 +17,11 @@
  */
 package org.apache.drill.exec.vector.complex;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
- * A helper class that is used to track and populate empty values in repeated value vectors.
+ * Tracks and populates empty values in repeated value vectors.
  */
 public class EmptyValuePopulator {
   private final UInt4Vector offsets;
@@ -31,21 +31,33 @@ public class EmptyValuePopulator {
   }
 
   /**
-   * Marks all values since the last set as empty. The last set value is obtained from underlying offsets vector.
+   * Marks all values since the last set as empty. The last set value is
+   * obtained from underlying offsets vector.
    *
-   * @param lastIndex  the last index (inclusive) in the offsets vector until which empty population takes place
-   * @throws java.lang.IndexOutOfBoundsException  if lastIndex is negative or greater than offsets capacity.
+   * @param valueCount
+   *          the last index (inclusive) in the offsets vector until which empty
+   *          population takes place. Given in record (not offset) units
+   * @throws java.lang.IndexOutOfBoundsException
+   *           if valueCount is negative or greater than offsets capacity.
    */
-  public void populate(int lastIndex) {
-    Preconditions.checkElementIndex(lastIndex, Integer.MAX_VALUE);
-    final UInt4Vector.Accessor accessor = offsets.getAccessor();
-    final UInt4Vector.Mutator mutator = offsets.getMutator();
-    final int lastSet = Math.max(accessor.getValueCount() - 1, 0);
-    final int previousEnd = accessor.get(lastSet);//0 ? 0 : accessor.get(lastSet);
-    for (int i = lastSet; i < lastIndex; i++) {
-      mutator.setSafe(i + 1, previousEnd);
+  public void populate(int valueCount) {
+    Preconditions.checkElementIndex(valueCount, Integer.MAX_VALUE);
+    UInt4Vector.Accessor accessor = offsets.getAccessor();
+    // currentCount is measured in offsets, not records
+    int currentCount = accessor.getValueCount();
+    UInt4Vector.Mutator mutator = offsets.getMutator();
+    if (currentCount == 0) {
+      // Fill in the implicit zero position.
+
+      mutator.setSafe(0, 0);
+      currentCount = 1;
     }
-    mutator.setValueCount(lastIndex+1);
+    int previousEnd = accessor.get(currentCount - 1);
+    int offsetCount = valueCount + 1;
+    // Indexes are in offset locations, 1 greater than record indexes
+    for (int i = currentCount; i < offsetCount; i++) {
+      mutator.setSafe(i, previousEnd);
+    }
+    mutator.setValueCount(offsetCount);
   }
-
 }
diff --git a/exec/vector/src/test/java/org/apache/drill/exec/vector/VariableLengthVectorTest.java b/exec/vector/src/test/java/org/apache/drill/exec/vector/VariableLengthVectorTest.java
index c77854f..05fb66a 100644
--- a/exec/vector/src/test/java/org/apache/drill/exec/vector/VariableLengthVectorTest.java
+++ b/exec/vector/src/test/java/org/apache/drill/exec/vector/VariableLengthVectorTest.java
@@ -36,15 +36,16 @@ public class VariableLengthVectorTest
   public void testSettingSameValueCount()
   {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
-      final VarCharVector vector = new VarCharVector(field, allocator);
+      MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
+      @SuppressWarnings("resource")
+      VarCharVector vector = new VarCharVector(field, allocator);
 
       vector.allocateNew();
 
       try {
-        final int size = 1000;
-        final VarCharVector.Mutator mutator = vector.getMutator();
-        final VarCharVector.Accessor accessor = vector.getAccessor();
+        int size = 1000;
+        VarCharVector.Mutator mutator = vector.getMutator();
+        VarCharVector.Accessor accessor = vector.getAccessor();
 
         setSafeIndexStrings("", 0, size, mutator);
 
@@ -64,16 +65,17 @@ public class VariableLengthVectorTest
   public void testTrunicateVectorSetValueCount()
   {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
-      final VarCharVector vector = new VarCharVector(field, allocator);
+      MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
+      @SuppressWarnings("resource")
+      VarCharVector vector = new VarCharVector(field, allocator);
 
       vector.allocateNew();
 
       try {
-        final int size = 1000;
-        final int fluffSize = 10000;
-        final VarCharVector.Mutator mutator = vector.getMutator();
-        final VarCharVector.Accessor accessor = vector.getAccessor();
+        int size = 1000;
+        int fluffSize = 10000;
+        VarCharVector.Mutator mutator = vector.getMutator();
+        VarCharVector.Accessor accessor = vector.getAccessor();
 
         setSafeIndexStrings("", 0, size, mutator);
         setSafeIndexStrings("first cut ", size, fluffSize, mutator);
@@ -92,19 +94,20 @@ public class VariableLengthVectorTest
   @Test
   public void testDRILL7341() {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final MaterializedField field = MaterializedField.create("stringCol", Types.optional(TypeProtos.MinorType.VARCHAR));
-      final NullableVarCharVector sourceVector = new NullableVarCharVector(field, allocator);
-      final NullableVarCharVector targetVector = new NullableVarCharVector(field, allocator);
+      MaterializedField field = MaterializedField.create("stringCol", Types.optional(TypeProtos.MinorType.VARCHAR));
+      NullableVarCharVector sourceVector = new NullableVarCharVector(field, allocator);
+      @SuppressWarnings("resource")
+      NullableVarCharVector targetVector = new NullableVarCharVector(field, allocator);
 
       sourceVector.allocateNew();
       targetVector.allocateNew();
 
       try {
-        final NullableVarCharVector.Mutator sourceMutator = sourceVector.getMutator();
+        NullableVarCharVector.Mutator sourceMutator = sourceVector.getMutator();
         sourceMutator.setValueCount(sourceVector.getValueCapacity() * 4);
 
         targetVector.exchange(sourceVector);
-        final NullableVarCharVector.Mutator targetMutator = targetVector.getMutator();
+        NullableVarCharVector.Mutator targetMutator = targetVector.getMutator();
         targetMutator.setValueCount(targetVector.getValueCapacity() * 2);
       } finally {
         sourceVector.clear();
@@ -120,16 +123,17 @@ public class VariableLengthVectorTest
   public void testSetBackTracking()
   {
     try (RootAllocator allocator = new RootAllocator(10_000_000)) {
-      final MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
-      final VarCharVector vector = new VarCharVector(field, allocator);
+      MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
+      @SuppressWarnings("resource")
+      VarCharVector vector = new VarCharVector(field, allocator);
 
       vector.allocateNew();
 
       try {
-        final int size = 1000;
-        final int fluffSize = 10000;
-        final VarCharVector.Mutator mutator = vector.getMutator();
-        final VarCharVector.Accessor accessor = vector.getAccessor();
+        int size = 1000;
+        int fluffSize = 10000;
+        VarCharVector.Mutator mutator = vector.getMutator();
+        VarCharVector.Accessor accessor = vector.getAccessor();
 
         setSafeIndexStrings("", 0, size, mutator);
         setSafeIndexStrings("first cut ", size, fluffSize, mutator);
@@ -150,7 +154,7 @@ public class VariableLengthVectorTest
   public static void setSafeIndexStrings(String prefix, int offset, int size, VarCharVector.Mutator mutator)
   {
     for (int index = offset; index < size; index++) {
-      final String indexString = prefix + "String num " + index;
+      String indexString = prefix + "String num " + index;
       mutator.setSafe(index, indexString.getBytes());
     }
   }
@@ -158,7 +162,7 @@ public class VariableLengthVectorTest
   public static void checkIndexStrings(String prefix, int offset, int size, VarCharVector.Accessor accessor)
   {
     for (int index = offset; index < size; index++) {
-      final String indexString = prefix + "String num " + index;
+      String indexString = prefix + "String num " + index;
       Assert.assertArrayEquals(indexString.getBytes(), accessor.get(index));
     }
   }
diff --git a/exec/vector/src/test/java/org/apache/drill/exec/vector/VectorTest.java b/exec/vector/src/test/java/org/apache/drill/exec/vector/VectorTest.java
new file mode 100644
index 0000000..d633a79
--- /dev/null
+++ b/exec/vector/src/test/java/org/apache/drill/exec/vector/VectorTest.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.RootAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.EmptyValuePopulator;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.impl.NullableVarCharWriterImpl;
+import org.apache.drill.exec.vector.complex.impl.SingleMapWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+public class VectorTest {
+
+  private static RootAllocator allocator;
+
+  @BeforeClass
+  public static void setup() {
+    allocator = new RootAllocator(10_000_000);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    allocator.close();
+  }
+
+  @Test
+  public void testVarChar() {
+    MaterializedField field = MaterializedField.create("stringCol", Types.required(TypeProtos.MinorType.VARCHAR));
+    try (VarCharVector v = new VarCharVector(field, allocator)) {
+      v.allocateNew(1000, 128);
+      VarCharVector.Accessor va = v.getAccessor();
+      UInt4Vector ov = v.getOffsetVector();
+      UInt4Vector.Accessor ova = ov.getAccessor();
+
+      assertEquals(1024, v.getBuffer().capacity());
+      // Note: offset vector size is (128 + 1) rounded up
+      assertEquals(256 * 4, ov.getBuffer().capacity());
+
+      // Zero fill so the following is deterministic
+
+      v.getBuffer().setZero(0, 1024);
+      ov.getBuffer().setZero(0, 256 * 4);
+
+      // 0-size is special
+
+      v.getMutator().setValueCount(0);
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, ova.getValueCount());
+
+      // Fill-empties at 0 is also special.
+
+      v.getMutator().fillEmpties(-1, 0);
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, ova.getValueCount());
+
+      // Write one value
+
+      v.getMutator().setSafe(0, "foo".getBytes());
+      v.getMutator().setValueCount(1);
+      assertEquals(1, va.getValueCount());
+      assertEquals(2, ova.getValueCount());
+      assertEquals(0, ova.get(0));
+      assertEquals(3, ova.get(1));
+
+      // Pretend to skip a value
+
+      v.getMutator().fillEmpties(0, 2);
+      // fillEmpties does not change the value count
+      assertEquals(0, ova.get(0));
+      assertEquals(3, ova.get(1)); // First value
+      assertEquals(3, ova.get(2)); // Filled
+      assertEquals(0, ova.get(3)); // Should not be set
+
+      // Write one more value
+
+      v.getMutator().setSafe(2, "mumble".getBytes());
+      v.getMutator().setValueCount(2);
+      assertEquals(2, va.getValueCount());
+      assertEquals(3, ova.getValueCount());
+      assertEquals(3, ova.get(1));
+      assertEquals(3, ova.get(2));
+      assertEquals(9, ova.get(3));
+    }
+  }
+
+  /**
+   * Verify the delicate, fragile logic for setting the value count and
+   * filling empty values. Some operators and readers first write values,
+   * then set the value count at the end of the batch. The "complex writers"
+   * set the value count after each value. The "lastSet" count tracks the
+   * last value actually written, but is set in fillEmpties(), which is
+   * also called when setting the value count.
+   */
+
+  @Test
+  public void testNullableVarChar() {
+    MaterializedField field = MaterializedField.create("stringCol", Types.optional(TypeProtos.MinorType.VARCHAR));
+    try (NullableVarCharVector v = new NullableVarCharVector(field, allocator)) {
+      int targetDataLength = 1000;
+      int targetValueCount = 128;
+      v.allocateNew(targetDataLength, targetValueCount);
+      NullableVarCharVector.Accessor va = v.getAccessor();
+      NullableVarCharVector.Mutator vm = v.getMutator();
+      VarCharVector dv = v.getValuesVector();
+      VarCharVector.Accessor da = dv.getAccessor();
+      UInt4Vector ov = dv.getOffsetVector();
+      UInt4Vector.Accessor ova = ov.getAccessor();
+      UInt1Vector bv = v.getBitsVector();
+
+      // Right at edge, so target offset vector is larger
+      int actualDataLength = 1024;
+      int targetOffsetCount = 2 * targetValueCount;
+      assertEquals(actualDataLength, dv.getBuffer().capacity());
+      assertEquals(targetOffsetCount * 4, ov.getBuffer().capacity());
+      assertEquals(targetValueCount, bv.getBuffer().capacity());
+
+      // Zero fill so the following is deterministic
+      // Bits are already zero
+
+      v.getBuffer().setZero(0, 1024);
+      ov.getBuffer().setZero(0, 256 * 4);
+
+      // Initial setup. Valid only because of zero-fill and
+      // how initial values happen to be set up.
+
+      assertEquals(0, va.getValueCount());
+      assertEquals(-1, vm.getLastSet());
+      assertEquals(0, bv.getAccessor().getValueCount());
+      assertEquals(0, da.getValueCount());
+      assertEquals(0, ova.getValueCount());
+      assertEquals(0, ova.get(0));
+      assertEquals(0, ova.get(1));
+
+      // 0-size is special
+
+      v.getMutator().setValueCount(0);
+      assertEquals(-1, vm.getLastSet());
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, bv.getAccessor().getValueCount());
+      assertEquals(0, da.getValueCount());
+      assertEquals(0, ova.getValueCount());
+      assertEquals(0, ova.get(0));
+      assertEquals(0, ova.get(1));
+
+      // Fill-empties at 0 is also special.
+
+      v.getMutator().fillEmpties(0);
+      assertEquals(0, vm.getLastSet());
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, ova.getValueCount());
+      assertEquals(0, ova.get(0));
+      assertEquals(0, ova.get(1));
+
+      // Write one value
+
+      byte[] bytes = "foo".getBytes();
+      vm.setSafe(0, bytes, 0, bytes.length);
+      assertEquals(0, vm.getLastSet());
+      vm.setValueCount(1);
+      assertEquals(0, vm.getLastSet());
+      assertEquals(1, va.getValueCount());
+      assertEquals(2, ova.getValueCount());
+      assertEquals(0, ova.get(0));
+      assertEquals(bytes.length, ova.get(1));
+
+      // Pretend to skip a value
+
+      v.getMutator().fillEmpties(2);
+      // Optimistically pre-set for value we're about to write
+      assertEquals(2, vm.getLastSet());
+      // fillEmpties does not change the value count
+      assertEquals(0, ova.get(0));
+      assertEquals(3, ova.get(1)); // First value
+      assertEquals(3, ova.get(2)); // Filled
+      assertEquals(0, ova.get(3)); // Should not be set
+
+      // Calling fillEmpties() twice is idempotent
+
+      v.getMutator().fillEmpties(2);
+      assertEquals(2, v.getMutator().getLastSet());
+      assertEquals(0, ova.get(0));
+      assertEquals(3, ova.get(1)); // First value
+      assertEquals(3, ova.get(2)); // Filled
+      assertEquals(0, ova.get(3)); // Should not be set
+
+      // Write one more value
+
+      byte[] second = "mumble".getBytes();
+      vm.setSafe(2, second, 0, second.length);
+      assertEquals(2, v.getMutator().getLastSet());
+      vm.setValueCount(3);
+      assertEquals(2, v.getMutator().getLastSet());
+      assertEquals(3, va.getValueCount());
+      assertEquals(4, ova.getValueCount());
+      assertEquals(3, ova.get(1));
+      assertEquals(3, ova.get(2));
+      assertEquals(9, ova.get(3));
+
+      // Skip two values
+
+      v.getMutator().setValueCount(5);
+      assertEquals(4, v.getMutator().getLastSet());
+      assertEquals(5, va.getValueCount());
+      assertEquals(6, ova.getValueCount());
+      assertEquals(9, ova.get(3));
+      assertEquals(9, ova.get(4));
+      assertEquals(9, ova.get(5));
+
+      // Skip a large number of values. Finish the vector
+      // right where the offset vector would have to increase
+      // in length
+
+      v.getMutator().setValueCount(targetOffsetCount);
+      assertEquals(targetOffsetCount - 1, v.getMutator().getLastSet());
+      assertEquals(targetOffsetCount, va.getValueCount());
+      assertEquals(targetOffsetCount + 1, ova.getValueCount());
+      assertEquals(9, ova.get(targetOffsetCount-1));
+      assertEquals(9, ova.get(targetOffsetCount));
+      assertEquals(0, ova.get(targetOffsetCount + 1));
+    }
+  }
+
+  @Test
+  public void testNullableVarCharWriter() throws Exception {
+    MaterializedField field = MaterializedField.create("stringCol", Types.optional(TypeProtos.MinorType.VARCHAR));
+    try (NullableVarCharVector v = new NullableVarCharVector(field, allocator);
+         DrillBuf buf = allocator.buffer(100)) {
+      v.allocateNew(1000, 128);
+      @SuppressWarnings("resource")
+      FieldWriter w = new NullableVarCharWriterImpl(v, null);
+
+      // Write in locations 1 and 3 (1-based; positions 0 and 2).
+
+      w.setPosition(0);
+      buf.setBytes(0, "foo".getBytes());
+      w.writeVarChar(0, 3, buf);
+
+      w.setPosition(2);
+      buf.setBytes(0, "mumble".getBytes());
+      w.writeVarChar(0, 6, buf);
+
+      // Don't close the writer; it clears the vector
+      // w.close();
+
+      VarCharVector dv = v.getValuesVector();
+      UInt4Vector ov = dv.getOffsetVector();
+      UInt4Vector.Accessor ova = ov.getAccessor();
+
+      v.getMutator().setValueCount(2);
+      assertEquals(2, v.getAccessor().getValueCount());
+      assertEquals(3, ova.getValueCount());
+      assertEquals(3, ova.get(1));
+      assertEquals(3, ova.get(2));
+      assertEquals(9, ova.get(3));
+    }
+  }
+
+  @Test
+  public void testEmptyValuePopulator() throws Exception {
+    MaterializedField field = MaterializedField.create("offsets", Types.required(TypeProtos.MinorType.UINT4));
+    try (UInt4Vector v = new UInt4Vector(field, allocator)) {
+      EmptyValuePopulator pop = new EmptyValuePopulator(v);
+      UInt4Vector.Accessor va = v.getAccessor();
+      UInt4Vector.Mutator vm = v.getMutator();
+
+      v.allocateNew(128);
+
+      // Zero case; for zero-length batches
+
+      vm.setValueCount(0);
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, va.get(0));
+
+      // Start a record 0 value.
+
+      pop.populate(0);
+      assertEquals(1, va.getValueCount());
+      assertEquals(0, va.get(0));
+
+      // Pretend batch count is 1. Offset vector is
+      // currently in special 0-size state.
+
+      pop.populate(1);
+      vm.setValueCount(1);
+      assertEquals(1, va.getValueCount());
+      assertEquals(0, va.get(0));
+      assertEquals(0, va.get(1));
+
+      // Pretend values are [xx] and [xxx]
+
+      vm.set(1, 2);
+      vm.set(2, 5);
+      vm.set(3, 0);
+      vm.setValueCount(3);
+      assertEquals(3, va.getValueCount());
+      assertEquals(5, va.get(2));
+      assertEquals(0, va.get(3));
+
+      // Pretend that the record count is 2 for the two
+      // values above.
+
+      pop.populate(2);
+      assertEquals(3, va.getValueCount());
+      assertEquals(5, va.get(2));
+      assertEquals(0, va.get(3));
+
+      // Pretend, instead we skipped records 2, 3 and 4
+
+      pop.populate(5);
+      assertEquals(6, va.getValueCount());
+      assertEquals(5, va.get(2));
+      assertEquals(5, va.get(3));
+      assertEquals(5, va.get(4));
+      assertEquals(5, va.get(5));
+      assertEquals(0, va.get(6));
+    }
+  }
+
+  @Test
+  public void testRepeatedMapCount() throws Exception {
+    try (RepeatedMapVector v = buildRepeatedMap()) {
+
+      IntVector iv = getInner(v);
+      RepeatedMapVector.Accessor va = v.getAccessor();
+      RepeatedMapVector.Mutator vm = v.getMutator();
+      IntVector.Accessor ia = iv.getAccessor();
+      IntVector.Mutator im = iv.getMutator();
+      UInt4Vector ov = v.getOffsetVector();
+      UInt4Vector.Accessor oa = ov.getAccessor();
+      UInt4Vector.Mutator om = ov.getMutator();
+
+      // Zero fill so the following is deterministic
+
+      ov.getBuffer().setZero(0, 6 * 4);
+
+      // Initial state
+
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, ia.getValueCount());
+      assertEquals(0, oa.getValueCount());
+      assertEquals(0, oa.get(0));
+
+      // Record size = 0
+
+      vm.setValueCount(0);
+      assertEquals(0, va.getValueCount());
+      assertEquals(0, ia.getValueCount());
+      assertEquals(0, oa.getValueCount());
+      assertEquals(0, oa.get(0));
+
+      // Record size = 1, so, implicit record 1 (1-based) of []
+
+      vm.setValueCount(1);
+      assertEquals(1, va.getValueCount());
+      assertEquals(0, ia.getValueCount());
+      assertEquals(2, oa.getValueCount());
+      assertEquals(0, oa.get(0));
+      assertEquals(0, oa.get(1));
+
+      // Record 2 (1-based) is [10, 20]
+
+      im.set(0, 10);
+      im.set(1, 20);
+      im.setValueCount(2);
+      om.set(2, 2);
+      om.setValueCount(3);
+      vm.setValueCount(2);
+      assertEquals(2, va.getValueCount());
+      assertEquals(2, ia.getValueCount());
+      assertEquals(3, oa.getValueCount());
+      assertEquals(0, oa.get(0));
+      assertEquals(0, oa.get(1));
+      assertEquals(2, oa.get(2));
+      assertEquals(0, oa.get(3));
+
+      // Batch size = 4, so implicit records 3, 4 (1-based) of []
+
+      vm.setValueCount(4);
+      assertEquals(4, va.getValueCount());
+      assertEquals(2, ia.getValueCount());
+      assertEquals(5, oa.getValueCount());
+      assertEquals(0, oa.get(0));
+      assertEquals(0, oa.get(1));
+      assertEquals(2, oa.get(2));
+      assertEquals(2, oa.get(3));
+      assertEquals(2, oa.get(4));
+      assertEquals(0, oa.get(5));
+    }
+  }
+
+  @Test
+  public void testRepeatedCopySafe() throws Exception {
+    try (RepeatedMapVector v = buildRepeatedMap();
+        RepeatedMapVector f = buildFromMap()) {
+
+      RepeatedMapVector.Mutator vm = v.getMutator();
+      UInt4Vector ov = v.getOffsetVector();
+      UInt4Vector.Accessor oa = ov.getAccessor();
+      UInt4Vector.Mutator om = ov.getMutator();
+
+      TransferPair tp = f.makeTransferPair(v);
+
+      tp.copyValueSafe(0, 0);
+
+      // CopyValue does not change the value count
+      //assertEquals(1, va.getValueCount());
+      //assertEquals(2, oa.getValueCount());
+      //assertEquals(2, ia.getValueCount());
+      assertEquals(0, oa.get(0));
+      assertEquals(2, oa.get(1));
+
+      tp.copyValueSafe(1, 1);
+      assertEquals(0, oa.get(0));
+      assertEquals(2, oa.get(1));
+      assertEquals(5, oa.get(2));
+
+      tp.copyValueSafe(2, 2);
+      assertEquals(2, oa.get(1));
+      assertEquals(5, oa.get(2));
+      assertEquals(5, oa.get(3));
+
+     vm.setValueCount(3);
+
+      // v should now be the same as f
+      validateFrom(v);
+    }
+  }
+
+  private class SpecialMapVector extends MapVector {
+
+    private final RepeatedMapVector v;
+
+    public SpecialMapVector(RepeatedMapVector v) {
+      super("", null, null);
+      this.v = v;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+      assert clazz == RepeatedMapVector.class;
+      return (T) v;
+    }
+  }
+
+  @Test
+  public void testRepeatedMapWriter() throws Exception {
+
+    // The writers will create a nullable int inner vector.
+
+    MaterializedField field = MaterializedField.create("repeated_map", Types.repeated(TypeProtos.MinorType.MAP));
+    try (RepeatedMapVector v = new RepeatedMapVector(field, allocator, null)) {
+
+      SpecialMapVector mapVector = new SpecialMapVector(v);
+      @SuppressWarnings("resource")
+      SingleMapWriter mapRoot = new SingleMapWriter(mapVector, null, false);
+      ListWriter lw = mapRoot.list("repeated_map");
+      MapWriter mw = lw.map();
+
+      // Record 1: [10, 20]
+
+      lw.setPosition(0);
+      lw.startList();
+      mw.start();
+      IntWriter iw = mw.integer("inner");
+      iw.writeInt(10);
+      mw.end();
+      mw.start();
+      iw.writeInt(20);
+      mw.end();
+      lw.endList();
+
+      // Record 2: [30, 40, 50]
+
+      lw.setPosition(1);
+      lw.startList();
+      mw.start();
+      iw.writeInt(30);
+      mw.end();
+      mw.start();
+      iw.writeInt(40);
+      mw.end();
+      mw.start();
+      iw.writeInt(50);
+      mw.end();
+      lw.endList();
+
+      // Record 3: []
+
+      lw.setPosition(2);
+      lw.startList();
+      lw.endList();
+
+      v.getMutator().setValueCount(3);
+
+      assertEquals(3, v.getAccessor().getValueCount());
+      UInt4Vector.Accessor oa = v.getOffsetVector().getAccessor();
+      assertEquals(4, oa.getValueCount());
+      assertEquals(0, oa.get(0));
+      assertEquals(2, oa.get(1));
+      assertEquals(5, oa.get(2));
+      assertEquals(5, oa.get(3));
+      assertEquals(0, oa.get(4)); // Past end
+
+      NullableIntVector inner =  v.addOrGet("inner", Types.optional(TypeProtos.MinorType.INT), NullableIntVector.class);
+      UInt1Vector.Accessor ba = inner.getBitsVector().getAccessor();
+      assertEquals(1, ba.get(0));
+      assertEquals(1, ba.get(1));
+      assertEquals(1, ba.get(2));
+      assertEquals(1, ba.get(3));
+      assertEquals(1, ba.get(4));
+      assertEquals(0, ba.get(5)); // Past end
+
+      NullableIntVector.Accessor ia = inner.getAccessor();
+      assertEquals(10, ia.get(0));
+      assertEquals(20, ia.get(1));
+      assertEquals(30, ia.get(2));
+      assertEquals(40, ia.get(3));
+      assertEquals(50, ia.get(4));
+    }
+  }
+
+  private RepeatedMapVector buildRepeatedMap() {
+    MaterializedField field = MaterializedField.create("repeated_map", Types.repeated(TypeProtos.MinorType.MAP));
+    RepeatedMapVector v = new RepeatedMapVector(field, allocator, null);
+    getInner(v);
+    v.allocateNew(5, 10);
+    return v;
+  }
+
+  private IntVector getInner(RepeatedMapVector v) {
+    return v.addOrGet("inner", Types.required(TypeProtos.MinorType.INT), IntVector.class);
+  }
+
+  private RepeatedMapVector buildFromMap() {
+    RepeatedMapVector v = buildRepeatedMap();
+
+    // Can't figure out how to get the indexes to step for
+    // inner and outer values.
+    // If we get the int writer from the repeated map writer,
+    // the column will be converted to nullable int.
+
+    IntVector iv = getInner(v);
+    IntVector.Mutator im = iv.getMutator();
+    UInt4Vector ov = v.getOffsetVector();
+    UInt4Vector.Mutator om = ov.getMutator();
+
+    om.set(0, 0);
+
+    // Record 1: [10, 20]
+
+    im.set(0, 10);
+    im.set(1, 20);
+    om.set(1, 2);
+
+    // Record 2: [30, 40, 50]
+
+    im.set(2, 30);
+    im.set(3, 40);
+    im.set(4, 50);
+    om.set(2, 5);
+
+    // Record 3: []
+
+    om.set(3, 5);
+
+    om.setValueCount(4);
+    v.getMutator().setValueCount(3);
+
+    // Sanity check
+
+    validateFrom(v);
+    return v;
+  }
+
+  private void validateFrom(RepeatedMapVector v) {
+    UInt4Vector.Accessor oa = v.getOffsetVector().getAccessor();
+    assertEquals(3, v.getAccessor().getValueCount());
+    assertEquals(4, oa.getValueCount());
+    assertEquals(0, oa.get(0));
+    assertEquals(2, oa.get(1));
+    assertEquals(5, oa.get(2));
+    assertEquals(5, oa.get(3));
+
+    IntVector.Accessor ia = getInner(v).getAccessor();
+    assertEquals(10, ia.get(0));
+    assertEquals(20, ia.get(1));
+    assertEquals(30, ia.get(2));
+    assertEquals(40, ia.get(3));
+    assertEquals(50, ia.get(4));
+  }
+}