You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/15 21:43:36 UTC

[arrow] 01/04: ARROW-1473: ValueVector new hierarchy prototype (implementation phase 1)

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 42353ba993314e6bba963aae50861566150f96eb
Author: Siddharth Teotia <si...@dremio.com>
AuthorDate: Sat Oct 14 11:38:30 2017 -0400

    ARROW-1473: ValueVector new hierarchy prototype (implementation phase 1)
    
    Close #1164
    Close #1198
    
    Change-Id: If18e42d2edfdfef83e83621334a5b65a390e9db9
---
 .../src/main/codegen/templates/ComplexReaders.java |  24 +-
 .../src/main/codegen/templates/ComplexWriters.java |  45 +-
 .../codegen/templates/NullableValueVectors.java    | 542 ++++++++++++--
 .../src/main/codegen/templates/UnionVector.java    |  18 +-
 .../arrow/vector/BaseNullableFixedWidthVector.java | 701 ++++++++++++++++++
 .../vector/BaseNullableVariableWidthVector.java    | 764 ++++++++++++++++++++
 .../org/apache/arrow/vector/BaseValueVector.java   |  20 +
 .../org/apache/arrow/vector/BitVectorHelper.java   |  60 ++
 .../org/apache/arrow/vector/NullableIntVector.java | 299 ++++++++
 .../apache/arrow/vector/NullableVarCharVector.java | 451 ++++++++++++
 .../java/org/apache/arrow/vector/ValueVector.java  |   6 +
 .../org/apache/arrow/vector/VectorUnloader.java    |  15 +-
 .../java/org/apache/arrow/vector/ZeroVector.java   |   9 +
 .../arrow/vector/complex/FixedSizeListVector.java  |  27 +-
 .../apache/arrow/vector/complex/ListVector.java    |  23 +-
 .../org/apache/arrow/vector/complex/MapVector.java |  33 +-
 .../arrow/vector/file/json/JsonFileReader.java     |   2 +-
 .../arrow/vector/TestBufferOwnershipTransfer.java  |   6 +-
 .../apache/arrow/vector/TestDictionaryVector.java  |  59 +-
 .../arrow/vector/TestFixedSizeListVector.java      |  13 +-
 .../apache/arrow/vector/TestSplitAndTransfer.java  |  15 +-
 .../org/apache/arrow/vector/TestValueVector.java   | 788 +++++++++++++++------
 .../org/apache/arrow/vector/TestVectorReAlloc.java |  11 +-
 .../apache/arrow/vector/TestVectorUnloadLoad.java  | 104 +--
 .../org/apache/arrow/vector/file/BaseFileTest.java |  70 +-
 .../apache/arrow/vector/file/TestArrowFile.java    |   2 +-
 26 files changed, 3658 insertions(+), 449 deletions(-)

diff --git a/java/vector/src/main/codegen/templates/ComplexReaders.java b/java/vector/src/main/codegen/templates/ComplexReaders.java
index 38cd1bf..7910649 100644
--- a/java/vector/src/main/codegen/templates/ComplexReaders.java
+++ b/java/vector/src/main/codegen/templates/ComplexReaders.java
@@ -70,7 +70,11 @@ public class ${name}ReaderImpl extends AbstractFieldReader {
   
   public boolean isSet(){
     <#if nullMode == "Nullable">
-    return !vector.getAccessor().isNull(idx());
+      <#if minor.class != "Int" && minor.class != "VarChar">
+        return !vector.getAccessor().isNull(idx());
+      <#else>
+        return !vector.isNull(idx());
+      </#if>
     <#else>
     return true;
     </#if>
@@ -93,11 +97,19 @@ public class ${name}ReaderImpl extends AbstractFieldReader {
   </#if>
 
   public void read(Nullable${minor.class?cap_first}Holder h){
-    vector.getAccessor().get(idx(), h);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      vector.getAccessor().get(idx(), h);
+    <#else>
+      vector.get(idx(), h);
+    </#if>
   }
   
   public ${friendlyType} read${safeType}(){
-    return vector.getAccessor().getObject(idx());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+      return vector.getObject(idx());
+    <#else>
+      return vector.getAccessor().getObject(idx());
+    </#if>
   }
 
   <#if minor.class == "TimeStampSec" ||
@@ -115,7 +127,11 @@ public class ${name}ReaderImpl extends AbstractFieldReader {
   }
   
   public Object readObject(){
-    return vector.getAccessor().getObject(idx());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+      return (Object)vector.getObject(idx());
+    <#else>
+      return vector.getAccessor().getObject(idx());
+    </#if>
   }
 }
 </#if>
diff --git a/java/vector/src/main/codegen/templates/ComplexWriters.java b/java/vector/src/main/codegen/templates/ComplexWriters.java
index fe099be..77f6594 100644
--- a/java/vector/src/main/codegen/templates/ComplexWriters.java
+++ b/java/vector/src/main/codegen/templates/ComplexWriters.java
@@ -39,11 +39,16 @@ package org.apache.arrow.vector.complex.impl;
 @SuppressWarnings("unused")
 public class ${eName}WriterImpl extends AbstractFieldWriter {
 
-  private final Nullable${name}Vector.Mutator mutator;
+  <#if minor.class != "Int" && minor.class != "VarChar">
+    private final Nullable${name}Vector.Mutator mutator;
+  </#if>
+
   final Nullable${name}Vector vector;
 
   public ${eName}WriterImpl(Nullable${name}Vector vector) {
-    this.mutator = vector.getMutator();
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      this.mutator = vector.getMutator();
+    </#if>
     this.vector = vector;
   }
 
@@ -103,18 +108,33 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   <#else>
 
   public void write(${minor.class}Holder h) {
-    mutator.setSafe(idx(), h);
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      mutator.setSafe(idx(), h);
+      vector.getMutator().setValueCount(idx()+1);
+    <#else>
+        vector.setSafe(idx(), h);
+        vector.setValueCount(idx()+1);
+    </#if>
   }
 
   public void write(Nullable${minor.class}Holder h) {
-    mutator.setSafe(idx(), h);
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      mutator.setSafe(idx(), h);
+      vector.getMutator().setValueCount(idx()+1);
+    <#else>
+      vector.setSafe(idx(), h);
+      vector.setValueCount(idx()+1);
+    </#if>
   }
 
   public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>) {
-    mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true >, ${field.name}</#if></#list>);
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+      mutator.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true >, ${field.name}</#if></#list>);
+      vector.getMutator().setValueCount(idx()+1);
+    <#else>
+      vector.setSafe(idx()<#if mode == "Nullable">, 1</#if><#list fields as field><#if field.include!true >, ${field.name}</#if></#list>);
+      vector.setValueCount(idx()+1);
+    </#if>
   }
   <#if minor.class == "Decimal">
 
@@ -126,8 +146,13 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
   <#if mode == "Nullable">
 
   public void writeNull() {
-    mutator.setNull(idx());
-    vector.getMutator().setValueCount(idx()+1);
+    <#if minor.class != "Int" && minor.class != "VarChar">
+        mutator.setNull(idx());
+        vector.getMutator().setValueCount(idx()+1);
+    <#else>
+        vector.setNull(idx());
+        vector.setValueCount(idx()+1);
+    </#if>
   }
   </#if>
   </#if>
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index 122cd23..5d1f5a3 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -19,8 +19,14 @@
 <#list vv.types as type>
 <#list type.minor as minor>
 
+<#if minor.class == "Int" || minor.class == "VarChar">
+<#assign className = "LegacyNullable${minor.class}Vector" />
+<#assign valuesName = "Nullable${minor.class}Vector" />
+<#else>
 <#assign className = "Nullable${minor.class}Vector" />
 <#assign valuesName = "${minor.class}Vector" />
+</#if>
+
 <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
 
 <@pp.changeOutputFile name="/org/apache/arrow/vector/${className}.java" />
@@ -44,15 +50,24 @@ import org.apache.arrow.flatbuf.Precision;
  * NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
  */
 @SuppressWarnings("unused")
-public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector {
+<#if minor.class == "Int" || minor.class == "VarChar">
+@Deprecated
+</#if>
+public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, FieldVector {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class);
 
 protected final static byte[] emptyByteArray = new byte[]{};
+
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private final FieldReader reader = new ${minor.class}ReaderImpl(${className}.this);
+  </#if>
 
   private final String bitsField = "$bits$";
   private final String valuesField = "$values$";
+
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private final Field field;
+  </#if>
 
   final BitVector bits = new BitVector(bitsField, allocator);
   final ${valuesName} values;
@@ -60,7 +75,9 @@ protected final static byte[] emptyByteArray = new byte[]{};
   private final Mutator mutator;
   private final Accessor accessor;
 
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private final List<BufferBacked> innerVectors;
+  </#if>
 
   <#if minor.typeParams??>
     <#assign typeParams = minor.typeParams?reverse>
@@ -105,6 +122,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
     </#if>
     this.mutator = new Mutator();
     this.accessor = new Accessor();
+    <#if minor.class != "Int" && minor.class != "VarChar">
     this.field = new Field(name, fieldType, null);
     innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
         bits,
@@ -113,16 +131,24 @@ protected final static byte[] emptyByteArray = new byte[]{};
         </#if>
         values
     ));
+    </#if>
   }
 
-  @Override
+  <#if minor.class != "Int" && minor.class != "VarChar">
+  /* not needed for new vectors */
   public BitVector getValidityVector() {
     return bits;
   }
+  </#if>
 
   @Override
   public List<BufferBacked> getFieldInnerVectors() {
-    return innerVectors;
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getFieldInnerVectors();
+    <#else>
+        return innerVectors;
+    </#if>
   }
 
   @Override
@@ -139,6 +165,10 @@ protected final static byte[] emptyByteArray = new byte[]{};
 
   @Override
   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.loadFieldBuffers(fieldNode, ownBuffers);
+    <#else>
     <#if type.major = "VarLen">
     // variable width values: truncate offset vector buffer to size (#1)
     org.apache.arrow.vector.BaseDataValueVector.truncateBufferBasedOnSize(ownBuffers, 1,
@@ -151,34 +181,64 @@ protected final static byte[] emptyByteArray = new byte[]{};
     </#if>
     org.apache.arrow.vector.BaseDataValueVector.load(fieldNode, getFieldInnerVectors(), ownBuffers);
     bits.valueCount = fieldNode.getLength();
+    </#if>
   }
 
   public List<ArrowBuf> getFieldBuffers() {
-    return org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getFieldBuffers();
+    <#else>
+        return org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors());
+    </#if>
   }
 
   @Override
   public Field getField() {
-    return field;
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getField();
+    <#else>
+      return field;
+    </#if>
   }
 
   @Override
   public MinorType getMinorType() {
-    return MinorType.${minor.class?upper_case};
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getMinorType();
+    <#else>
+        return MinorType.${minor.class?upper_case};
+    </#if>
   }
 
   @Override
   public FieldReader getReader(){
-    return reader;
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getReader();
+    <#else>
+        return reader;
+    </#if>
   }
 
   @Override
   public int getValueCapacity(){
-    return Math.min(bits.getValueCapacity(), values.getValueCapacity());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValueCapacity();
+    <#else>
+        return Math.min(bits.getValueCapacity(), values.getValueCapacity());
+    </#if>
   }
 
   @Override
   public ArrowBuf[] getBuffers(boolean clear) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getBuffers(clear);
+    <#else>
     final ArrowBuf[] buffers = ObjectArrays.concat(bits.getBuffers(false), values.getBuffers(false), ArrowBuf.class);
     if (clear) {
       for (final ArrowBuf buffer:buffers) {
@@ -187,25 +247,41 @@ protected final static byte[] emptyByteArray = new byte[]{};
       clear();
     }
     return buffers;
+    </#if>
   }
 
   @Override
   public void close() {
-    bits.close();
-    values.close();
-    super.close();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.close();
+    <#else>
+        bits.close();
+        values.close();
+        super.close();
+    </#if>
   }
 
   @Override
   public void clear() {
-    bits.clear();
-    values.clear();
-    super.clear();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.clear();
+    <#else>
+        bits.clear();
+        values.clear();
+        super.clear();
+    </#if>
   }
 
   @Override
   public int getBufferSize(){
-    return values.getBufferSize() + bits.getBufferSize();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getBufferSize();
+    <#else>
+        return values.getBufferSize() + bits.getBufferSize();
+    </#if>
   }
 
   @Override
@@ -214,34 +290,52 @@ protected final static byte[] emptyByteArray = new byte[]{};
       return 0;
     }
 
-    return values.getBufferSizeFor(valueCount)
-        + bits.getBufferSizeFor(valueCount);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getBufferSizeFor(valueCount);
+    <#else>
+        return values.getBufferSizeFor(valueCount)
+          + bits.getBufferSizeFor(valueCount);
+    </#if>
   }
 
   public ArrowBuf getBuffer() {
     return values.getDataBuffer();
   }
 
-  @Override
   public ${valuesName} getValuesVector() {
     return values;
   }
 
   @Override
   public void setInitialCapacity(int numRecords) {
-    bits.setInitialCapacity(numRecords);
-    values.setInitialCapacity(numRecords);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.setInitialCapacity(numRecords);
+    <#else>
+        bits.setInitialCapacity(numRecords);
+        values.setInitialCapacity(numRecords);
+    </#if>
   }
 
   @Override
   public void allocateNew() {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.allocateNew();
+    <#else>
     if(!allocateNewSafe()){
       throw new OutOfMemoryException("Failure while allocating buffer.");
     }
+    </#if>
   }
 
   @Override
   public boolean allocateNewSafe() {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.allocateNewSafe();
+    <#else>
     /* Boolean to keep track if all the memory allocations were successful
      * Used in the case of composite vectors when we need to allocate multiple
      * buffers for multiple vectors. If one of the allocations failed we need to
@@ -259,23 +353,38 @@ protected final static byte[] emptyByteArray = new byte[]{};
     mutator.reset();
     accessor.reset();
     return success;
+    </#if>
   }
 
   @Override
   public void reAlloc() {
-    bits.reAlloc();
-    values.reAlloc();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.reAlloc();
+    <#else>
+        bits.reAlloc();
+        values.reAlloc();
+    </#if>
   }
 
   public void reset() {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.reset();
+    <#else>
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
+    </#if>
   }
 
   <#if type.major == "VarLen">
   @Override
   public void allocateNew(int totalBytes, int valueCount) {
+    <#if minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        values.allocateNew(totalBytes, valueCount);
+    <#else>
     try {
       values.allocateNew(totalBytes, valueCount);
       bits.allocateNew(valueCount);
@@ -286,6 +395,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
+    </#if>
   }
 
   @Override
@@ -301,6 +411,10 @@ protected final static byte[] emptyByteArray = new byte[]{};
   <#else>
   @Override
   public void allocateNew(int valueCount) {
+    <#if minor.class == "Int">
+        /* DELEGATE TO NEW VECTOR */
+        values.allocateNew(valueCount);
+    <#else>
     try {
       values.allocateNew(valueCount);
       bits.allocateNew(valueCount);
@@ -311,6 +425,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
     bits.zeroVector();
     mutator.reset();
     accessor.reset();
+    </#if>
   }
 
   /**
@@ -318,32 +433,86 @@ protected final static byte[] emptyByteArray = new byte[]{};
    */
   @Override
   public void zeroVector() {
-    bits.zeroVector();
-    values.zeroVector();
+    <#if minor.class == "Int">
+        /* DELEGATE TO NEW VECTOR */
+        values.zeroVector();
+    <#else>
+        bits.zeroVector();
+        values.zeroVector();
+    </#if>
   }
   </#if>
 
+
+
   @Override
   public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getTransferPair(ref, allocator, callBack);
+    <#else>
         return getTransferPair(ref, allocator);
+    </#if>
   }
 
+
+
   @Override
   public TransferPair getTransferPair(BufferAllocator allocator){
-    return new TransferImpl(name, allocator);
-
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getTransferPair(allocator);
+    <#else>
+        return new TransferImpl(name, allocator);
+    </#if>
   }
 
+
+
   @Override
   public TransferPair getTransferPair(String ref, BufferAllocator allocator){
-    return new TransferImpl(ref, allocator);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getTransferPair(ref, allocator);
+    <#else>
+        return new TransferImpl(ref, allocator);
+    </#if>
   }
 
+
+
   @Override
   public TransferPair makeTransferPair(ValueVector to) {
-    return new TransferImpl((${className}) to);
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.makeTransferPair(to);
+    <#else>
+        return new TransferImpl((${className}) to);
+    </#if>
+  }
+
+
+
+  <#if minor.class == "Int" || minor.class == "VarChar">
+  public void transferTo(${valuesName} target) {
+    /* DELEGATE TO NEW VECTOR */
+    <#if type.major == "VarLen">
+        values.transferTo((BaseNullableVariableWidthVector) target);
+    <#else>
+        values.transferTo((BaseNullableFixedWidthVector) target);
+    </#if>
+  }
+
+  public void splitAndTransferTo(int startIndex, int length, ${valuesName} target) {
+    /* DELEGATE TO NEW VECTOR */
+    <#if type.major == "VarLen">
+        values.splitAndTransferTo(startIndex, length, (BaseNullableVariableWidthVector) target);
+    <#else>
+        values.splitAndTransferTo(startIndex, length, (BaseNullableFixedWidthVector) target);
+    </#if>
   }
 
+  <#else>
   public void transferTo(${className} target){
     bits.transferTo(target.bits);
     values.transferTo(target.values);
@@ -360,7 +529,11 @@ protected final static byte[] emptyByteArray = new byte[]{};
     target.mutator.lastSet = length - 1;
     </#if>
   }
+  </#if>
 
+
+
+  <#if minor.class != "Int" && minor.class != "VarChar">
   private class TransferImpl implements TransferPair {
     ${className} to;
 
@@ -392,6 +565,9 @@ protected final static byte[] emptyByteArray = new byte[]{};
       to.copyFromSafe(fromIndex, toIndex, ${className}.this);
     }
   }
+  </#if>
+
+
 
   @Override
   public Accessor getAccessor(){
@@ -403,7 +579,20 @@ protected final static byte[] emptyByteArray = new byte[]{};
     return mutator;
   }
 
-  public void copyFrom(int fromIndex, int thisIndex, ${className} from){
+
+
+  <#if minor.class == "Int" || minor.class == "VarChar">
+  public void copyFrom(int fromIndex, int thisIndex, ${valuesName} from) {
+    /* DELEGATE TO NEW VECTOR */
+    values.copyFrom(fromIndex, thisIndex, from);
+  }
+
+  public void copyFromSafe(int fromIndex, int thisIndex, ${valuesName} from) {
+    /* DELEGATE TO NEW VECTOR */
+    values.copyFromSafe(fromIndex, thisIndex, from);
+  }
+  <#else>
+  public void copyFrom(int fromIndex, int thisIndex, ${className} from) {
     final Accessor fromAccessor = from.getAccessor();
     if (!fromAccessor.isNull(fromIndex)) {
       mutator.set(thisIndex, fromAccessor.get(fromIndex));
@@ -428,17 +617,28 @@ protected final static byte[] emptyByteArray = new byte[]{};
     values.copyFromSafe(fromIndex, thisIndex, from.values);
     <#if type.major == "VarLen">mutator.lastSet = thisIndex;</#if>
   }
+  </#if>
 
   @Override
   public long getValidityBufferAddress() {
     /* address of the databuffer associated with the bitVector */
-    return (bits.getDataBuffer().memoryAddress());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValidityBufferAddress();
+    <#else>
+        return (bits.getDataBuffer().memoryAddress());
+    </#if>
   }
 
   @Override
   public long getDataBufferAddress() {
     /* address of the dataBuffer associated with the valueVector */
-    return (values.getDataBuffer().memoryAddress());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+          /* DELEGATE TO NEW VECTOR */
+          return values.getDataBufferAddress();
+    <#else>
+          return (values.getDataBuffer().memoryAddress());
+    </#if>
   }
 
   @Override
@@ -446,17 +646,26 @@ protected final static byte[] emptyByteArray = new byte[]{};
     /* address of the dataBuffer associated with the offsetVector
      * this operation is not supported for fixed-width vector types.
      */
-    <#if type.major != "VarLen">
-        throw new UnsupportedOperationException();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+          /* DELEGATE TO NEW VECTOR */
+          return values.getOffsetBufferAddress();
     <#else>
-        return (values.getOffsetAddr());
+        <#if type.major != "VarLen">
+          throw new UnsupportedOperationException();
+        <#else>
+          return (values.getOffsetAddr());
+        </#if>
     </#if>
   }
 
   @Override
   public ArrowBuf getValidityBuffer() {
-    /* dataBuffer associated with the bitVector */
-    return (bits.getDataBuffer());
+    <#if minor.class == "Int" || minor.class == "VarChar">
+          /* DELEGATE TO NEW VECTOR */
+          return values.getValidityBuffer();
+    <#else>
+          return (bits.getDataBuffer());
+    </#if>
   }
 
   @Override
@@ -468,10 +677,15 @@ protected final static byte[] emptyByteArray = new byte[]{};
   @Override
   public ArrowBuf getOffsetBuffer() {
     /* dataBuffer associated with the offsetVector of the valueVector */
-    <#if type.major != "VarLen">
-        throw new UnsupportedOperationException();
+    <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getOffsetBuffer();
     <#else>
-        return (values.getOffsetBuffer());
+        <#if type.major != "VarLen">
+          throw new UnsupportedOperationException();
+        <#else>
+          return (values.getOffsetBuffer());
+        </#if>
     </#if>
   }
 
@@ -485,38 +699,80 @@ protected final static byte[] emptyByteArray = new byte[]{};
      * @param  index   position of the value
      * @return value of the element, if not null
      */
-    public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
-      if (isNull(index)) {
+    <#if minor.class == "Int" || minor.class == "VarChar">
+      public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
+        /* DELEGATE TO NEW VECTOR */
+        return values.get(index);
+      }
+    <#else>
+
+      public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
+        if (isNull(index)) {
           throw new IllegalStateException("Can't get a null value");
+        }
+        return vAccessor.get(index);
       }
-      return vAccessor.get(index);
-    }
+    </#if>
 
     @Override
     public boolean isNull(int index) {
-      return isSet(index) == 0;
+      <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.isNull(index);
+      <#else>
+        return isSet(index) == 0;
+      </#if>
     }
 
     public int isSet(int index){
-      return bAccessor.get(index);
+      <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.isSet(index);
+      <#else>
+        return bAccessor.get(index);
+      </#if>
     }
 
     <#if type.major == "VarLen">
     public long getStartEnd(int index){
-      return vAccessor.getStartEnd(index);
+      <#if minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getStartEnd(index);
+      <#else>
+        return vAccessor.getStartEnd(index);
+      </#if>
     }
 
     @Override
     public int getValueLength(int index) {
-      return values.getAccessor().getValueLength(index);
+      <#if minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValueLength(index);
+      <#else>
+        return values.getAccessor().getValueLength(index);
+      </#if>
     }
     </#if>
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void get(int index, Nullable${minor.class}Holder holder){
+        /* DELEGATE TO NEW VECTOR */
+        values.get(index, holder);
+    }
+    <#else>
     public void get(int index, Nullable${minor.class}Holder holder){
       vAccessor.get(index, holder);
       holder.isSet = bAccessor.get(index);
     }
+    </#if>
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    @Override
+    public ${friendlyType} getObject(int index) {
+      /* DELEGATE TO NEW VECTOR */
+      return values.getObject(index);
+    }
+    <#else>
     @Override
     public ${friendlyType} getObject(int index) {
       if (isNull(index)) {
@@ -525,6 +781,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
         return vAccessor.getObject(index);
       }
     }
+    </#if>
 
     <#if minor.class == "IntervalYear" || minor.class == "IntervalDay">
     public StringBuilder getAsStringBuilder(int index) {
@@ -538,7 +795,12 @@ protected final static byte[] emptyByteArray = new byte[]{};
 
     @Override
     public int getValueCount(){
-      return bits.getAccessor().getValueCount();
+      <#if minor.class == "Int" || minor.class == "VarChar">
+        /* DELEGATE TO NEW VECTOR */
+        return values.getValueCount();
+      <#else>
+        return bits.getAccessor().getValueCount();
+      </#if>
     }
 
     public void reset(){}
@@ -551,21 +813,35 @@ protected final static byte[] emptyByteArray = new byte[]{};
     private Mutator(){
     }
 
-    public ${valuesName} getVectorWithValues(){
+    public ${valuesName} getVectorWithValues() {
       return values;
     }
 
+
     @Override
     public void setIndexDefined(int index){
+      <#if minor.class == "Int" || minor.class == "VarChar">
+      values.setIndexDefined(index);
+      <#else>
       bits.getMutator().setToOne(index);
+      </#if>
     }
 
+
+
     /**
      * Set the variable length element at the specified index to the supplied byte array.
      *
      * @param index   position of the bit to set
      * @param value   array of bytes (or int if smaller than 4 bytes) to write
      */
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
+       /* DELEGATE TO NEW VECTOR */
+       values.set(index, value);
+    }
+    <#else>
     public void set(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) {
       setCount++;
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
@@ -579,9 +855,24 @@ protected final static byte[] emptyByteArray = new byte[]{};
       valuesMutator.set(index, value);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
+
 
     <#if type.major == "VarLen">
+    <#if minor.class == "VarChar">
+    public void fillEmpties(int index) {
+      /* DELEGATE TO NEW VECTOR */
+      values.fillEmpties(index);
+    }
 
+    @Override
+    public void setValueLengthSafe(int index, int length) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setValueLengthSafe(index, length);
+    }
+
+    <#else>
     public void fillEmpties(int index){
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       for (int i = lastSet + 1; i < index; i++) {
@@ -599,7 +890,16 @@ protected final static byte[] emptyByteArray = new byte[]{};
       lastSet = index;
     }
     </#if>
+    </#if>
 
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void setSafe(int index, byte[] value, int start, int length) {
+       /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, value, start, length);
+    }
+    <#else>
     public void setSafe(int index, byte[] value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
@@ -612,7 +912,16 @@ protected final static byte[] emptyByteArray = new byte[]{};
       <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
+    </#if>
+
+
 
+    <#if minor.class == "VarChar">
+    public void setSafe(int index, ByteBuffer value, int start, int length) {
+       /* DELEGATE TO NEW VECTOR */
+       values.setSafe(index, value, start, length);
+    }
+    <#else>
     public void setSafe(int index, ByteBuffer value, int start, int length) {
       <#if type.major != "VarLen">
       throw new UnsupportedOperationException();
@@ -625,11 +934,25 @@ protected final static byte[] emptyByteArray = new byte[]{};
       <#if type.major == "VarLen">lastSet = index;</#if>
       </#if>
     }
+    </#if>
 
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void setNull(int index) {
+       /* DELEGATE TO NEW VECTOR */
+       values.setNull(index);
+    }
+    <#else>
     public void setNull(int index){
       bits.getMutator().setSafe(index, 0);
     }
+    </#if>
+
+
 
+    <#if minor.class != "Int" && minor.class != "VarChar">
+    /* these methods are probably not needed */
     public void setSkipNull(int index, ${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }
@@ -637,8 +960,17 @@ protected final static byte[] emptyByteArray = new byte[]{};
     public void setSkipNull(int index, Nullable${minor.class}Holder holder){
       values.getMutator().set(index, holder);
     }
+    </#if>
+
 
-    public void set(int index, Nullable${minor.class}Holder holder){
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void set(int index, Nullable${minor.class}Holder holder) {
+      /* DELEGATE TO NEW VECTOR */
+      values.set(index, holder);
+    }
+    <#else>
+    public void set(int index, Nullable${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -649,8 +981,17 @@ protected final static byte[] emptyByteArray = new byte[]{};
       valuesMutator.set(index, holder);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
+
 
-    public void set(int index, ${minor.class}Holder holder){
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void set(int index, ${minor.class}Holder holder) {
+        /* DELEGATE TO NEW VECTOR */
+        values.set(index, holder);
+    }
+    <#else>
+    public void set(int index, ${minor.class}Holder holder) {
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
       <#if type.major == "VarLen">
       for (int i = lastSet + 1; i < index; i++) {
@@ -661,11 +1002,45 @@ protected final static byte[] emptyByteArray = new byte[]{};
       valuesMutator.set(index, holder);
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
 
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public boolean isSafe(int outIndex) {
+       /* DELEGATE TO NEW VECTOR */
+       return values.isSafe(outIndex);
+    }
+    <#else>
     public boolean isSafe(int outIndex) {
       return outIndex < ${className}.this.getValueCapacity();
     }
+    </#if>
+
+
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    <#if minor.class == "Int">
+    public void set(int index, int isSet, int valueField) {
+      /* DELEGATE TO NEW VECTOR */
+      values.set(index, isSet, valueField);
+    }
+    public void setSafe(int index, int isSet, int valueField) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, isSet, valueField);
+    }
+    </#if>
+    <#if minor.class == "VarChar">
+    public void set(int index, int isSet, int startField, int endField, ArrowBuf bufferField ) {
+      /* DELEGATE TO NEW VECTOR */
+      values.set(index, isSet, startField, endField, bufferField);
+    }
+    public void setSafe(int index, int isSet, int startField, int endField, ArrowBuf bufferField ) {
+        /* DELEGATE TO NEW VECTOR */
+        values.setSafe(index, isSet, startField, endField, bufferField);
+    }
+    </#if>
+    <#else>
     <#assign fields = minor.fields!type.fields />
     public void set(int index, int isSet<#list fields as field>, ${field.type} ${field.name}Field</#list> ){
       final ${valuesName}.Mutator valuesMutator = values.getMutator();
@@ -688,8 +1063,21 @@ protected final static byte[] emptyByteArray = new byte[]{};
       setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
 
 
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    public void setSafe(int index, Nullable${minor.class}Holder value) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, value);
+    }
+
+    public void setSafe(int index, ${minor.class}Holder value) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setSafe(index, value);
+    }
+    <#else>
     public void setSafe(int index, Nullable${minor.class}Holder value) {
       <#if type.major == "VarLen">
       fillEmpties(index);
@@ -709,15 +1097,25 @@ protected final static byte[] emptyByteArray = new byte[]{};
       setCount++;
       <#if type.major == "VarLen">lastSet = index;</#if>
     }
+    </#if>
+
+
 
     <#if !(type.major == "VarLen" || minor.class == "IntervalDay")>
     public void setSafe(int index, ${minor.javaType!type.javaType} value) {
+      <#if minor.class == "Int">
+        /* DELEGATE TO NEW VECTOR */
+        values.setSafe(index, value);
+      <#else>
       bits.getMutator().setSafeToOne(index);
       values.getMutator().setSafe(index, value);
       setCount++;
+      </#if>
     }
-
     </#if>
+
+
+
     <#if minor.class == "Decimal">
     public void set(int index, ${friendlyType} value) {
       bits.getMutator().setToOne(index);
@@ -729,8 +1127,17 @@ protected final static byte[] emptyByteArray = new byte[]{};
       values.getMutator().setSafe(index, value);
       setCount++;
     }
-
     </#if>
+
+
+
+    <#if minor.class == "Int" || minor.class == "VarChar">
+    @Override
+    public void setValueCount(int valueCount) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setValueCount(valueCount);
+    }
+    <#else>
     @Override
     public void setValueCount(int valueCount) {
       assert valueCount >= 0;
@@ -740,7 +1147,12 @@ protected final static byte[] emptyByteArray = new byte[]{};
       values.getMutator().setValueCount(valueCount);
       bits.getMutator().setValueCount(valueCount);
     }
+    </#if>
+
 
+
+    <#if minor.class != "Int" && minor.class != "VarChar">
+    /* THIS METHOD IS PROBABLY NOT NEEDED FOR NEW VECTORS */
     @Override
     public void generateTestData(int valueCount){
       bits.getMutator().generateTestDataAlt(valueCount);
@@ -748,13 +1160,27 @@ protected final static byte[] emptyByteArray = new byte[]{};
       <#if type.major = "VarLen">lastSet = valueCount;</#if>
       setValueCount(valueCount);
     }
+    </#if>
+
 
+
+    <#if minor.class != "Int" && minor.class != "VarChar">
+    /* MUTATOR RESET IS NOT NEEDED FOR NEW VECTORS */
     @Override
     public void reset(){
       setCount = 0;
       <#if type.major = "VarLen">lastSet = -1;</#if>
     }
+    </#if>
+
+
 
+    <#if minor.class == "VarChar">
+    public void setLastSet(int value) {
+      /* DELEGATE TO NEW VECTOR */
+      values.setLastSet(value);
+    }
+    <#else>
     public void setLastSet(int value) {
       <#if type.major = "VarLen">
         lastSet = value;
@@ -762,7 +1188,16 @@ protected final static byte[] emptyByteArray = new byte[]{};
         throw new UnsupportedOperationException();
       </#if>
     }
+    </#if>
+
 
+
+    <#if minor.class == "VarChar">
+    public int getLastSet() {
+      /* DELEGATE TO NEW VECTOR */
+      return values.getLastSet();
+    }
+    <#else>
     public int getLastSet() {
       <#if type.major != "VarLen">
         throw new UnsupportedOperationException();
@@ -770,6 +1205,7 @@ protected final static byte[] emptyByteArray = new byte[]{};
         return lastSet;
       </#if>
     }
+    </#if>
   }
 }
 </#list>
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index fe24a86..3c7ed01 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -436,7 +436,11 @@ public class UnionVector implements FieldVector {
           <#assign uncappedName = name?uncap_first/>
           <#if !minor.typeParams?? >
       case ${name?upper_case}:
-        return get${name}Vector().getAccessor().getObject(index);
+        <#if minor.class != "Int" && minor.class != "VarChar">
+          return get${name}Vector().getAccessor().getObject(index);
+        <#else>
+          return get${name}Vector().getObject(index);
+        </#if>
           </#if>
         </#list>
       </#list>
@@ -530,7 +534,11 @@ public class UnionVector implements FieldVector {
         <#if !minor.typeParams?? >
     public void setSafe(int index, Nullable${name}Holder holder) {
       setType(index, MinorType.${name?upper_case});
-      get${name}Vector().getMutator().setSafe(index, holder);
+      <#if minor.class != "Int" && minor.class != "VarChar">
+        get${name}Vector().getMutator().setSafe(index, holder);
+      <#else>
+        get${name}Vector().setSafe(index, holder);
+      </#if>
     }
 
         </#if>
@@ -547,4 +555,10 @@ public class UnionVector implements FieldVector {
     @Override
     public void generateTestData(int values) { }
   }
+
+  public int getValueCount() { return 0; }
+
+  public void setValueCount(int valueCount) { }
+
+  public Object getObject(int index) { return null; }
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
new file mode 100644
index 0000000..c5f7810
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
@@ -0,0 +1,701 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+import org.apache.arrow.vector.util.TransferPair;
+
+public abstract class BaseNullableFixedWidthVector extends BaseValueVector
+        implements FixedWidthVector, FieldVector {
+   private final byte typeWidth;
+
+   private int valueAllocationSizeInBytes;
+   private int validityAllocationSizeInBytes;
+
+   protected final Field field;
+   private int allocationMonitor;
+   protected ArrowBuf validityBuffer;
+   protected ArrowBuf valueBuffer;
+   protected int valueCount;
+
+   public BaseNullableFixedWidthVector(final String name, final BufferAllocator allocator,
+                                       FieldType fieldType, final byte typeWidth) {
+      super(name, allocator);
+      this.typeWidth = typeWidth;
+      valueAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * typeWidth;
+      validityAllocationSizeInBytes = getSizeFromCount(INITIAL_VALUE_ALLOCATION);
+      field = new Field(name, fieldType, null);
+      valueCount = 0;
+      allocationMonitor = 0;
+      validityBuffer = allocator.getEmpty();
+      valueBuffer = allocator.getEmpty();
+   }
+
+   /* TODO:
+    * Determine how writerIndex and readerIndex need to be used. Right now we
+    * are setting the writerIndex and readerIndex in the call to getFieldBuffers
+    * using the valueCount -- this assumes that the caller of getFieldBuffers
+    * on the vector has already invoked setValueCount.
+    *
+    * Do we need to set them during vector transfer and splitAndTransfer?
+    */
+
+   /* TODO:
+    *
+    * see if getNullCount() can be made faster -- O(1)
+    */
+
+   /* TODO:
+    * Once the entire hierarchy has been refactored, move common functions
+    * like getNullCount(), splitAndTransferValidityBuffer to top level
+    * base class BaseValueVector.
+    *
+    * Along with this, some class members (validityBuffer) can also be
+    * abstracted out to top level base class.
+    *
+    * Right now BaseValueVector is the top level base class for other
+    * vector types in ValueVector hierarchy and those vectors have not
+    * yet been refactored so moving things to the top class as of now
+    * is not a good idea.
+    */
+
+   /* TODO:
+    * See if we need logger -- unnecessary object probably
+    */
+
+   protected abstract org.slf4j.Logger getLogger();
+
+   @Override
+   public Mutator getMutator() {
+      throw new  UnsupportedOperationException("Mutator is not needed to write into vector");
+   }
+
+   @Override
+   public Accessor getAccessor() {
+      throw new UnsupportedOperationException("Accessor is not needed to read from vector");
+   }
+
+   @Override
+   public long getValidityBufferAddress() {
+      return (validityBuffer.memoryAddress());
+   }
+
+   @Override
+   public long getDataBufferAddress() {
+      return (valueBuffer.memoryAddress());
+   }
+
+   @Override
+   public long getOffsetBufferAddress() {
+      throw new UnsupportedOperationException("not supported for fixed-width vectors");
+   }
+
+   @Override
+   public ArrowBuf getValidityBuffer() {
+      return validityBuffer;
+   }
+
+   @Override
+   public ArrowBuf getDataBuffer() {
+      return valueBuffer;
+   }
+
+   @Override
+   public ArrowBuf getOffsetBuffer() {
+      throw new UnsupportedOperationException("not supported for fixed-width vectors");
+   }
+
+   @Override
+   public void setInitialCapacity(int valueCount) {
+      final long size = (long)valueCount * typeWidth;
+      if (size > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+      }
+      valueAllocationSizeInBytes = (int)size;
+      validityAllocationSizeInBytes = getSizeFromCount(valueCount);
+   }
+
+   @Override
+   public int getValueCapacity(){
+      return Math.min(getValueBufferValueCapacity(), getValidityBufferValueCapacity());
+   }
+
+   /* number of values that fit in the value buffer; feeds getValueCapacity() */
+   private int getValueBufferValueCapacity() {
+      return (int)((valueBuffer.capacity() * 1.0)/typeWidth);
+   }
+
+   /* number of validity bits (one per value) in the validity buffer; feeds getValueCapacity() */
+   private int getValidityBufferValueCapacity() {
+      return (int)(validityBuffer.capacity() * 8L);
+   }
+
+   /* number of bytes for the validity buffer for the given valueCount */
+   protected int getSizeFromCount(int valueCount) {
+      return (int) Math.ceil(valueCount / 8.0);
+   }
+
+   @Override
+   public void zeroVector() {
+      initValidityBuffer();
+      initValueBuffer();
+   }
+
+   private void initValidityBuffer() {
+      validityBuffer.setZero(0, validityBuffer.capacity());
+   }
+
+   private void initValueBuffer() {
+      valueBuffer.setZero(0, valueBuffer.capacity());
+   }
+
+   public void reset() {
+      zeroVector();
+   }
+
+   @Override
+   public void close() { clear(); }
+
+   @Override
+   public void clear() {
+      validityBuffer = releaseBuffer(validityBuffer);
+      valueBuffer = releaseBuffer(valueBuffer);
+   }
+
+   /* used to step down the memory allocation */
+   protected void incrementAllocationMonitor() {
+      if (allocationMonitor < 0) {
+         allocationMonitor = 0;
+      }
+      allocationMonitor++;
+   }
+
+   /* used to step up the memory allocation */
+   protected void decrementAllocationMonitor() {
+      if (allocationMonitor > 0) {
+         allocationMonitor = 0;
+      }
+      allocationMonitor--;
+   }
+
+   @Override
+   public void allocateNew() {
+      if(!allocateNewSafe()){
+         throw new OutOfMemoryException("Failure while allocating memory.");
+      }
+   }
+
+   /**
+    * Allocate buffers using the recorded allocation sizes, releasing held buffers.
+    *
+    * @return true on success, false if the allocation failed
+    */
+   public boolean allocateNewSafe() {
+      long curAllocationSizeValue = valueAllocationSizeInBytes;
+      long curAllocationSizeValidity = validityAllocationSizeInBytes;
+      if (curAllocationSizeValue > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory exceeds limit");
+      }
+      /* we are doing a new allocation -- release the current buffers */
+      clear();
+      try {
+         allocateBytes(curAllocationSizeValue, curAllocationSizeValidity);
+      } catch (Exception e) {
+         getLogger().error("ERROR: Failure in allocateNewSafe", e);
+         clear();
+         return false;
+      }
+      return true;
+   }
+
+   /**
+    * Allocate buffers for the requested number of values, releasing any buffers
+    * currently held. The allocation monitor may halve or double the request.
+    *
+    * @param valueCount  number of values to provision for
+    */
+   public void allocateNew(int valueCount) {
+      /* widen before multiplying: int arithmetic could overflow and bypass the size check */
+      long valueBufferSize = (long)valueCount * typeWidth;
+      long validityBufferSize = getSizeFromCount(valueCount);
+      if (allocationMonitor > 10) {
+         /* step down: provisioned capacity was repeatedly much larger than needed */
+         valueBufferSize = Math.max(8, valueBufferSize / 2);
+         validityBufferSize = Math.max(8, validityBufferSize / 2);
+         allocationMonitor = 0;
+      } else if (allocationMonitor < -2) {
+         valueBufferSize = valueBufferSize * 2L;
+         validityBufferSize = validityBufferSize * 2L;
+         allocationMonitor = 0;
+      }
+      if (valueBufferSize > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+      }
+      /* we are doing a new allocation -- release the current buffers */
+      clear();
+      try {
+         allocateBytes(valueBufferSize, validityBufferSize);
+      } catch(Exception e) {
+         getLogger().error("ERROR: Failure in allocateNew");
+         getLogger().error(e.getMessage());
+         clear();
+         throw e;
+      }
+   }
+
+   /**
+    * Actual memory allocation is done by this function. All the calculations
+    * and knowledge about what size to allocate is up to the callers of this
+    * method.
+    * Callers appropriately handle errors if memory allocation fails here.
+    * Callers should also take care of determining that desired size is
+    * within the bounds of max allocation allowed and any other error
+    * conditions.
+    */
+   private void allocateBytes(final long valueBufferSize, final long validityBufferSize) {
+      /* allocate data buffer */
+      int curSize = (int)valueBufferSize;
+      valueBuffer = allocator.buffer(curSize);
+      valueBuffer.readerIndex(0);
+      valueAllocationSizeInBytes = curSize;
+
+      /* allocate validity buffer; allocateValidityBuffer() already zeroes it,
+       * so no second initValidityBuffer() pass is needed here */
+      allocateValidityBuffer((int)validityBufferSize);
+   }
+
+   /*
+    * Allocate and zero a validity buffer of validityBufferSize bytes. Needed by
+    * splitAndTransfer: when splitting from a position in the middle of a byte we
+    * can't just slice the source buffer, so the target's validityBuffer must be
+    * allocated explicitly. The data buffer, by contrast, can always be sliced.
+    */
+   private void allocateValidityBuffer(final int validityBufferSize) {
+      validityBuffer = allocator.buffer(validityBufferSize);
+      validityBuffer.readerIndex(0);
+      validityAllocationSizeInBytes = validityBufferSize;
+      initValidityBuffer();
+   }
+
+   @Override
+   public int getBufferSizeFor(final int count) {
+      if (count == 0) { return 0; }
+      return (count * typeWidth) + getSizeFromCount(count);
+   }
+
+   @Override
+   public int getBufferSize() {
+      if (valueCount == 0) { return 0; }
+      return (valueCount * typeWidth) + getSizeFromCount(valueCount);
+   }
+
+   @Override
+   public Field getField() {
+      return field;
+   }
+
+   @Override
+   public ArrowBuf[] getBuffers(boolean clear) {
+      final ArrowBuf[] buffers = new ArrowBuf[2];
+      buffers[0] = validityBuffer;
+      buffers[1] = valueBuffer;
+      if (clear) {
+         for (final ArrowBuf buffer:buffers) {
+            buffer.retain(1);
+         }
+         clear();
+      }
+      return buffers;
+   }
+
+   @Override
+   public void reAlloc() {
+      valueBuffer = reallocBufferHelper(valueBuffer, true);
+      validityBuffer = reallocBufferHelper(validityBuffer, false);
+   }
+
+   /**
+    * Grow one of the two buffers: allocate a larger buffer, copy the old contents
+    * into it, zero the remainder and release the old buffer.
+    *
+    * @param buffer  buffer to grow; one reference to it is released here
+    * @param dataBuffer  true for the value buffer, false for the validity buffer
+    * @return the newly allocated buffer that replaces {@code buffer}
+    */
+   private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) {
+      final int currentBufferCapacity = buffer.capacity();
+      final int recordedSize = dataBuffer ? valueAllocationSizeInBytes : validityAllocationSizeInBytes;
+      /* grow from whichever is larger: the recorded allocation size or the real capacity */
+      final long baseSize = Math.max((long)recordedSize, (long)currentBufferCapacity);
+      final long newAllocationSize = BaseAllocator.nextPowerOfTwo(baseSize * 2L);
+
+      if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Unable to expand the buffer");
+      }
+      getLogger().debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]",
+              name, recordedSize, newAllocationSize);
+
+      final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
+      newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
+      /* The new buffer can be more than twice the old capacity (e.g. when the old
+       * buffer was never allocated), so zero every byte after the copied region
+       * instead of only the upper half; otherwise stale memory could leak through. */
+      newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity);
+      buffer.release(1);
+      if (dataBuffer) {
+         valueAllocationSizeInBytes = (int)newAllocationSize;
+      } else {
+         validityAllocationSizeInBytes = (int)newAllocationSize;
+      }
+      return newBuf;
+   }
+
+   @Override
+   public List<BufferBacked> getFieldInnerVectors() { throw new UnsupportedOperationException(); }
+
+   @Override
+   public void initializeChildrenFromFields(List<Field> children) {
+      if (!children.isEmpty()) {
+         throw new IllegalArgumentException("primitive type vector can not have children");
+      }
+   }
+
+   @Override
+   public List<FieldVector> getChildrenFromFields() {
+      return Collections.emptyList();
+   }
+
+   @Override
+   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+      if (ownBuffers.size() != 2) {
+         throw new IllegalArgumentException("Illegal buffer count, expected " + 2 + ", got: " + ownBuffers.size());
+      }
+
+      ArrowBuf bitBuffer = ownBuffers.get(0);
+      ArrowBuf dataBuffer = ownBuffers.get(1);
+
+      validityBuffer.release();
+      validityBuffer = bitBuffer.retain(allocator);
+      valueBuffer.release();
+      valueBuffer = dataBuffer.retain(allocator);
+
+      valueCount = fieldNode.getLength();
+
+      valueAllocationSizeInBytes = valueBuffer.capacity();
+      validityAllocationSizeInBytes = validityBuffer.capacity();
+   }
+
+   public List<ArrowBuf> getFieldBuffers() {
+      List<ArrowBuf> result = new ArrayList<>(2);
+
+      validityBuffer.readerIndex(0);
+      validityBuffer.writerIndex(getSizeFromCount(valueCount));
+      valueBuffer.readerIndex(0);
+      valueBuffer.writerIndex(valueCount * typeWidth);
+
+      result.add(validityBuffer);
+      result.add(valueBuffer);
+
+      return result;
+   }
+
+   @Override
+   public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
+      return getTransferPair(ref, allocator);
+   }
+
+   @Override
+   public TransferPair getTransferPair(BufferAllocator allocator){
+      return getTransferPair(name, allocator);
+   }
+
+   public abstract TransferPair getTransferPair(String ref, BufferAllocator allocator);
+
+   public void transferTo(BaseNullableFixedWidthVector target){
+      compareTypes(target, "transferTo");
+      target.clear();
+      target.validityBuffer = validityBuffer.transferOwnership(target.allocator).buffer;
+      target.valueBuffer = valueBuffer.transferOwnership(target.allocator).buffer;
+      target.valueCount = valueCount;
+      clear();
+   }
+
+   public void splitAndTransferTo(int startIndex, int length,
+                                  BaseNullableFixedWidthVector target) {
+      compareTypes(target, "splitAndTransferTo");
+      target.clear();
+      splitAndTransferValidityBuffer(startIndex, length, target);
+      splitAndTransferValueBuffer(startIndex, length, target);
+      target.setValueCount(length);
+   }
+
+   private void splitAndTransferValueBuffer(int startIndex, int length,
+                                            BaseNullableFixedWidthVector target) {
+      final int startPoint = startIndex * typeWidth;
+      final int sliceLength = length * typeWidth;
+      target.valueBuffer = valueBuffer.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
+   }
+
+   private void splitAndTransferValidityBuffer(int startIndex, int length,
+                                               BaseNullableFixedWidthVector target) {
+      assert startIndex + length <= valueCount;
+      int firstByteSource = BitVectorHelper.byteIndex(startIndex);
+      int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
+      int byteSizeTarget = getSizeFromCount(length);
+      int offset = startIndex % 8;
+
+      if (length > 0) {
+         if (offset == 0) {
+            // slice
+            if (target.validityBuffer != null) {
+               target.validityBuffer.release();
+            }
+            target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
+            target.validityBuffer.retain(1);
+         }
+         else {
+            /* Copy data
+             * When the first bit starts from the middle of a byte (offset != 0),
+             * copy data from src BitVector.
+             * Each byte in the target is composed by a part in i-th byte,
+             * another part in (i+1)-th byte.
+             */
+            target.allocateValidityBuffer(byteSizeTarget);
+
+            for (int i = 0; i < byteSizeTarget - 1; i++) {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer, firstByteSource + i, offset);
+               byte b2 = getBitsFromNextByte(this.validityBuffer, firstByteSource + i + 1, offset);
+
+               target.validityBuffer.setByte(i, (b1 + b2));
+            }
+
+            /* Copying the last piece is done in the following manner:
+             * if the source vector has 1 or more bytes remaining, we copy
+             * the last piece as a byte formed by shifting data
+             * from the current byte and the next byte.
+             *
+             * if the source vector has no more bytes remaining
+             * (we are at the last byte), we copy the last piece as a byte
+             * by shifting data from the current byte.
+             */
+            if((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget - 1, offset);
+               byte b2 = getBitsFromNextByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget, offset);
+
+               target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
+            }
+            else {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget - 1, offset);
+               target.validityBuffer.setByte(byteSizeTarget - 1, b1);
+            }
+         }
+      }
+   }
+
+   private static byte getBitsFromCurrentByte(ArrowBuf data, int index, int offset) {
+      return (byte)((data.getByte(index) & 0xFF) >>> offset);
+   }
+
+   private static byte getBitsFromNextByte(ArrowBuf data, int index, int offset) {
+      return (byte)((data.getByte(index) << (8 - offset)));
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *          common getters and setters                            *
+    *                                                                *
+    ******************************************************************/
+
+
+   /**
+    * Get the number of elements that are null in the vector
+    *
+    * @return the number of null elements.
+    */
+   public int getNullCount() {
+      int count = 0;
+      final int sizeInBytes = getSizeFromCount(valueCount);
+
+      for (int i = 0; i < sizeInBytes; ++i) {
+         final byte byteValue = validityBuffer.getByte(i);
+         /* Java uses two's complement binary representation, hence 11111111_b which is -1
+          * when converted to Int will have 32bits set to 1. Masking the MSB and then
+          * adding it back solves the issue.
+          */
+         count += Integer.bitCount(byteValue & 0x7F) - (byteValue >> 7);
+      }
+      int nullCount = (sizeInBytes * 8) - count;
+      /* if the valueCount is not a multiple of 8,
+       * the bits on the right were counted as null bits.
+       */
+      int remainder = valueCount % 8;
+      nullCount -= remainder == 0 ? 0 : 8 - remainder;
+      return nullCount;
+   }
+
+
+   /**
+    * Get the value count of vector. This will always be zero unless
+    * {@link #setValueCount(int)} has been called prior to calling this.
+    *
+    * @return valueCount for the vector
+    */
+   public int getValueCount(){
+      return valueCount;
+   }
+
+
+   /**
+    * Set value count for the vector.
+    *
+    * @param valueCount  value count to set
+    */
+   public void setValueCount(int valueCount) {
+      this.valueCount = valueCount;
+      final int currentValueCapacity = getValueCapacity();
+      while (valueCount > getValueCapacity()) {
+         reAlloc();
+      }
+      /*
+       * We are trying to understand the pattern of memory allocation.
+       * If initially, the user did vector.allocateNew(), we would have
+       * allocated memory of default size (4096 * type width).
+       * Later on user invokes setValueCount(count).
+       *
+       * If the existing value capacity is twice as large as the
+       * valueCount, we know that we over-provisioned memory in the
+       * first place when default memory allocation was done because user
+       * really needs a much less value count in the vector.
+       *
+       * We record this by bumping up the allocationMonitor. If this pattern
+       * happens for certain number of times and allocationMonitor
+       * reaches the threshold (internal hardcoded) value, subsequent
+       * call to allocateNew() will take care of stepping down the
+       * default memory allocation size.
+       *
+       * Another case would be under-provisioning the initial memory and
+       * thus going through a lot of realloc(). Here the goal is to
+       * see if we can minimize the number of reallocations. Again the
+       * state is recorded in allocationMonitor by decrementing it
+       * (negative value). If a threshold is hit, realloc will try to
+       * allocate more memory in order to possibly avoid a future realloc.
+       * This case is also applicable to setSafe() methods which can trigger
+       * a realloc() and thus we record the state there as well.
+       */
+      if (valueCount > 0) {
+         if (currentValueCapacity >= (valueCount * 2)) {
+            incrementAllocationMonitor();
+         } else if (currentValueCapacity <= (valueCount/2)) {
+            decrementAllocationMonitor();
+         }
+      }
+   }
+
+
+   /**
+    * Check if the given index is within the current value capacity
+    * of the vector
+    *
+    * @param index  position to check
+    * @return true if index is within the current value capacity
+    */
+   public boolean isSafe(int index) {
+      return index < getValueCapacity();
+   }
+
+
+   /**
+    * Check if element at given index is null.
+    *
+    * @param index  position of element
+    * @return true if element at given index is null, false otherwise
+    */
+   public boolean isNull(int index) {
+      return (isSet(index) == 0);
+   }
+
+
+   /**
+    * Inverse of {@link #isNull(int)}: reads the validity bit of the element.
+    *
+    * @param index  position of element
+    * @return 1 if element at given index is not null, 0 otherwise
+    */
+   public int isSet(int index) {
+      final int byteIndex = index >> 3;
+      final byte b = validityBuffer.getByte(byteIndex);
+      final int bitIndex = index & 7;
+      return Long.bitCount(b & (1L << bitIndex));
+   }
+
+   public void setIndexDefined(int index) {
+      handleSafe(index);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+   }
+
+   public void set(int index, byte[] value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+   public void setSafe(int index, byte[] value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+   public void set(int index, ByteBuffer value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+   public void setSafe(int index, ByteBuffer value, int start, int length) {
+      throw new UnsupportedOperationException();
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *                helper methods for setters                      *
+    *                                                                *
+    ******************************************************************/
+
+
+
+   protected void handleSafe(int index) {
+      while (index >= getValueCapacity()) {
+         decrementAllocationMonitor();
+         reAlloc();
+      }
+   }
+}
\ No newline at end of file
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
new file mode 100644
index 0000000..a79709d
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
@@ -0,0 +1,764 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+
+
+import io.netty.buffer.ArrowBuf;
+
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.OversizedAllocationException;
+import org.apache.arrow.vector.util.TransferPair;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class BaseNullableVariableWidthVector extends BaseValueVector
+        implements VariableWidthVector, FieldVector {
+   private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
+   private static final int INITIAL_BYTE_COUNT = INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT;
+
+   private int valueAllocationSizeInBytes;
+   private int validityAllocationSizeInBytes;
+   private int offsetAllocationSizeInBytes;
+
+   /* protected members */
+   protected static final int OFFSET_WIDTH = 4; /* 4 byte unsigned int to track offsets */
+   protected static final byte[] emptyByteArray = new byte[]{};
+   protected ArrowBuf validityBuffer;
+   protected ArrowBuf valueBuffer;
+   protected ArrowBuf offsetBuffer;
+   protected int valueCount;
+   protected int lastSet;
+   protected final Field field;
+   private boolean cleared;
+
+   /**
+    * Create an empty vector. No memory is allocated here: the three buffers
+    * start out as the allocator's empty buffer and only the sizes to use for
+    * the first allocation are recorded.
+    *
+    * @param name       name of the vector
+    * @param allocator  allocator used for all buffers of this vector
+    * @param fieldType  type information used to build the vector's Field
+    */
+   public BaseNullableVariableWidthVector(final String name, final BufferAllocator allocator,
+                                          FieldType fieldType) {
+      super(name, allocator);
+
+      /* sizes used by the first allocation */
+      valueAllocationSizeInBytes = INITIAL_BYTE_COUNT;
+      validityAllocationSizeInBytes = getSizeFromCount(INITIAL_VALUE_ALLOCATION);
+      offsetAllocationSizeInBytes = INITIAL_VALUE_ALLOCATION * OFFSET_WIDTH;
+      field = new Field(name, fieldType, null);
+
+      /* nothing allocated yet */
+      validityBuffer = allocator.getEmpty();
+      offsetBuffer = allocator.getEmpty();
+      valueBuffer = allocator.getEmpty();
+
+      /* no values written so far */
+      lastSet = -1;
+      valueCount = 0;
+      cleared = false;
+   }
+
+  /* TODO:
+    * Determine how writerIndex and readerIndex need to be used. Right now we
+    * are setting the writerIndex and readerIndex in the call to getFieldBuffers
+    * using the valueCount -- this assumes that the caller of getFieldBuffers
+    * on the vector has already invoked setValueCount.
+    *
+    * Do we need to set them during vector transfer and splitAndTransfer?
+    */
+
+   /* TODO:
+    *
+    * see if getNullCount() can be made faster -- O(1)
+    */
+
+   /* TODO:
+    * Once the entire hierarchy has been refactored, move common functions
+    * like getNullCount(), splitAndTransferValidityBuffer to top level
+    * base class BaseValueVector.
+    *
+    * Along with this, some class members (validityBuffer) can also be
+    * abstracted out to top level base class.
+    *
+    * Right now BaseValueVector is the top level base class for other
+    * vector types in ValueVector hierarchy and those vectors have not
+    * yet been refactored so moving things to the top class as of now
+    * is not a good idea.
+    */
+
+   /* TODO:
+    * See if we need logger -- unnecessary object probably
+    */
+
+   /* TODO:
+    * Implement getBufferSize(), getCurrentSizeInBytes().
+    */
+
+   /**
+    * Logger supplied by the concrete subclass; used in this class to report
+    * allocation failures and buffer reallocations.
+    *
+    * @return logger to write to
+    */
+   protected abstract org.slf4j.Logger getLogger();
+
+   /**
+    * Not supported by this class.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   public VariableWidthMutator getMutator() {
+      throw new  UnsupportedOperationException("Mutator is not needed to write into vector");
+   }
+
+   /**
+    * Not supported by this class.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   public VariableWidthAccessor getAccessor() {
+      throw new UnsupportedOperationException("Accessor is not needed to read from vector");
+   }
+
+   /**
+    * @return the buffer holding the validity bits (the buffer itself, not a copy)
+    */
+   @Override
+   public ArrowBuf getValidityBuffer() {
+      return validityBuffer;
+   }
+
+   /**
+    * @return the buffer holding the variable width data (the buffer itself, not a copy)
+    */
+   @Override
+   public ArrowBuf getDataBuffer() {
+      return valueBuffer;
+   }
+
+   /**
+    * @return the buffer holding the 4 byte offsets (the buffer itself, not a copy)
+    */
+   @Override
+   public ArrowBuf getOffsetBuffer() {
+      return offsetBuffer;
+   }
+
+   /**
+    * @return memory address reported by the offset buffer
+    */
+   @Override
+   public long getOffsetBufferAddress() {
+      return offsetBuffer.memoryAddress();
+   }
+
+   /**
+    * @return memory address reported by the validity buffer
+    */
+   @Override
+   public long getValidityBufferAddress() {
+      return validityBuffer.memoryAddress();
+   }
+
+   /**
+    * @return memory address reported by the data buffer
+    */
+   @Override
+   public long getDataBufferAddress() {
+      return valueBuffer.memoryAddress();
+   }
+
+   /**
+    * Record the buffer sizes to use for the next allocation, based on the
+    * expected number of values. Nothing is allocated here. The data buffer
+    * is sized assuming DEFAULT_RECORD_BYTE_COUNT bytes per value.
+    *
+    * @param valueCount  expected number of values
+    * @throws OversizedAllocationException if the data buffer size would
+    *         exceed MAX_ALLOCATION_SIZE
+    */
+   @Override
+   public void setInitialCapacity(int valueCount) {
+      final long dataBytes = DEFAULT_RECORD_BYTE_COUNT * (long) valueCount;
+      if (dataBytes > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory is more than max allowed");
+      }
+      valueAllocationSizeInBytes = (int) dataBytes;
+
+      final int validityBytes = getSizeFromCount(valueCount);
+      /* one slot more than the number of values: the extra slot holds the
+       * end offset of the last element.
+       */
+      final int offsetBytes = OFFSET_WIDTH * (valueCount + 1);
+      validityAllocationSizeInBytes = validityBytes;
+      offsetAllocationSizeInBytes = offsetBytes;
+   }
+
+   @Override
+   public int getValueCapacity(){
+      final int offsetValueCapacity = Math.max(getOffsetBufferValueCapacity() - 1, 0);
+      return Math.min(offsetValueCapacity, getValidityBufferValueCapacity());
+   }
+
+   /* for test purposes */
+   private int getValidityBufferValueCapacity() {
+      return (int)(validityBuffer.capacity() * 8L);
+   }
+
+   /* for test purposes */
+   private int getOffsetBufferValueCapacity() {
+      return (int)((offsetBuffer.capacity() * 1.0)/OFFSET_WIDTH);
+   }
+
+   /* number of bytes for the validity buffer for a given valueCount */
+   protected int getSizeFromCount(int valueCount) {
+      return (int) Math.ceil(valueCount / 8.0);
+   }
+
+   /**
+    * Zero out the validity and offset buffers over their full capacity.
+    * The data buffer is left untouched.
+    */
+   public void zeroVector() {
+      initValidityBuffer();
+      initOffsetBuffer();
+   }
+
+   /* set every byte of the validity buffer to zero (all elements null) */
+   private void initValidityBuffer() {
+      validityBuffer.setZero(0, validityBuffer.capacity());
+   }
+
+   /* set every byte of the offset buffer to zero */
+   private void initOffsetBuffer() {
+      offsetBuffer.setZero(0, offsetBuffer.capacity());
+   }
+
+   /**
+    * Zero the validity and offset buffers and forget the last written
+    * position. Buffers stay allocated and valueCount is not changed here.
+    */
+   public void reset() {
+      zeroVector();
+      lastSet = -1;
+   }
+
+   /**
+    * Same as {@link #clear()}.
+    */
+   @Override
+   public void close() {
+      clear();
+   }
+
+   /**
+    * Release all three buffers via releaseBuffer(), storing whatever it
+    * returns back into the fields, and reset lastSet and valueCount.
+    */
+   @Override
+   public void clear() {
+      validityBuffer = releaseBuffer(validityBuffer);
+      valueBuffer = releaseBuffer(valueBuffer);
+      offsetBuffer = releaseBuffer(offsetBuffer);
+      /* NOTE(review): 'cleared' is written here and in the constructor but never read in this class */
+      cleared = true;
+      lastSet = -1;
+      valueCount = 0;
+   }
+
+   /**
+    * Not supported by this class.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public List<BufferBacked> getFieldInnerVectors() { throw new UnsupportedOperationException(); }
+
+   /**
+    * This vector has no child vectors, so only an empty list is accepted.
+    *
+    * @param children  must be empty
+    * @throws IllegalArgumentException if children is not empty
+    */
+   @Override
+   public void initializeChildrenFromFields(List<Field> children) {
+      if (!children.isEmpty()) {
+         throw new IllegalArgumentException("primitive type vector can not have children");
+      }
+   }
+
+   /**
+    * @return always an empty list
+    */
+   @Override
+   public List<FieldVector> getChildrenFromFields() {
+      return Collections.emptyList();
+   }
+
+   /**
+    * Replace the buffers of this vector with the given ones. Each incoming
+    * buffer is retained against this vector's allocator and the buffer it
+    * replaces is released.
+    *
+    * @param fieldNode   supplies the number of values (its length)
+    * @param ownBuffers  buffers to load, in the order validity, offset, data
+    */
+   @Override
+   public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+      final ArrowBuf bitBuffer = ownBuffers.get(0);
+      final ArrowBuf offBuffer = ownBuffers.get(1);
+      final ArrowBuf dataBuffer = ownBuffers.get(2);
+
+      /* Retain the incoming buffer BEFORE releasing the one it replaces.
+       * Releasing first is unsafe in two ways: if the incoming buffer
+       * shares memory with the current one the release may free it, and
+       * if retain() throws the field is left pointing at a released buffer.
+       */
+      final ArrowBuf newValidityBuffer = bitBuffer.retain(allocator);
+      validityBuffer.release();
+      validityBuffer = newValidityBuffer;
+
+      final ArrowBuf newOffsetBuffer = offBuffer.retain(allocator);
+      offsetBuffer.release();
+      offsetBuffer = newOffsetBuffer;
+
+      final ArrowBuf newValueBuffer = dataBuffer.retain(allocator);
+      valueBuffer.release();
+      valueBuffer = newValueBuffer;
+
+      /* every position up to the loaded length counts as written */
+      lastSet = fieldNode.getLength() - 1;
+      valueCount = fieldNode.getLength();
+   }
+
+   /**
+    * Return the buffers of this vector in the order validity, offset, data.
+    * As a side effect the reader index of each buffer is set to 0 and the
+    * writer index to the number of bytes in use for the current valueCount.
+    *
+    * @return list holding the three buffers (the buffers themselves, not copies)
+    */
+   public List<ArrowBuf> getFieldBuffers() {
+      /* end offset of the last value == number of data bytes in use */
+      final int dataBytesInUse = getstartOffset(valueCount);
+
+      validityBuffer.readerIndex(0);
+      validityBuffer.writerIndex(getSizeFromCount(valueCount));
+
+      offsetBuffer.readerIndex(0);
+      offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH);
+
+      valueBuffer.readerIndex(0);
+      valueBuffer.writerIndex(dataBytesInUse);
+
+      final List<ArrowBuf> fieldBuffers = new ArrayList<>(3);
+      fieldBuffers.add(validityBuffer);
+      fieldBuffers.add(offsetBuffer);
+      fieldBuffers.add(valueBuffer);
+      return fieldBuffers;
+   }
+
+   /**
+    * Allocate new buffers using the currently recorded allocation sizes.
+    *
+    * @throws OutOfMemoryException if allocateNewSafe() reports failure
+    */
+   @Override
+   public void allocateNew() {
+      if(!allocateNewSafe()){
+         throw new OutOfMemoryException("Failure while allocating memory.");
+      }
+   }
+
+   /**
+    * Allocate new buffers using the currently recorded allocation sizes.
+    * Existing buffers are released first. A failure during allocation is
+    * logged and reported through the return value instead of an exception.
+    *
+    * @return true if all buffers were allocated, false if allocation failed
+    *         (the vector is left cleared in that case)
+    * @throws OversizedAllocationException if the recorded data or offset
+    *         buffer size exceeds MAX_ALLOCATION_SIZE
+    */
+   @Override
+   public boolean allocateNewSafe() {
+      long curAllocationSizeValue = valueAllocationSizeInBytes;
+      long curAllocationSizeValidity = validityAllocationSizeInBytes;
+      long curAllocationSizeOffset = offsetAllocationSizeInBytes;
+
+      if (curAllocationSizeValue > MAX_ALLOCATION_SIZE ||
+              curAllocationSizeOffset > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory exceeds limit");
+      }
+
+      /* we are doing a new allocation -- release the current buffers */
+      clear();
+
+      try {
+         allocateBytes(curAllocationSizeValue, curAllocationSizeValidity, curAllocationSizeOffset);
+      } catch (Exception e) {
+         /* pass the exception itself to the logger: logging only
+          * e.getMessage() drops the stack trace (and prints "null" when
+          * the exception carries no message).
+          */
+         getLogger().error("ERROR: Failure in allocateNewSafe", e);
+         /* release whatever was allocated before the failure */
+         clear();
+         return false;
+      }
+
+      return true;
+   }
+
+   @Override
+   public void allocateNew(int totalBytes, int valueCount) {
+      assert totalBytes >= 0;
+      final int offsetBufferSize = (valueCount + 1) * OFFSET_WIDTH;
+      final int validityBufferSize = getSizeFromCount(valueCount);
+
+      if (totalBytes > MAX_ALLOCATION_SIZE ||
+              offsetBufferSize > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Requested amount of memory exceeds limit");
+      }
+
+      /* we are doing a new allocation -- release the current buffers */
+      clear();
+
+      try {
+         allocateBytes(totalBytes, validityBufferSize, offsetBufferSize);
+      } catch (Exception e) {
+         getLogger().error("ERROR: Failure in allocateNewSafe");
+         getLogger().error(e.getMessage());
+         clear();
+      }
+   }
+
+   /* Allocate the data, validity and offset buffers (in that order) with the
+    * given sizes in bytes and record each size as the current allocation
+    * size. Sizes are narrowed to int; callers in this class check the data
+    * and offset sizes against MAX_ALLOCATION_SIZE beforehand.
+    */
+   private void allocateBytes(final long valueBufferSize, final long validityBufferSize,
+                              final long offsetBufferSize) {
+      /* allocate data buffer */
+      int curSize = (int)valueBufferSize;
+      valueBuffer = allocator.buffer(curSize);
+      valueBuffer.readerIndex(0);
+      valueAllocationSizeInBytes = curSize;
+      allocateValidityBuffer(validityBufferSize);
+      allocateOffsetBuffer(offsetBufferSize);
+   }
+
+   /* Allocate a zero-filled offset buffer of the given size in bytes. The
+    * previous offset buffer is overwritten here, not released.
+    */
+   private void allocateOffsetBuffer(final long size) {
+      final int curSize = (int)size;
+      offsetBuffer = allocator.buffer(curSize);
+      offsetBuffer.readerIndex(0);
+      offsetAllocationSizeInBytes = curSize;
+      initOffsetBuffer();
+   }
+
+   /* Allocate a zero-filled validity buffer of the given size in bytes. The
+    * previous validity buffer is overwritten here, not released.
+    */
+   private void allocateValidityBuffer(final long size) {
+      final int curSize = (int)size;
+      validityBuffer = allocator.buffer(curSize);
+      validityBuffer.readerIndex(0);
+      validityAllocationSizeInBytes = curSize;
+      initValidityBuffer();
+   }
+
+   /**
+    * Grow all three buffers: the data buffer first, then the offset and
+    * validity buffers.
+    */
+   public void reAlloc() {
+      reallocValueBuffer();
+      reallocValidityAndOffsetBuffers();
+   }
+
+   /**
+    * Replace the data buffer with a larger one and copy the existing
+    * contents over. The new size is the next power of two of twice the
+    * larger of the recorded allocation size and the current capacity.
+    *
+    * @throws OversizedAllocationException if the new size would exceed
+    *         MAX_ALLOCATION_SIZE
+    */
+   protected void reallocValueBuffer() {
+      final int oldCapacity = valueBuffer.capacity();
+      final long baseSize = Math.max((long) valueAllocationSizeInBytes, (long) oldCapacity);
+      final long newAllocationSize = BaseAllocator.nextPowerOfTwo(baseSize * 2L);
+
+      if (newAllocationSize > MAX_ALLOCATION_SIZE)  {
+         throw new OversizedAllocationException("Unable to expand the buffer");
+      }
+
+      final ArrowBuf expanded = allocator.buffer((int) newAllocationSize);
+      expanded.setBytes(0, valueBuffer, 0, oldCapacity);
+      valueBuffer.release();
+      valueBuffer = expanded;
+      valueAllocationSizeInBytes = (int) newAllocationSize;
+   }
+
+   /**
+    * Grow the offset buffer and the validity buffer, leaving the data
+    * buffer as it is.
+    */
+   protected void reallocValidityAndOffsetBuffers() {
+      offsetBuffer = reallocBufferHelper(offsetBuffer, true);
+      validityBuffer = reallocBufferHelper(validityBuffer, false);
+   }
+
+   /* TODO: refactor this to keep the logic in a single place and make callers
+    * more intelligent. See handleSafe() for more comments on realloc.
+    */
+
+   private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean offsetBuffer) {
+      final int currentBufferCapacity = buffer.capacity();
+      long baseSize  = (offsetBuffer ? offsetAllocationSizeInBytes
+              : validityAllocationSizeInBytes);
+
+      if (baseSize < (long)currentBufferCapacity) {
+         baseSize = (long)currentBufferCapacity;
+      }
+
+      long newAllocationSize = baseSize * 2L;
+      newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
+
+      if (newAllocationSize > MAX_ALLOCATION_SIZE) {
+         throw new OversizedAllocationException("Unable to expand the buffer");
+      }
+
+      getLogger().debug("Reallocating vector [{}]. # of bytes: [{}] -> [{}]",
+              name, (offsetBuffer ? offsetAllocationSizeInBytes : validityAllocationSizeInBytes),
+              newAllocationSize);
+
+      final ArrowBuf newBuf = allocator.buffer((int)newAllocationSize);
+      newBuf.setBytes(0, buffer, 0, currentBufferCapacity);
+      final int halfNewCapacity = newBuf.capacity() / 2;
+      newBuf.setZero(halfNewCapacity, halfNewCapacity);
+      buffer.release(1);
+      buffer = newBuf;
+      if (offsetBuffer) {
+         offsetAllocationSizeInBytes = (int)newAllocationSize;
+      }
+      else {
+         validityAllocationSizeInBytes = (int)newAllocationSize;
+      }
+
+      return buffer;
+   }
+
+   /**
+    * @return capacity of the data buffer in bytes
+    */
+   @Override
+   public int getByteCapacity(){
+      return valueBuffer.capacity();
+   }
+
+   /**
+    * Not implemented yet.
+    *
+    * @return always 0
+    */
+   @Override
+   public int getCurrentSizeInBytes(){
+      /* TODO */
+      return 0;
+   }
+
+   /**
+    * Not implemented yet.
+    *
+    * @return always 0
+    */
+   @Override
+   public int getBufferSize() {
+      /* TODO */
+      return 0;
+   }
+
+   /**
+    * Number of bytes needed across the validity, offset and data buffers to
+    * hold the first valueCount values currently in this vector.
+    *
+    * @param valueCount  number of values
+    * @return combined size in bytes, 0 if valueCount is 0
+    */
+   @Override
+   public int getBufferSizeFor(final int valueCount) {
+      if (valueCount != 0) {
+         final int validityBytes = getSizeFromCount(valueCount);
+         /* the slot at position valueCount holds the end offset of the last
+          * value, which is the number of data bytes in use.
+          */
+         final int dataBytes = offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
+         final int offsetBytes = OFFSET_WIDTH * (valueCount + 1);
+         return validityBytes + offsetBytes + dataBytes;
+      }
+      return 0;
+   }
+
+   /**
+    * @return the Field created for this vector in the constructor
+    */
+   @Override
+   public Field getField() {
+      return field;
+   }
+
+   /**
+    * Return the buffers of this vector in the order validity, offset, data.
+    *
+    * @param clear  if true, each buffer is retained once on behalf of the
+    *               caller and the vector is then cleared
+    * @return array holding the three buffers
+    */
+   @Override
+   public ArrowBuf[] getBuffers(boolean clear) {
+      /* each buffer gets its own slot: validity [0], offset [1], data [2] */
+      final ArrowBuf[] buffers = new ArrowBuf[] {validityBuffer, offsetBuffer, valueBuffer};
+      if (clear) {
+         /* retain before clear() releases the vector's own reference */
+         for (final ArrowBuf buffer : buffers) {
+            buffer.retain(1);
+         }
+         clear();
+      }
+      return buffers;
+   }
+
+   /**
+    * Same as {@link #getTransferPair(String, BufferAllocator)}; the callBack
+    * argument is not used.
+    */
+   @Override
+   public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
+      return getTransferPair(ref, allocator);
+   }
+
+   /**
+    * Same as {@link #getTransferPair(String, BufferAllocator)}, using the
+    * name of this vector as ref.
+    */
+   @Override
+   public TransferPair getTransferPair(BufferAllocator allocator){
+      return getTransferPair(name, allocator);
+   }
+
+   /**
+    * Create a transfer pair for this vector; implemented by the concrete
+    * subclass.
+    *
+    * @param ref        name for the target of the transfer
+    * @param allocator  allocator for the target of the transfer
+    * @return the transfer pair
+    */
+   public abstract TransferPair getTransferPair(String ref, BufferAllocator allocator);
+
+   /**
+    * Move all buffers of this vector to the target vector, together with
+    * valueCount and lastSet. The target is cleared before the transfer and
+    * this vector is cleared after it.
+    *
+    * @param target  vector to transfer to; checked with compareTypes()
+    */
+   public void transferTo(BaseNullableVariableWidthVector target){
+      compareTypes(target, "transferTo");
+      target.clear();
+      target.validityBuffer = validityBuffer.transferOwnership(target.allocator).buffer;
+      target.valueBuffer = valueBuffer.transferOwnership(target.allocator).buffer;
+      target.offsetBuffer = offsetBuffer.transferOwnership(target.allocator).buffer;
+      target.valueCount = valueCount;
+      target.setLastSet(lastSet);
+      /* drop this vector's own references and reset its counters */
+      clear();
+   }
+
+   /**
+    * Give the target vector the elements [startIndex, startIndex + length)
+    * of this vector. The target is cleared first; its lastSet and valueCount
+    * are set to match the transferred range.
+    *
+    * @param startIndex  first element to transfer
+    * @param length      number of elements to transfer
+    * @param target      vector to transfer to; checked with compareTypes()
+    */
+   public void splitAndTransferTo(int startIndex, int length,
+                                  BaseNullableVariableWidthVector target) {
+      compareTypes(target, "splitAndTransferTo");
+      target.clear();
+      splitAndTransferValidityBuffer(startIndex, length, target);
+      splitAndTransferOffsetBuffer(startIndex, length, target);
+      target.setLastSet(length - 1);
+      target.setValueCount(length);
+   }
+
+   /*
+    * Transfer the offsets along with the data for the given range. The
+    * target gets a fresh offset buffer whose entries are rebased so that the
+    * first transferred element starts at 0, and a slice of this vector's
+    * data buffer covering exactly the bytes of the range.
+    */
+   private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseNullableVariableWidthVector target) {
+      final int firstDataByte = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
+      final int endDataByte = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH);
+
+      /* length + 1 slots: one per element plus the end offset of the last one */
+      target.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
+      for (int slot = 0; slot <= length; slot++) {
+         final int sourceOffset = offsetBuffer.getInt((startIndex + slot) * OFFSET_WIDTH);
+         target.offsetBuffer.setInt(slot * OFFSET_WIDTH, sourceOffset - firstDataByte);
+      }
+
+      final ArrowBuf dataSlice = valueBuffer.slice(firstDataByte, endDataByte - firstDataByte);
+      target.valueBuffer = dataSlice.transferOwnership(target.allocator).buffer;
+   }
+
+   /*
+    * Transfer the validity bits of the elements
+    * [startIndex, startIndex + length) to the target. Nothing is done when
+    * length is 0. When startIndex is a multiple of 8 the target receives a
+    * slice of this vector's validity buffer; otherwise a new buffer is
+    * allocated in the target and the bits are copied with a shift.
+    */
+   private void splitAndTransferValidityBuffer(int startIndex, int length,
+                                               BaseNullableVariableWidthVector target) {
+      assert startIndex + length <= valueCount;
+      int firstByteSource = BitVectorHelper.byteIndex(startIndex);
+      int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
+      int byteSizeTarget = getSizeFromCount(length);
+      /* position of the first transferred bit within its byte */
+      int offset = startIndex % 8;
+
+      if (length > 0) {
+         if (offset == 0) {
+            // slice
+            if (target.validityBuffer != null) {
+               target.validityBuffer.release();
+            }
+            /* NOTE(review): unlike the data buffer, this slice is only retained, not
+             * passed through transferOwnership(target.allocator) -- confirm intended.
+             */
+            target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
+            target.validityBuffer.retain(1);
+         }
+         else {
+            /* Copy data
+             * When the first bit starts from the middle of a byte (offset != 0),
+             * copy data from src BitVector.
+             * Each byte in the target is composed by a part in i-th byte,
+             * another part in (i+1)-th byte.
+             */
+            target.allocateValidityBuffer(byteSizeTarget);
+
+            /* all target bytes except the last one */
+            for (int i = 0; i < byteSizeTarget - 1; i++) {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer, firstByteSource + i, offset);
+               byte b2 = getBitsFromNextByte(this.validityBuffer, firstByteSource + i + 1, offset);
+
+               target.validityBuffer.setByte(i, (b1 + b2));
+            }
+
+            /* Copying the last piece is done in the following manner:
+             * if the source vector has 1 or more bytes remaining, we copy
+             * the last piece as a byte formed by shifting data
+             * from the current byte and the next byte.
+             *
+             * if the source vector has no more bytes remaining
+             * (we are at the last byte), we copy the last piece as a byte
+             * by shifting data from the current byte.
+             */
+            if((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget - 1, offset);
+               byte b2 = getBitsFromNextByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget, offset);
+
+               target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
+            }
+            else {
+               byte b1 = getBitsFromCurrentByte(this.validityBuffer,
+                       firstByteSource + byteSizeTarget - 1, offset);
+               target.validityBuffer.setByte(byteSizeTarget - 1, b1);
+            }
+         }
+      }
+   }
+
+   /* Read the byte at index and shift it right by offset bits (unsigned, so
+    * zeros are shifted in from the left).
+    */
+   private static byte getBitsFromCurrentByte(ArrowBuf data, int index, int offset) {
+      return (byte)((data.getByte(index) & 0xFF) >>> offset);
+   }
+
+   /* Read the byte at index and shift it left by (8 - offset) bits; the cast
+    * to byte keeps only the low 8 bits of the result.
+    */
+   private static byte getBitsFromNextByte(ArrowBuf data, int index, int offset) {
+      return (byte)((data.getByte(index) << (8 - offset)));
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *                common getters and setters                      *
+    *                                                                *
+    ******************************************************************/
+
+
+   /**
+    * Get the number of elements that are null in the vector. The count is
+    * derived by scanning the validity bytes that cover valueCount elements.
+    *
+    * @return the number of null elements.
+    */
+   public int getNullCount() {
+      final int validityBytes = getSizeFromCount(valueCount);
+
+      int setBits = 0;
+      for (int byteIndex = 0; byteIndex < validityBytes; byteIndex++) {
+         /* mask to the low 8 bits: a negative byte is sign extended when
+          * widened to int and would otherwise contribute 24 extra set bits.
+          */
+         setBits += Integer.bitCount(validityBuffer.getByte(byteIndex) & 0xFF);
+      }
+
+      /* if valueCount is not a multiple of 8, the unused bits of the last
+       * byte are zero but are not null elements -- leave them out.
+       */
+      final int usedBitsInLastByte = valueCount % 8;
+      final int paddingBits = (usedBitsInLastByte == 0) ? 0 : 8 - usedBitsInLastByte;
+
+      return (validityBytes * 8) - setBits - paddingBits;
+   }
+
+   /**
+    * Check if the given index is within the current value capacity
+    * of the vector, i.e. below {@link #getValueCapacity()}. Only the upper
+    * bound is checked.
+    *
+    * @param index  position to check
+    * @return true if index is within the current value capacity
+    */
+   public boolean isSafe(int index) {
+      return index < getValueCapacity();
+   }
+
+   /**
+    * Check if element at given index is null, i.e. its validity bit is 0.
+    *
+    * @param index  position of element
+    * @return true if element at given index is null
+    */
+   public boolean isNull(int index) {
+      return (isSet(index) == 0);
+   }
+
+   /**
+    * Report whether the element at the given index is set (non-null).
+    * This is the inverse of {@link #isNull(int)}, expressed as an int flag.
+    *
+    * @param index  position of element
+    * @return 1 if element at given index is not null, 0 otherwise
+    */
+   public int isSet(int index) {
+      /* validity is bit-packed: byte (index / 8), bit (index % 8) within it */
+      final byte validityByte = validityBuffer.getByte(index >> 3);
+      return (validityByte >> (index & 7)) & 1;
+   }
+
+   /**
+    * Get the value count of vector. This is zero for a new or cleared
+    * vector; it is changed by setValueCount(int), by loadFieldBuffers()
+    * and, on the target vector, by transferTo().
+    *
+    * @return valueCount for the vector
+    */
+   public int getValueCount(){
+      return valueCount;
+   }
+
+   /**
+    * Sets the value count for the vector. The validity and offset buffers
+    * are grown if needed, and every position after the last written one up
+    * to valueCount is filled in as an empty value.
+    *
+    * @param valueCount   value count
+    */
+   public void setValueCount(int valueCount) {
+      assert valueCount >= 0;
+      this.valueCount = valueCount;
+      while (getValueCapacity() < valueCount) {
+         reallocValidityAndOffsetBuffers();
+      }
+      /* fillHoles() also moves lastSet to valueCount - 1 */
+      fillHoles(valueCount);
+   }
+
+   /**
+    * Fill every position after the last written one, up to but excluding
+    * index, with an empty value, growing the buffers first if needed.
+    * Afterwards lastSet is index - 1.
+    *
+    * @param index  first position that is not filled
+    */
+   public void fillEmpties(int index) {
+      final int emptyLength = emptyByteArray.length;
+      handleSafe(index, emptyLength);
+      /* fillHoles() also moves lastSet to index - 1 */
+      fillHoles(index);
+   }
+
+   /**
+    * Set the index of the last element considered written; fillHoles()
+    * starts filling at the position after it.
+    *
+    * @param value  new lastSet index
+    */
+   public void setLastSet(int value) {
+      lastSet = value;
+   }
+
+   /**
+    * @return the current lastSet index, -1 for a new or cleared vector
+    */
+   public int getLastSet() {
+      return lastSet;
+   }
+
+   /**
+    * Read the offset buffer entry for the given index. Despite the name,
+    * only this single 4 byte entry (the start offset of the element) is
+    * returned, widened to long.
+    *
+    * @param index  position of element
+    * @return offset buffer entry at index
+    */
+   public long getStartEnd(int index) {
+      return (long)offsetBuffer.getInt(index * OFFSET_WIDTH);
+   }
+
+   /**
+    * Mark the element at the given index as non-null, growing the buffers
+    * first if needed. No data is written for the element.
+    *
+    * @param index  position of element
+    */
+   public void setIndexDefined(int index) {
+      handleSafe(index, 0);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *                helper methods for setters                      *
+    *                                                                *
+    ******************************************************************/
+
+
+   protected final void fillHoles(int index) {
+      for (int i = lastSet + 1; i < index; i++) {
+         setBytes(i, emptyByteArray, 0, emptyByteArray.length);
+      }
+      lastSet = index - 1;
+   }
+
+   /**
+    * Store length bytes of value, beginning at start, as the element at
+    * index. The element's start offset is read from the offset buffer; its
+    * end offset is written to the next offset slot. No capacity checks are
+    * done here and the validity bit is not touched.
+    *
+    * @param index   position of element
+    * @param value   source array
+    * @param start   first byte in value to copy
+    * @param length  number of bytes to copy
+    */
+   protected final void setBytes(int index, byte[] value, int start, int length) {
+      /* the end offset of the previous element is where this one starts */
+      final int dataStart = offsetBuffer.getInt(index * OFFSET_WIDTH);
+      final int dataEnd = dataStart + length;
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, dataEnd);
+      valueBuffer.setBytes(dataStart, value, start, length);
+   }
+
+   /**
+    * Read the offset buffer entry for the given index, i.e. the position in
+    * the data buffer where the element at index starts.
+    *
+    * @param index  position of element
+    * @return start offset of the element in the data buffer
+    */
+   protected final int getstartOffset(int index) {
+      return offsetBuffer.getInt(index * OFFSET_WIDTH);
+   }
+
+   /**
+    * Make sure an element of dataLength bytes can be written at index:
+    * grow the validity and offset buffers until index is within the value
+    * capacity, then grow the data buffer until it can hold dataLength bytes
+    * starting at the element's start offset.
+    *
+    * @param index       position of the element about to be written
+    * @param dataLength  number of data bytes about to be written
+    */
+   protected final void handleSafe(int index, int dataLength) {
+      /*
+       * IMPORTANT:
+       * value buffer for variable length vectors moves independent
+       * of the companion validity and offset buffers. This is in
+       * contrast to what we have for fixed width vectors.
+       *
+       * Here there is no concept of getValueCapacity() in the
+       * data stream. getValueCapacity() is applicable only to validity
+       * and offset buffers.
+       *
+       * So even though we may have setup an initial capacity of 1024
+       * elements in the vector, it is quite possible
+       * that we need to reAlloc() the data buffer when we are setting
+       * the 5th element in the vector simply because previous
+       * variable length elements have exhausted the buffer capacity.
+       * However, we really don't need to reAlloc() validity and
+       * offset buffers until we try to set the 1025th element
+       * This is why we do a separate check for safe methods to
+       * determine which buffer needs reallocation.
+       */
+      while (index >= getValueCapacity()) {
+         reallocValidityAndOffsetBuffers();
+      }
+      /* read after the loop above, so the offset slot for index exists */
+      final int startOffset = getstartOffset(index);
+      while (valueBuffer.capacity() < (startOffset + dataLength)) {
+         reallocValueBuffer();
+      }
+   }
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java
index 598e578..1db9624 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java
@@ -18,8 +18,10 @@
 
 package org.apache.arrow.vector;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 import org.apache.arrow.memory.BufferAllocator;
@@ -122,5 +124,23 @@ public abstract class BaseValueVector implements ValueVector {
   public BufferAllocator getAllocator() {
     return allocator;
   }
+
+  /**
+   * Verify that the target vector has the same minor type as this vector.
+   *
+   * @param target  vector to compare with
+   * @param caller  name of the calling operation, used in the error message
+   * @throws UnsupportedOperationException if the minor types differ
+   */
+  protected void compareTypes(BaseValueVector target, String caller) {
+    if (this.getMinorType() != target.getMinorType()) {
+      throw new UnsupportedOperationException(caller + " should have vectors of exact same type");
+    }
+  }
+
+  /**
+   * Release the given buffer and hand back the allocator's empty buffer,
+   * which the caller is expected to store in place of the released one.
+   *
+   * @param buffer  buffer to release
+   * @return the allocator's empty buffer
+   */
+  protected ArrowBuf releaseBuffer(ArrowBuf buffer) {
+    buffer.release();
+    return allocator.getEmpty();
+  }
+
+  /* Default implementation: always reports 0. */
+  public int getValueCount() { return 0; }
+
+  /* Default implementation: does nothing. */
+  public void setValueCount(int valueCount) { }
+
+  /* Default implementation: always returns null. */
+  public Object getObject(int index) { return null; }
 }
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
new file mode 100644
index 0000000..2439bd2
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Static helpers for reading and writing individual bits of a validity
+ * buffer, in which element i is tracked by bit (i % 8) of byte (i / 8).
+ */
+class BitVectorHelper {
+
+   /**
+    * Get the index of byte corresponding to bit index in validity buffer
+    */
+   protected static int byteIndex(int absoluteBitIndex) {
+      return absoluteBitIndex >> 3;
+   }
+
+   /**
+    * Get the relative index of bit within the byte in validity buffer
+    */
+   private static int bitIndex(int absoluteBitIndex) {
+      return absoluteBitIndex & 7;
+   }
+
+   /**
+    * Set the validity bit for the given element index to 1, leaving all
+    * other bits of the containing byte as they are.
+    */
+   protected static void setValidityBitToOne(ArrowBuf validityBuffer, int index) {
+      final int targetByte = byteIndex(index);
+      final int mask = 1 << bitIndex(index);
+      final byte updated = (byte) (validityBuffer.getByte(targetByte) | mask);
+      validityBuffer.setByte(targetByte, updated);
+   }
+
+   /**
+    * Set the validity bit for the given element index to 1 if value is
+    * non-zero and to 0 otherwise, leaving all other bits of the containing
+    * byte as they are.
+    */
+   protected static void setValidityBit(ArrowBuf validityBuffer, int index, int value) {
+      final int targetByte = byteIndex(index);
+      final int mask = 1 << bitIndex(index);
+      final byte current = validityBuffer.getByte(targetByte);
+      final byte updated;
+      if (value != 0) {
+         updated = (byte) (current | mask);
+      } else {
+         updated = (byte) (current & ~mask);
+      }
+      validityBuffer.setByte(targetByte, updated);
+   }
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullableIntVector.java b/java/vector/src/main/java/org/apache/arrow/vector/NullableIntVector.java
new file mode 100644
index 0000000..26b19fa
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/NullableIntVector.java
@@ -0,0 +1,299 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.arrow.vector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.complex.impl.IntReaderImpl;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.holders.IntHolder;
+import org.apache.arrow.vector.holders.NullableIntHolder;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.TransferPair;
+import org.slf4j.Logger;
+
+/**
+ * NullableIntVector implements a fixed width vector of values which could
+ * be null. A validity buffer (bit vector) is maintained to track which
+ * elements in the vector are null.
+ */
+public class NullableIntVector extends BaseNullableFixedWidthVector {
+   private static final org.slf4j.Logger logger =
+           org.slf4j.LoggerFactory.getLogger(NullableIntVector.class);
+   private static final byte TYPE_WIDTH = 4;
+   private final FieldReader reader;
+
+   public NullableIntVector(String name, BufferAllocator allocator) {
+      this(name, FieldType.nullable(org.apache.arrow.vector.types.Types.MinorType.INT.getType()),
+              allocator);
+   }
+
+   public NullableIntVector(String name, FieldType fieldType, BufferAllocator allocator) {
+      super(name, allocator, fieldType, TYPE_WIDTH);
+      reader = new IntReaderImpl(NullableIntVector.this);
+   }
+
+   @Override
+   protected org.slf4j.Logger getLogger() {
+      return logger;
+   }
+
+   @Override
+   public FieldReader getReader(){
+      return reader;
+   }
+
+   @Override
+   public Types.MinorType getMinorType() {
+      return Types.MinorType.INT;
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *          vector value retrieval methods                        *
+    *                                                                *
+    ******************************************************************/
+
+
+   /**
+    * Get the element at the given index from the vector.
+    *
+    * @param index   position of element
+    * @return element at given index
+    */
+   public int get(int index) throws IllegalStateException {
+      if(isSet(index) == 0) {
+         throw new IllegalStateException("Value at index is null");
+      }
+      return valueBuffer.getInt(index * TYPE_WIDTH);
+   }
+
+   /**
+    * Get the element at the given index from the vector and
+    * sets the state in holder. If element at given index
+    * is null, holder.isSet will be zero.
+    *
+    * @param index   position of element
+    */
+   public void get(int index, NullableIntHolder holder){
+      if(isSet(index) == 0) {
+         holder.isSet = 0;
+         return;
+      }
+      holder.isSet = 1;
+      holder.value = valueBuffer.getInt(index * TYPE_WIDTH);
+   }
+
+   /**
+    * Same as {@link #get(int)}.
+    *
+    * @param index   position of element
+    * @return element at given index
+    */
+   public Integer getObject(int index) {
+      if (isSet(index) == 0) {
+         return null;
+      } else {
+         return get(index);
+      }
+   }
+
+   public void copyFrom(int fromIndex, int thisIndex, NullableIntVector from) {
+      if (from.isSet(fromIndex) != 0) {
+         set(thisIndex, from.get(fromIndex));
+      }
+   }
+
+   public void copyFromSafe(int fromIndex, int thisIndex, NullableIntVector from) {
+      handleSafe(thisIndex);
+      copyFrom(fromIndex, thisIndex, from);
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *          vector value setter methods                           *
+    *                                                                *
+    ******************************************************************/
+
+
+   private void setValue(int index, int value) {
+      valueBuffer.setInt(index * TYPE_WIDTH, value);
+   }
+
+   /**
+    * Set the element at the given index to the given value.
+    *
+    * @param index   position of element
+    * @param value   value of element
+    */
+   public void set(int index, int value) {
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      setValue(index, value);
+   }
+
+   /**
+    * Set the element at the given index to the value set in data holder.
+    * If the value in holder is not indicated as set, element in the
+    * at the given index will be null.
+    *
+    * @param index   position of element
+    * @param holder  nullable data holder for value of element
+    */
+   public void set(int index, NullableIntHolder holder) throws IllegalArgumentException {
+      if(holder.isSet < 0) {
+         throw new IllegalArgumentException();
+      }
+      else if(holder.isSet > 0) {
+         BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+         setValue(index, holder.value);
+      }
+      else {
+         BitVectorHelper.setValidityBit(validityBuffer, index, 0);
+      }
+   }
+
+   /**
+    * Set the element at the given index to the value set in data holder.
+    *
+    * @param index   position of element
+    * @param holder  data holder for value of element
+    */
+   public void set(int index, IntHolder holder){
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      setValue(index, holder.value);
+   }
+
+   /**
+    * Same as {@link #set(int, int)} except that it handles the
+    * case when index is greater than or equal to existing
+    * value capacity {@link #getValueCapacity()}.
+    *
+    * @param index   position of element
+    * @param value   value of element
+    */
+   public void setSafe(int index, int value) {
+      handleSafe(index);
+      set(index, value);
+   }
+
+   /**
+    * Same as {@link #set(int, NullableIntHolder)} except that it handles the
+    * case when index is greater than or equal to existing
+    * value capacity {@link #getValueCapacity()}.
+    *
+    * @param index   position of element
+    * @param holder  nullable data holder for value of element
+    */
+   public void setSafe(int index, NullableIntHolder holder) throws IllegalArgumentException {
+      handleSafe(index);
+      set(index, holder);
+   }
+
+   /**
+    * Same as {@link #set(int, IntHolder)} except that it handles the
+    * case when index is greater than or equal to existing
+    * value capacity {@link #getValueCapacity()}.
+    *
+    * @param index   position of element
+    * @param holder  data holder for value of element
+    */
+   public void setSafe(int index, IntHolder holder){
+      handleSafe(index);
+      set(index, holder);
+   }
+
+   /**
+    * Set the element at the given index to null.
+    *
+    * @param index   position of element
+    */
+   public void setNull(int index){
+      handleSafe(index);
+      /* not really needed to set the bit to 0 as long as
+       * the buffer always starts from 0.
+       */
+      BitVectorHelper.setValidityBit(validityBuffer, index, 0);
+   }
+
+   public void set(int index, int isSet, int valueField ) {
+      if (isSet > 0) {
+         set(index, valueField);
+      } else {
+         BitVectorHelper.setValidityBit(validityBuffer, index, 0);
+      }
+   }
+
+   public void setSafe(int index, int isSet, int valueField ) {
+      handleSafe(index);
+      set(index, isSet, valueField);
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *                      vector transfer                           *
+    *                                                                *
+    ******************************************************************/
+
+
+   @Override
+   public TransferPair getTransferPair(String ref, BufferAllocator allocator){
+      return new TransferImpl(ref, allocator);
+   }
+
+   @Override
+   public TransferPair makeTransferPair(ValueVector to) {
+      return new TransferImpl((NullableIntVector)to);
+   }
+
+   private class TransferImpl implements TransferPair {
+      NullableIntVector to;
+
+      public TransferImpl(String ref, BufferAllocator allocator){
+         to = new NullableIntVector(ref, field.getFieldType(), allocator);
+      }
+
+      public TransferImpl(NullableIntVector to){
+         this.to = to;
+      }
+
+      @Override
+      public NullableIntVector getTo(){
+         return to;
+      }
+
+      @Override
+      public void transfer(){
+         transferTo(to);
+      }
+
+      @Override
+      public void splitAndTransfer(int startIndex, int length) {
+         splitAndTransferTo(startIndex, length, to);
+      }
+
+      @Override
+      public void copyValueSafe(int fromIndex, int toIndex) {
+         to.copyFromSafe(fromIndex, toIndex, NullableIntVector.this);
+      }
+   }
+}
\ No newline at end of file
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullableVarCharVector.java b/java/vector/src/main/java/org/apache/arrow/vector/NullableVarCharVector.java
new file mode 100644
index 0000000..b902154
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/NullableVarCharVector.java
@@ -0,0 +1,451 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.complex.impl.VarCharReaderImpl;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.holders.VarCharHolder;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.util.Text;
+import org.apache.arrow.vector.util.TransferPair;
+
+import java.nio.ByteBuffer;
+
+/**
+ * NullableVarCharVector implements a variable width vector of byte
+ * sequences which could be null. A validity buffer (bit vector) tracks
+ * which elements are null and an offset buffer records where each
+ * element starts and ends inside the value buffer.
+ */
+public class NullableVarCharVector extends BaseNullableVariableWidthVector {
+   private static final org.slf4j.Logger logger =
+           org.slf4j.LoggerFactory.getLogger(NullableVarCharVector.class);
+   private final FieldReader reader;
+
+   /**
+    * Instantiate a NullableVarCharVector using the nullable VARCHAR field type.
+    *
+    * @param name        name of the vector
+    * @param allocator   allocator handed to the base vector
+    */
+   public NullableVarCharVector(String name, BufferAllocator allocator) {
+      this(name, FieldType.nullable(org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()), allocator);
+   }
+
+   /**
+    * Instantiate a NullableVarCharVector with an explicit field type.
+    *
+    * @param name        name of the vector
+    * @param fieldType   type of the field materialized by this vector
+    * @param allocator   allocator handed to the base vector
+    */
+   public NullableVarCharVector(String name, FieldType fieldType, BufferAllocator allocator) {
+      super(name, allocator, fieldType);
+      reader = new VarCharReaderImpl(NullableVarCharVector.this);
+   }
+
+   @Override
+   protected org.slf4j.Logger getLogger() {
+      return logger;
+   }
+
+   /**
+    * Get the reader created for this vector at construction time.
+    *
+    * @return field reader backed by this vector
+    */
+   @Override
+   public FieldReader getReader(){
+      return reader;
+   }
+
+   /**
+    * Get the minor type of this vector.
+    *
+    * @return {@link Types.MinorType#VARCHAR}
+    */
+   @Override
+   public Types.MinorType getMinorType() {
+      return Types.MinorType.VARCHAR;
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *          vector value getter methods                           *
+    *                                                                *
+    ******************************************************************/
+
+
+   /**
+    * Get the variable length element at specified index as byte array.
+    *
+    * @param index   position of element to get
+    * @return newly allocated array holding the bytes of the element
+    * @throws IllegalStateException if the element at the given index is null
+    */
+   public byte[] get(int index) {
+      assert index >= 0;
+      if(isSet(index) == 0) {
+         throw new IllegalStateException("Value at index is null");
+      }
+      final int startOffset = getstartOffset(index);
+      final int dataLength =
+              offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - startOffset;
+      final byte[] result = new byte[dataLength];
+      valueBuffer.getBytes(startOffset, result, 0, dataLength);
+      return result;
+   }
+
+   /**
+    * Get the variable length element at specified index as Text.
+    *
+    * @param index   position of element to get
+    * @return Text object for non-null element, null otherwise
+    */
+   public Text getObject(int index) {
+      assert index >= 0;
+      /* test the validity bit directly instead of relying on the
+       * IllegalStateException thrown by get(int) for null elements
+       */
+      if (isSet(index) == 0) {
+         return null;
+      }
+      final Text result = new Text();
+      result.set(get(index));
+      return result;
+   }
+
+   /**
+    * Get the length in bytes of the variable length element at the
+    * specified index.
+    *
+    * @param index   position of element to get
+    * @return length recorded in the offset buffer for a non-null element,
+    *         0 for a null element
+    */
+   public int getValueLength(int index) {
+      assert index >= 0;
+      if(isSet(index) == 0) {
+         return 0;
+      }
+      final int startOffset = getstartOffset(index);
+      final int dataLength =
+              offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - startOffset;
+      return dataLength;
+   }
+
+   /**
+    * Get the variable length element at specified index and sets the state
+    * in provided holder. For a non-null element, holder.start and
+    * holder.end are the start and end offsets of the element within
+    * holder.buffer, matching how the holder based setters of this class
+    * interpret those fields. For a null element only holder.isSet is
+    * written (as zero).
+    *
+    * @param index   position of element to get
+    * @param holder  data holder to be populated by this function
+    */
+   public void get(int index, NullableVarCharHolder holder){
+      assert index >= 0;
+      if(isSet(index) == 0) {
+         holder.isSet = 0;
+         return;
+      }
+      holder.isSet = 1;
+      holder.start = getstartOffset(index);
+      /* end is an absolute offset into the buffer, not a length */
+      holder.end = offsetBuffer.getInt((index + 1) * OFFSET_WIDTH);
+      holder.buffer = valueBuffer;
+   }
+
+
+
+   /******************************************************************
+    *                                                                *
+    *          vector value setter methods                           *
+    *                                                                *
+    ******************************************************************/
+
+
+
+   /**
+    * Copy the element at fromIndex of the source vector into thisIndex
+    * of this vector. A null source element marks the target element as
+    * null.
+    *
+    * @param fromIndex   position to copy from in the source vector
+    * @param thisIndex   position to copy to in this vector
+    * @param from        source vector
+    */
+   public void copyFrom(int fromIndex, int thisIndex, NullableVarCharVector from) {
+      fillHoles(thisIndex);
+      if (from.isSet(fromIndex) != 0) {
+         set(thisIndex, from.get(fromIndex));
+         lastSet = thisIndex;
+      } else {
+         /* propagate the null rather than keeping a stale validity bit */
+         BitVectorHelper.setValidityBit(validityBuffer, thisIndex, 0);
+      }
+   }
+
+   /**
+    * Same as {@link #copyFrom(int, int, NullableVarCharVector)} except that
+    * the safe variants of the fill and set operations are used.
+    *
+    * @param fromIndex   position to copy from in the source vector
+    * @param thisIndex   position to copy to in this vector
+    * @param from        source vector
+    */
+   public void copyFromSafe(int fromIndex, int thisIndex, NullableVarCharVector from) {
+      fillEmpties(thisIndex);
+      if (from.isSet(fromIndex) != 0) {
+         setSafe(thisIndex, from.get(fromIndex));
+         lastSet = thisIndex;
+      } else {
+         /* propagate the null; setNull invokes handleSafe before clearing the bit */
+         setNull(thisIndex);
+      }
+   }
+
+
+   /**
+    * Set the variable length element at the specified index to the supplied
+    * byte array. This is same as using {@link #set(int, byte[], int, int)}
+    * with start as 0 and length as value.length
+    *
+    * @param index   position of the element to set
+    * @param value   array of bytes to write
+    */
+   public void set(int index, byte[] value) {
+      assert index >= 0;
+      fillHoles(index);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      setBytes(index, value, 0, value.length);
+      lastSet = index;
+   }
+
+   /**
+    * Same as {@link #set(int, byte[])} except that it handles the
+    * case where index and length of new element are beyond the existing
+    * capacity of the vector.
+    *
+    * @param index   position of the element to set
+    * @param value   array of bytes to write
+    */
+   public void setSafe(int index, byte[] value) {
+      assert index >= 0;
+      fillEmpties(index);
+      handleSafe(index, value.length);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      setBytes(index, value, 0, value.length);
+      lastSet = index;
+   }
+
+   /**
+    * Set the variable length element at the specified index to the supplied
+    * byte array.
+    *
+    * @param index   position of the element to set
+    * @param value   array of bytes to write
+    * @param start   start index in array of bytes
+    * @param length  length of data in array of bytes
+    */
+   public void set(int index, byte[] value, int start, int length) {
+      assert index >= 0;
+      fillHoles(index);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      setBytes(index, value, start, length);
+      lastSet = index;
+   }
+
+   /**
+    * Same as {@link #set(int, byte[], int, int)} except that it handles the
+    * case where index and length of new element are beyond the existing
+    * capacity of the vector.
+    *
+    * @param index   position of the element to set
+    * @param value   array of bytes to write
+    * @param start   start index in array of bytes
+    * @param length  length of data in array of bytes
+    */
+   public void setSafe(int index, byte[] value, int start, int length) {
+      assert index >= 0;
+      fillEmpties(index);
+      handleSafe(index, length);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      setBytes(index, value, start, length);
+      lastSet = index;
+   }
+
+   /**
+    * Set the variable length element at the specified index to the
+    * content in supplied ByteBuffer
+    *
+    * @param index   position of the element to set
+    * @param value   ByteBuffer with data
+    * @param start   start index in ByteBuffer
+    * @param length  length of data in ByteBuffer
+    */
+   public void set(int index, ByteBuffer value, int start, int length) {
+      assert index >= 0;
+      fillHoles(index);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length);
+      valueBuffer.setBytes(startOffset, value, start, length);
+      lastSet = index;
+   }
+
+   /**
+    * Same as {@link #set(int, ByteBuffer, int, int)} except that it handles the
+    * case where index and length of new element are beyond the existing
+    * capacity of the vector.
+    *
+    * @param index   position of the element to set
+    * @param value   ByteBuffer with data
+    * @param start   start index in ByteBuffer
+    * @param length  length of data in ByteBuffer
+    */
+   public void setSafe(int index, ByteBuffer value, int start, int length) {
+      assert index >= 0;
+      fillEmpties(index);
+      handleSafe(index, length);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length);
+      valueBuffer.setBytes(startOffset, value, start, length);
+      lastSet = index;
+   }
+
+   /**
+    * Set the variable length element at the specified index to the data
+    * buffer supplied in the holder. The bytes between holder.start and
+    * holder.end of holder.buffer are copied.
+    *
+    * @param index   position of the element to set
+    * @param holder  holder that carries data buffer.
+    */
+   public void set(int index, VarCharHolder holder) {
+      assert index >= 0;
+      fillHoles(index);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      final int dataLength = holder.end - holder.start;
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength);
+      valueBuffer.setBytes(startOffset, holder.buffer, holder.start, dataLength);
+      lastSet = index;
+   }
+
+   /**
+    * Same as {@link #set(int, VarCharHolder)} except that it handles the
+    * case where index and length of new element are beyond the existing
+    * capacity of the vector.
+    *
+    * @param index   position of the element to set
+    * @param holder  holder that carries data buffer.
+    */
+   public void setSafe(int index, VarCharHolder holder) {
+      assert index >= 0;
+      final int dataLength = holder.end - holder.start;
+      fillEmpties(index);
+      handleSafe(index, dataLength);
+      BitVectorHelper.setValidityBitToOne(validityBuffer, index);
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength);
+      valueBuffer.setBytes(startOffset, holder.buffer, holder.start, dataLength);
+      lastSet = index;
+   }
+
+   /**
+    * Set the variable length element at the specified index to the data
+    * buffer supplied in the holder. When holder.isSet is zero the element
+    * is stored as a null of length zero and holder.buffer, holder.start
+    * and holder.end are not read.
+    *
+    * @param index   position of the element to set
+    * @param holder  holder that carries data buffer.
+    */
+   public void set(int index, NullableVarCharHolder holder) {
+      assert index >= 0;
+      fillHoles(index);
+      BitVectorHelper.setValidityBit(validityBuffer, index, holder.isSet);
+      final boolean hasValue = holder.isSet != 0;
+      final int dataLength = hasValue ? holder.end - holder.start : 0;
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength);
+      if (hasValue) {
+         valueBuffer.setBytes(startOffset, holder.buffer, holder.start, dataLength);
+      }
+      lastSet = index;
+   }
+
+   /**
+    * Same as {@link #set(int, NullableVarCharHolder)} except that it handles the
+    * case where index and length of new element are beyond the existing
+    * capacity of the vector.
+    *
+    * @param index   position of the element to set
+    * @param holder  holder that carries data buffer.
+    */
+   public void setSafe(int index, NullableVarCharHolder holder) {
+      assert index >= 0;
+      final boolean hasValue = holder.isSet != 0;
+      final int dataLength = hasValue ? holder.end - holder.start : 0;
+      fillEmpties(index);
+      handleSafe(index, dataLength);
+      BitVectorHelper.setValidityBit(validityBuffer, index, holder.isSet);
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength);
+      if (hasValue) {
+         valueBuffer.setBytes(startOffset, holder.buffer, holder.start, dataLength);
+      }
+      lastSet = index;
+   }
+
+   /**
+    * Sets the value length for an element. Only the offset buffer is
+    * updated; the validity bit and the value bytes are not touched.
+    *
+    * @param index   position of the element to set
+    * @param length  length of the element
+    */
+   public void setValueLengthSafe(int index, int length) {
+      assert index >= 0;
+      handleSafe(index, length);
+      final int startOffset = getstartOffset(index);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length);
+   }
+
+   /**
+    * Set the element at the given index to null. Like the setSafe
+    * variants, {@code handleSafe(index, 0)} is invoked first.
+    *
+    * @param index   position of element
+    */
+   public void setNull(int index){
+      handleSafe(index, 0);
+      /* not really needed to set the bit to 0 as long as
+       * the buffer always starts from 0.
+       */
+      BitVectorHelper.setValidityBit(validityBuffer, index, 0);
+   }
+
+   /**
+    * Set the variable length element at the specified index from unpacked
+    * holder fields.
+    *
+    * NOTE(review): despite its name, endField is used here as the number
+    * of bytes to copy starting at startField (it is added to the start
+    * offset and passed as the slice length), whereas the holder based
+    * setters treat holder.end as an end offset. Confirm against callers
+    * which meaning is intended. Also note that bufferField is sliced even
+    * when isSet is zero.
+    *
+    * @param index         position of the element to set
+    * @param isSet         zero for a null element, non-zero otherwise
+    * @param startField    start position of the data in bufferField
+    * @param endField      number of bytes copied from bufferField (see note)
+    * @param bufferField   buffer carrying the data
+    */
+   public void set(int index, int isSet, int startField, int endField, ArrowBuf bufferField ) {
+      assert index >= 0;
+      fillHoles(index);
+      BitVectorHelper.setValidityBit(validityBuffer, index, isSet);
+      final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH);
+      offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + endField);
+      final ArrowBuf bb = bufferField.slice(startField, endField);
+      valueBuffer.setBytes(startOffset, bb);
+      lastSet = index;
+   }
+
+   /**
+    * Same as {@link #set(int, int, int, int, ArrowBuf)} except that
+    * {@code handleSafe(index, endField)} is invoked first.
+    *
+    * @param index         position of the element to set
+    * @param isSet         zero for a null element, non-zero otherwise
+    * @param startField    start position of the data in bufferField
+    * @param endField      number of bytes copied from bufferField
+    * @param bufferField   buffer carrying the data
+    */
+   public void setSafe(int index, int isSet, int startField, int endField, ArrowBuf bufferField ) {
+      assert index >= 0;
+      handleSafe(index, endField);
+      set(index, isSet, startField, endField, bufferField);
+   }
+
+
+   /******************************************************************
+    *                                                                *
+    *                      vector transfer                           *
+    *                                                                *
+    ******************************************************************/
+
+   /**
+    * Construct a TransferPair whose target is a newly created
+    * NullableVarCharVector with this vector's field type.
+    *
+    * @param ref         name of the target vector
+    * @param allocator   allocator for the target vector
+    * @return transfer pair from this vector to the new target
+    */
+   @Override
+   public TransferPair getTransferPair(String ref, BufferAllocator allocator){
+      return new TransferImpl(ref, allocator);
+   }
+
+   /**
+    * Construct a TransferPair with the supplied target vector.
+    *
+    * @param to   target vector; it is cast to NullableVarCharVector
+    * @return transfer pair from this vector to the given target
+    */
+   @Override
+   public TransferPair makeTransferPair(ValueVector to) {
+      return new TransferImpl((NullableVarCharVector)to);
+   }
+
+   /* TransferPair that moves or copies data from the enclosing vector to "to" */
+   private class TransferImpl implements TransferPair {
+      NullableVarCharVector to;
+
+      public TransferImpl(String ref, BufferAllocator allocator){
+         to = new NullableVarCharVector(ref, field.getFieldType(), allocator);
+      }
+
+      public TransferImpl(NullableVarCharVector to){
+         this.to = to;
+      }
+
+      @Override
+      public NullableVarCharVector getTo(){
+         return to;
+      }
+
+      @Override
+      public void transfer(){
+         transferTo(to);
+      }
+
+      @Override
+      public void splitAndTransfer(int startIndex, int length) {
+         splitAndTransferTo(startIndex, length, to);
+      }
+
+      @Override
+      public void copyValueSafe(int fromIndex, int toIndex) {
+         to.copyFromSafe(fromIndex, toIndex, NullableVarCharVector.this);
+      }
+   }
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
index fb7286f..e6048b4 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
@@ -255,4 +255,10 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
    * @return buffer
    */
   public ArrowBuf getOffsetBuffer();
+
+  /**
+   * Get the value count of this vector.
+   * NOTE(review): presumably the number of elements (null and non-null)
+   * currently held by the vector -- confirm against implementations.
+   *
+   * @return value count
+   */
+  public int getValueCount();
+
+  /**
+   * Set the value count of this vector.
+   * NOTE(review): presumably the number of elements the vector should
+   * report afterwards -- confirm against implementations.
+   *
+   * @param valueCount value count to set
+   */
+  public void setValueCount(int valueCount);
+
+  /**
+   * Get the element at the given index as an Object. The
+   * NullableIntVector and NullableVarCharVector implementations return
+   * null when the element at that index is null.
+   *
+   * @param index position of element
+   * @return object representation of the element
+   */
+  public Object getObject(int index);
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index f8385a7..96243eb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -27,6 +27,8 @@ import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
 import org.apache.arrow.vector.schema.ArrowVectorType;
 
+import javax.annotation.Nullable;
+
 public class VectorUnloader {
 
   private final VectorSchemaRoot root;
@@ -53,8 +55,17 @@ public class VectorUnloader {
   }
 
   private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
-    Accessor accessor = vector.getAccessor();
-    nodes.add(new ArrowFieldNode(accessor.getValueCount(), includeNullCount ? accessor.getNullCount() : -1));
+    Accessor accessor = null;
+    if (vector instanceof NullableIntVector) {
+      nodes.add(new ArrowFieldNode(((NullableIntVector)vector).getValueCount(),
+                includeNullCount ? ((NullableIntVector)vector).getNullCount() : -1));
+    } else if (vector instanceof NullableVarCharVector) {
+      nodes.add(new ArrowFieldNode(((NullableVarCharVector)vector).getValueCount(),
+                includeNullCount ? ((NullableVarCharVector)vector).getNullCount() : -1));
+    } else {
+      accessor = vector.getAccessor();
+      nodes.add(new ArrowFieldNode(accessor.getValueCount(), includeNullCount ? accessor.getNullCount() : -1));
+    }
     List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
     List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
     if (fieldBuffers.size() != expectedBuffers.size()) {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
index 5ac0037..fcef02f 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
@@ -259,4 +259,13 @@ public class ZeroVector implements FieldVector {
   public ArrowBuf getOffsetBuffer() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public int getValueCount() { return 0; }
+
+  @Override
+  public void setValueCount(int valueCount) { }
+
+  @Override
+  public Object getObject(int index) { return null; }
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
index b3be375..e95442a 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
@@ -34,14 +34,7 @@ import com.google.common.collect.ObjectArrays;
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
-import org.apache.arrow.vector.AddOrGetResult;
-import org.apache.arrow.vector.BaseDataValueVector;
-import org.apache.arrow.vector.BaseValueVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.BufferBacked;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.ValueVector;
-import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.impl.UnionFixedSizeListReader;
 import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
@@ -331,9 +324,15 @@ public class FixedSizeListVector extends BaseValueVector implements FieldVector,
         return null;
       }
       final List<Object> vals = new JsonStringArrayList<>(listSize);
-      final ValueVector.Accessor valuesAccessor = vector.getAccessor();
-      for (int i = 0; i < listSize; i++) {
-        vals.add(valuesAccessor.getObject(index * listSize + i));
+      if (vector instanceof NullableIntVector || vector instanceof NullableVarCharVector) {
+        for (int i = 0; i < listSize; i++) {
+          vals.add(vector.getObject(index * listSize + i));
+        }
+      } else {
+        final ValueVector.Accessor valuesAccessor = vector.getAccessor();
+        for (int i = 0; i < listSize; i++) {
+          vals.add(valuesAccessor.getObject(index * listSize + i));
+        }
       }
       return vals;
     }
@@ -367,7 +366,11 @@ public class FixedSizeListVector extends BaseValueVector implements FieldVector,
     @Override
     public void setValueCount(int valueCount) {
       bits.getMutator().setValueCount(valueCount);
-      vector.getMutator().setValueCount(valueCount * listSize);
+      if (vector instanceof  NullableIntVector || vector instanceof NullableVarCharVector) { // these two child types are driven directly, not through a Mutator
+        vector.setValueCount(valueCount * listSize); // child count is listSize entries per list slot
+      } else {
+        vector.getMutator().setValueCount(valueCount * listSize); // every other child type still goes through its Mutator
+      }
     }
   }
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index ea28a60..4b2c913 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -40,6 +40,8 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.NullableIntVector;
+import org.apache.arrow.vector.NullableVarCharVector;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
 import org.apache.arrow.vector.complex.impl.UnionListReader;
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
@@ -376,10 +378,18 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector,
       final UInt4Vector.Accessor offsetsAccessor = offsets.getAccessor();
       final int start = offsetsAccessor.get(index);
       final int end = offsetsAccessor.get(index + 1);
-      final ValueVector.Accessor valuesAccessor = getDataVector().getAccessor();
-      for (int i = start; i < end; i++) {
-        vals.add(valuesAccessor.getObject(i));
+      final ValueVector vv = getDataVector();
+      if (vv instanceof  NullableIntVector || vv instanceof NullableVarCharVector) {
+        for (int i = start; i < end; i++) {
+          vals.add(vv.getObject(i));
+        }
+      } else {
+        final ValueVector.Accessor valuesAccessor = vv.getAccessor();
+        for (int i = start; i < end; i++) {
+          vals.add(valuesAccessor.getObject(i));
+        }
       }
+
       return vals;
     }
 
@@ -432,7 +442,12 @@ public class ListVector extends BaseRepeatedValueVector implements FieldVector,
         offsets.getMutator().setValueCount(valueCount + 1);
       }
       final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
-      vector.getMutator().setValueCount(childValueCount);
+      if (vector instanceof NullableIntVector || vector instanceof NullableVarCharVector) {
+        vector.setValueCount(childValueCount);
+      } else {
+        vector.getMutator().setValueCount(childValueCount);
+      }
+
       bits.getMutator().setValueCount(valueCount);
     }
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
index f46635a..95efa60 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
@@ -35,9 +35,7 @@ import com.google.common.primitives.Ints;
 import io.netty.buffer.ArrowBuf;
 
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.BaseValueVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.holders.ComplexHolder;
@@ -272,10 +270,19 @@ public class MapVector extends AbstractMapVector {
       Map<String, Object> vv = new JsonStringHashMap<>();
       for (String child : getChildFieldNames()) {
         ValueVector v = getChild(child);
-        if (v != null && index < v.getAccessor().getValueCount()) {
-          Object value = v.getAccessor().getObject(index);
-          if (value != null) {
-            vv.put(child, value);
+        if (v instanceof  NullableVarCharVector || v instanceof  NullableIntVector) {
+          if (v != null && index < v.getValueCount()) {
+            Object value = v.getObject(index);
+            if (value != null) {
+              vv.put(child, value);
+            }
+          }
+        } else {
+          if (v != null && index < v.getAccessor().getValueCount()) {
+            Object value = v.getAccessor().getObject(index);
+            if (value != null) {
+              vv.put(child, value);
+            }
           }
         }
       }
@@ -302,7 +309,11 @@ public class MapVector extends AbstractMapVector {
     @Override
     public void setValueCount(int valueCount) {
       for (final ValueVector v : getChildren()) {
-        v.getMutator().setValueCount(valueCount);
+        if (v instanceof NullableIntVector || v instanceof NullableVarCharVector) { // these two child types are driven directly, not through a Mutator
+          v.setValueCount(valueCount); // each child receives the same count as the map itself
+        } else {
+          v.getMutator().setValueCount(valueCount); // every other child type still goes through its Mutator
+        }
       }
       MapVector.this.valueCount = valueCount;
     }
@@ -362,4 +373,10 @@ public class MapVector extends AbstractMapVector {
     return getChildren();
   }
 
+  public int getValueCount() { return 0; } // always reports 0. NOTE(review): looks like a placeholder; the Mutator stores the count in valueCount - confirm whether it should be returned here
+
+  public void setValueCount(int valueCount) { } // no-op: the argument is ignored. NOTE(review): looks like a placeholder; Mutator.setValueCount is what updates the children - confirm
+
+  public Object getObject(int index) { return null; } // always returns null regardless of index. NOTE(review): looks like a placeholder - confirm against the Accessor's getObject
+
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
index e1c7c90..0de8044 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
@@ -265,7 +265,7 @@ public class JsonFileReader implements AutoCloseable, DictionaryProvider {
           ((NullableVarBinaryVector) vector).getMutator().setLastSet(count - 1);
           break;
         case VARCHAR:
-          ((NullableVarCharVector) vector).getMutator().setLastSet(count - 1);
+          ((NullableVarCharVector) vector).setLastSet(count - 1);
           break;
       }
       vector.getMutator().setValueCount(count);
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
index 1a801a6..4fe2861 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java
@@ -40,7 +40,7 @@ public class TestBufferOwnershipTransfer {
 
     NullableIntVector v1 = new NullableIntVector("v1", childAllocator1);
     v1.allocateNew();
-    v1.getMutator().setValueCount(4095);
+    v1.setValueCount(4095);
 
     NullableIntVector v2 = new NullableIntVector("v2", childAllocator2);
 
@@ -60,8 +60,8 @@ public class TestBufferOwnershipTransfer {
 
     NullableVarCharVector v1 = new NullableVarCharVector("v1", childAllocator1);
     v1.allocateNew();
-    v1.getMutator().setSafe(4094, "hello world".getBytes(), 0, 11);
-    v1.getMutator().setValueCount(4001);
+    v1.setSafe(4094, "hello world".getBytes(), 0, 11);
+    v1.setValueCount(4001);
 
     NullableVarCharVector v2 = new NullableVarCharVector("v2", childAllocator2);
 
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
index f8c16e7..1185246 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestDictionaryVector.java
@@ -56,24 +56,22 @@ public class TestDictionaryVector {
     // Create a new value vector
     try (final NullableVarCharVector vector = newNullableVarCharVector("foo", allocator);
          final NullableVarCharVector dictionaryVector = newNullableVarCharVector("dict", allocator);) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew(512, 5);
 
       // set some values
-      m.setSafe(0, zero, 0, zero.length);
-      m.setSafe(1, one, 0, one.length);
-      m.setSafe(2, one, 0, one.length);
-      m.setSafe(3, two, 0, two.length);
-      m.setSafe(4, zero, 0, zero.length);
-      m.setValueCount(5);
+      vector.setSafe(0, zero, 0, zero.length);
+      vector.setSafe(1, one, 0, one.length);
+      vector.setSafe(2, one, 0, one.length);
+      vector.setSafe(3, two, 0, two.length);
+      vector.setSafe(4, zero, 0, zero.length);
+      vector.setValueCount(5);
 
       // set some dictionary values
-      final NullableVarCharVector.Mutator m2 = dictionaryVector.getMutator();
       dictionaryVector.allocateNew(512, 3);
-      m2.setSafe(0, zero, 0, zero.length);
-      m2.setSafe(1, one, 0, one.length);
-      m2.setSafe(2, two, 0, two.length);
-      m2.setValueCount(3);
+      dictionaryVector.setSafe(0, zero, 0, zero.length);
+      dictionaryVector.setSafe(1, one, 0, one.length);
+      dictionaryVector.setSafe(2, two, 0, two.length);
+      dictionaryVector.setValueCount(3);
 
       Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
 
@@ -81,20 +79,20 @@ public class TestDictionaryVector {
         // verify indices
         assertEquals(NullableIntVector.class, encoded.getClass());
 
-        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) encoded).getAccessor();
-        assertEquals(5, indexAccessor.getValueCount());
-        assertEquals(0, indexAccessor.get(0));
-        assertEquals(1, indexAccessor.get(1));
-        assertEquals(1, indexAccessor.get(2));
-        assertEquals(2, indexAccessor.get(3));
-        assertEquals(0, indexAccessor.get(4));
+        NullableIntVector index = ((NullableIntVector)encoded);
+        assertEquals(5, index.getValueCount());
+        assertEquals(0, index.get(0));
+        assertEquals(1, index.get(1));
+        assertEquals(1, index.get(2));
+        assertEquals(2, index.get(3));
+        assertEquals(0, index.get(4));
 
         // now run through the decoder and verify we get the original back
         try (ValueVector decoded = DictionaryEncoder.decode(encoded, dictionary)) {
           assertEquals(vector.getClass(), decoded.getClass());
-          assertEquals(vector.getAccessor().getValueCount(), decoded.getAccessor().getValueCount());
+          assertEquals(vector.getValueCount(), ((NullableVarCharVector)decoded).getValueCount());
           for (int i = 0; i < 5; i++) {
-            assertEquals(vector.getAccessor().getObject(i), decoded.getAccessor().getObject(i));
+            assertEquals(vector.getObject(i), ((NullableVarCharVector)decoded).getObject(i));
           }
         }
       }
@@ -106,21 +104,20 @@ public class TestDictionaryVector {
     // Create a new value vector
     try (final NullableVarCharVector vector = newNullableVarCharVector("foo", allocator);
          final NullableVarCharVector dictionaryVector = newNullableVarCharVector("dict", allocator);) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew();
 
       int count = 10000;
 
       for (int i = 0; i < 10000; ++i) {
-        vector.getMutator().setSafe(i, data[i % 3], 0, data[i % 3].length);
+        vector.setSafe(i, data[i % 3], 0, data[i % 3].length);
       }
-      vector.getMutator().setValueCount(count);
+      vector.setValueCount(count);
 
       dictionaryVector.allocateNew(512, 3);
-      dictionaryVector.getMutator().setSafe(0, zero, 0, zero.length);
-      dictionaryVector.getMutator().setSafe(1, one, 0, one.length);
-      dictionaryVector.getMutator().setSafe(2, two, 0, two.length);
-      dictionaryVector.getMutator().setValueCount(3);
+      dictionaryVector.setSafe(0, zero, 0, zero.length);
+      dictionaryVector.setSafe(1, one, 0, one.length);
+      dictionaryVector.setSafe(2, two, 0, two.length);
+      dictionaryVector.setValueCount(3);
 
       Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(1L, false, null));
 
@@ -129,10 +126,10 @@ public class TestDictionaryVector {
         // verify indices
         assertEquals(NullableIntVector.class, encoded.getClass());
 
-        NullableIntVector.Accessor indexAccessor = ((NullableIntVector) encoded).getAccessor();
-        assertEquals(count, indexAccessor.getValueCount());
+        NullableIntVector index = ((NullableIntVector) encoded);
+        assertEquals(count, index.getValueCount());
         for (int i = 0; i < count; ++i) {
-          assertEquals(i % 3, indexAccessor.get(i));
+          assertEquals(i % 3, index.get(i));
         }
 
         // now run through the decoder and verify we get the original back
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java
index 43d9387..168deac 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestFixedSizeListVector.java
@@ -54,15 +54,14 @@ public class TestFixedSizeListVector {
   public void testIntType() {
     try (FixedSizeListVector vector = FixedSizeListVector.empty("list", 2, allocator)) {
       NullableIntVector nested = (NullableIntVector) vector.addOrGetVector(FieldType.nullable(MinorType.INT.getType())).getVector();
-      NullableIntVector.Mutator mutator = nested.getMutator();
       vector.allocateNew();
 
       for (int i = 0; i < 10; i++) {
         vector.getMutator().setNotNull(i);
-        mutator.set(i * 2, i);
-        mutator.set(i * 2 + 1, i + 10);
+        nested.set(i * 2, i);
+        nested.set(i * 2 + 1, i + 10);
       }
-      vector.getMutator().setValueCount(10);
+      vector.setValueCount(10);
 
       UnionFixedSizeListReader reader = vector.getReader();
       for (int i = 0; i < 10; i++) {
@@ -119,7 +118,7 @@ public class TestFixedSizeListVector {
       ListVector.Mutator mutator = vector.getMutator();
       FixedSizeListVector tuples = (FixedSizeListVector) vector.addOrGetVector(FieldType.nullable(new ArrowType.FixedSizeList(2))).getVector();
       FixedSizeListVector.Mutator tupleMutator = tuples.getMutator();
-      NullableIntVector.Mutator innerMutator = (NullableIntVector.Mutator) tuples.addOrGetVector(FieldType.nullable(MinorType.INT.getType())).getVector().getMutator();
+      NullableIntVector innerVector = (NullableIntVector) tuples.addOrGetVector(FieldType.nullable(MinorType.INT.getType())).getVector();
       vector.allocateNew();
 
       for (int i = 0; i < 10; i++) {
@@ -127,8 +126,8 @@ public class TestFixedSizeListVector {
           int position = mutator.startNewValue(i);
           for (int j = 0; j < i % 7; j++) {
             tupleMutator.setNotNull(position + j);
-            innerMutator.set((position + j) * 2, j);
-            innerMutator.set((position + j) * 2 + 1, j + 1);
+            innerVector.set((position + j) * 2, j);
+            innerVector.set((position + j) * 2 + 1, j + 1);
           }
           mutator.endValue(i, i % 7);
         }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java
index 66e5375..7de3bcb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java
@@ -25,7 +25,6 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 
 import org.apache.arrow.vector.NullableVarCharVector;
-import org.apache.arrow.vector.NullableVarCharVector.Accessor;
 import org.apache.arrow.vector.util.TransferPair;
 
 import org.junit.After;
@@ -54,32 +53,30 @@ public class TestSplitAndTransfer {
             final int valueCount = 500;
             final String[] compareArray = new String[valueCount];
 
-            final NullableVarCharVector.Mutator mutator = varCharVector.getMutator();
             for (int i = 0; i < valueCount; i += 3) {
                 final String s = String.format("%010d", i);
-                mutator.set(i, s.getBytes());
+                varCharVector.set(i, s.getBytes());
                 compareArray[i] = s;
             }
-            mutator.setValueCount(valueCount);
+            varCharVector.setValueCount(valueCount);
 
             final TransferPair tp = varCharVector.getTransferPair(allocator);
             final NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo();
-            final Accessor accessor = newVarCharVector.getAccessor();
             final int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}};
 
             for (final int[] startLength : startLengths) {
                 final int start = startLength[0];
                 final int length = startLength[1];
                 tp.splitAndTransfer(start, length);
-                newVarCharVector.getMutator().setValueCount(length);
+                newVarCharVector.setValueCount(length);
                 for (int i = 0; i < length; i++) {
                     final boolean expectedSet = ((start + i) % 3) == 0;
                     if (expectedSet) {
                         final byte[] expectedValue = compareArray[start + i].getBytes();
-                        assertFalse(accessor.isNull(i));
-                        assertArrayEquals(expectedValue, accessor.get(i));
+                        assertFalse(newVarCharVector.isNull(i));
+                        assertArrayEquals(expectedValue, newVarCharVector.get(i));
                     } else {
-                        assertTrue(accessor.isNull(i));
+                        assertTrue(newVarCharVector.isNull(i));
                     }
                 }
                 newVarCharVector.clear();
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
index a239861..b7f88c3 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
@@ -694,8 +694,6 @@ public class TestValueVector {
   public void testNullableFixedType3() {
     // Create a new value vector for 1024 integers
     try (final NullableIntVector vector = newVector(NullableIntVector.class, EMPTY_SCHEMA_PATH, MinorType.INT, allocator)) {
-      final NullableIntVector.Mutator mutator = vector.getMutator();
-      final NullableIntVector.Accessor accessor = vector.getAccessor();
       boolean error = false;
       int initialCapacity = 1024;
 
@@ -706,26 +704,26 @@ public class TestValueVector {
       /* underlying buffer should be able to store 16 values */
       assertEquals(initialCapacity, vector.getValueCapacity());
 
-      mutator.set(0, 1);
-      mutator.set(1, 2);
-      mutator.set(100, 3);
-      mutator.set(1022, 4);
-      mutator.set(1023, 5);
+      vector.set(0, 1);
+      vector.set(1, 2);
+      vector.set(100, 3);
+      vector.set(1022, 4);
+      vector.set(1023, 5);
 
       /* check vector contents */
       int j = 1;
       for(int i = 0; i <= 1023; i++) {
         if((i >= 2 && i <= 99) || (i >= 101 && i <= 1021)) {
-          assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+          assertTrue("non-null data not expected at index: " + i, vector.isNull(i));
         }
         else {
-          assertFalse("null data not expected at index: " + i, accessor.isNull(i));
-          assertEquals("unexpected value at index: " + i, j, accessor.get(i));
+          assertFalse("null data not expected at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, j, vector.get(i));
           j++;
         }
       }
 
-      mutator.setValueCount(1024);
+      vector.setValueCount(1024);
       Field field = vector.getField();
       TypeLayout typeLayout = field.getTypeLayout();
 
@@ -749,7 +747,7 @@ public class TestValueVector {
       assertEquals(-64, validityVectorBuf.getByte(127)); // 1022nd and 1023rd bit defined
 
       /* this should trigger a realloc() */
-      mutator.setSafe(1024, 6);
+      vector.setSafe(1024, 6);
 
       /* underlying buffer should now be able to store double the number of values */
       assertEquals(initialCapacity * 2, vector.getValueCapacity());
@@ -758,11 +756,11 @@ public class TestValueVector {
       j = 1;
       for(int i = 0; i < (initialCapacity * 2); i++) {
         if((i > 1024) || (i >= 2 && i <= 99) || (i >= 101 && i <= 1021)) {
-          assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+          assertTrue("non-null data not expected at index: " + i, vector.isNull(i));
         }
         else {
-          assertFalse("null data not expected at index: " + i, accessor.isNull(i));
-          assertEquals("unexpected value at index: " + i, j, accessor.get(i));
+          assertFalse("null data not expected at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, j, vector.get(i));
           j++;
         }
       }
@@ -775,13 +773,101 @@ public class TestValueVector {
 
       /* vector data should have been zeroed out */
       for(int i = 0; i < (initialCapacity * 2); i++) {
-        assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+        assertTrue("non-null data not expected at index: " + i, vector.isNull(i));
       }
 
       vector.allocateNew(4096);
       // vector has been erased
       for(int i = 0; i < 4096; i++) {
-        assertTrue("non-null data not expected at index: " + i, accessor.isNull(i));
+        assertTrue("non-null data not expected at index: " + i, vector.isNull(i));
+      }
+    }
+  }
+
+  @Test /* NullableIntVector: set/setSafe growth, zeroVector() and reset() through the direct vector API */
+  public void testNullableFixedType4() {
+    try (final NullableIntVector vector = newVector(NullableIntVector.class, EMPTY_SCHEMA_PATH, MinorType.INT, allocator)) {
+
+      /* no memory allocation has happened yet */
+      assertEquals(0, vector.getValueCapacity());
+
+      vector.allocateNew();
+      int valueCapacity = vector.getValueCapacity();
+      assertEquals(vector.INITIAL_VALUE_ALLOCATION, valueCapacity); // the no-arg allocateNew() is expected to size the vector to the default
+
+      int baseValue = 20000;
+
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) { // populate odd slots only; even slots are left unset
+          vector.set(i, baseValue + i);
+        }
+      }
+
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          assertFalse("unexpected null value at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, (baseValue + i), vector.get(i));
+        } else {
+          assertTrue("unexpected non-null value at index: " + i, vector.isNull(i));
+        }
+      }
+
+      vector.setSafe(valueCapacity, 20000000); // first index past the current capacity; the next assert expects the capacity to double
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
+
+      for (int i = 0; i < vector.getValueCapacity(); i++) {
+        if (i == valueCapacity) {
+          assertFalse("unexpected null value at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, 20000000, vector.get(i));
+        } else if (i < valueCapacity) {
+          if ((i & 1) == 1) { // only odd slots are re-verified; even slots below valueCapacity are not asserted in this pass
+            assertFalse("unexpected null value at index: " + i, vector.isNull(i));
+            assertEquals("unexpected value at index: " + i, (baseValue + i), vector.get(i));
+          }
+        } else {
+          assertTrue("unexpected non-null value at index: " + i, vector.isNull(i));
+        }
+      }
+
+      vector.zeroVector(); // the checks below expect the previously set odd slots to read as null after this call
+
+      for (int i = 0; i < vector.getValueCapacity(); i+=2) { // repopulate even slots only, across the doubled capacity
+          vector.set(i, baseValue + i);
+      }
+
+      for (int i = 0; i < vector.getValueCapacity(); i++) {
+        if (i%2 == 0) {
+          assertFalse("unexpected null value at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, (baseValue + i), vector.get(i));
+        } else {
+          assertTrue("unexpected non-null value at index: " + i, vector.isNull(i));
+        }
+      }
+
+      vector.setSafe((valueCapacity *  2) + 1000, 400000000); // 1000 slots past the doubled capacity; the next assert expects exactly one more doubling
+      assertEquals(valueCapacity * 4, vector.getValueCapacity());
+
+      for (int i = 0; i < vector.getValueCapacity(); i++) {
+        if (i == (valueCapacity*2 + 1000)) {
+          assertFalse("unexpected null value at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, 400000000, vector.get(i));
+        } else if (i < valueCapacity*2 && (i%2) == 0) { // even slots written before the second growth must still hold their values
+          assertFalse("unexpected null value at index: " + i, vector.isNull(i));
+          assertEquals("unexpected value at index: " + i, baseValue + i, vector.get(i));
+        } else {
+          assertTrue("unexpected non-null value at index: " + i, vector.isNull(i));
+        }
+      }
+
+      /* reset the vector */
+      vector.reset();
+
+       /* capacity shouldn't change after reset */
+      assertEquals(valueCapacity * 4, vector.getValueCapacity());
+
+      /* vector data should be zeroed out */
+      for(int i = 0; i < (valueCapacity * 4); i++) {
+        assertTrue("non-null data not expected at index: " + i, vector.isNull(i));
       }
     }
   }
@@ -805,32 +891,30 @@ public class TestValueVector {
 
     // Create a new value vector for 1024 integers.
     try (final NullableVarCharVector vector = newNullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew(1024 * 10, 1024);
 
-      m.set(0, STR1);
-      m.set(1, STR2);
-      m.set(2, STR3);
-      m.setSafe(3, STR3, 1, STR3.length - 1);
-      m.setSafe(4, STR3, 2, STR3.length - 2);
+      vector.set(0, STR1);
+      vector.set(1, STR2);
+      vector.set(2, STR3);
+      vector.setSafe(3, STR3, 1, STR3.length - 1);
+      vector.setSafe(4, STR3, 2, STR3.length - 2);
       ByteBuffer STR3ByteBuffer = ByteBuffer.wrap(STR3);
-      m.setSafe(5, STR3ByteBuffer, 1, STR3.length - 1);
-      m.setSafe(6, STR3ByteBuffer, 2, STR3.length - 2);
+      vector.setSafe(5, STR3ByteBuffer, 1, STR3.length - 1);
+      vector.setSafe(6, STR3ByteBuffer, 2, STR3.length - 2);
 
       // Check the sample strings.
-      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(3));
-      assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(4));
-      assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), accessor.get(5));
-      assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), accessor.get(6));
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), vector.get(3));
+      assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), vector.get(4));
+      assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), vector.get(5));
+      assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), vector.get(6));
 
       // Ensure null value throws.
       boolean b = false;
       try {
-        vector.getAccessor().get(7);
+        vector.get(7);
       } catch (IllegalStateException e) {
         b = true;
       } finally {
@@ -1070,43 +1154,42 @@ public class TestValueVector {
   @Test /* NullableVarCharVector */
   public void testReallocAfterVectorTransfer3() {
     try (final NullableVarCharVector vector = new NullableVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
-      final NullableVarCharVector.Mutator mutator = vector.getMutator();
-      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
-
       /* 4096 values with 10 byte per record */
       vector.allocateNew(4096 * 10, 4096);
       int valueCapacity = vector.getValueCapacity();
+      assertEquals(4096, valueCapacity);
 
       /* populate the vector */
       for (int i = 0; i < valueCapacity; i++) {
         if ((i & 1) == 1) {
-          mutator.set(i, STR1);
+          vector.set(i, STR1);
         }
         else {
-          mutator.set(i, STR2);
+          vector.set(i, STR2);
         }
       }
 
       /* Check the vector output */
       for (int i = 0; i < valueCapacity; i++) {
         if ((i & 1) == 1) {
-          assertArrayEquals(STR1, accessor.get(i));
+          assertArrayEquals(STR1, vector.get(i));
         }
         else {
-          assertArrayEquals(STR2, accessor.get(i));
+          assertArrayEquals(STR2, vector.get(i));
         }
       }
 
       /* trigger first realloc */
-      mutator.setSafe(valueCapacity, STR2, 0, STR2.length);
+      vector.setSafe(valueCapacity, STR2, 0, STR2.length);
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
 
       /* populate the remaining vector */
       for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
         if ((i & 1) == 1) {
-          mutator.set(i, STR1);
+          vector.set(i, STR1);
         }
         else {
-          mutator.set(i, STR2);
+          vector.set(i, STR2);
         }
       }
 
@@ -1114,23 +1197,24 @@ public class TestValueVector {
       valueCapacity = vector.getValueCapacity();
       for (int i = 0; i < valueCapacity; i++) {
         if ((i & 1) == 1) {
-          assertArrayEquals(STR1, accessor.get(i));
+          assertArrayEquals(STR1, vector.get(i));
         }
         else {
-          assertArrayEquals(STR2, accessor.get(i));
+          assertArrayEquals(STR2, vector.get(i));
         }
       }
 
       /* trigger second realloc */
-      mutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+      vector.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
 
       /* populate the remaining vector */
       for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
         if ((i & 1) == 1) {
-          mutator.set(i, STR1);
+          vector.set(i, STR1);
         }
         else {
-          mutator.set(i, STR2);
+          vector.set(i, STR2);
         }
       }
 
@@ -1138,10 +1222,10 @@ public class TestValueVector {
       valueCapacity = vector.getValueCapacity();
       for (int i = 0; i < valueCapacity; i++) {
         if ((i & 1) == 1) {
-          assertArrayEquals(STR1, accessor.get(i));
+          assertArrayEquals(STR1, vector.get(i));
         }
         else {
-          assertArrayEquals(STR2, accessor.get(i));
+          assertArrayEquals(STR2, vector.get(i));
         }
       }
 
@@ -1152,13 +1236,112 @@ public class TestValueVector {
       TransferPair transferPair = vector.getTransferPair(allocator);
       transferPair.transfer();
       NullableVarCharVector toVector = (NullableVarCharVector)transferPair.getTo();
-      NullableVarCharVector.Mutator toMutator = toVector.getMutator();
-      NullableVarCharVector.Accessor toAccessor = toVector.getAccessor();
-
       valueCapacity = toVector.getValueCapacity();
 
-      /* trigger a realloc of this toVector */
-      toMutator.setSafe(valueCapacity + 10, STR2, 0, STR2.length);
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 1) {
+          assertArrayEquals(STR1, toVector.get(i));
+        }
+        else {
+          assertArrayEquals(STR2, toVector.get(i));
+        }
+      }
+
+      toVector.close();
+    }
+  }
+
+  @Test /* NullableIntVector */
+  public void testReallocAfterVectorTransfer4() {
+    try (final NullableIntVector vector = new NullableIntVector(EMPTY_SCHEMA_PATH, allocator)) {
+
+      /* 4096 values  */
+      vector.allocateNew(4096);
+      int valueCapacity = vector.getValueCapacity();
+      assertEquals(4096, valueCapacity);
+
+      /* populate the vector */
+      int baseValue = 1000;
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 0) {
+          vector.set(i, 1000 + i);
+        }
+      }
+
+      /* Check the vector output */
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 0) {
+          assertEquals(1000 + i, vector.get(i));
+        }
+        else {
+          assertTrue(vector.isNull(i));
+        }
+      }
+
+      /* trigger first realloc */
+      vector.setSafe(valueCapacity, 10000000);
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
+
+      /* populate the remaining vector */
+      for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
+        if ((i & 1) == 0) {
+          vector.set(i, 1000 + i);
+        }
+      }
+
+      /* Check the vector output */
+      valueCapacity = vector.getValueCapacity();
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 0) {
+          assertEquals(1000 + i, vector.get(i));
+        }
+        else {
+          assertTrue(vector.isNull(i));
+        }
+      }
+
+      /* trigger second realloc */
+      vector.setSafe(valueCapacity, 10000000);
+      assertEquals(valueCapacity * 2, vector.getValueCapacity());
+
+      /* populate the remaining vector */
+      for (int i = valueCapacity; i < vector.getValueCapacity(); i++) {
+        if ((i & 1) == 0) {
+          vector.set(i, 1000 + i);
+        }
+      }
+
+      /* Check the vector output */
+      valueCapacity = vector.getValueCapacity();
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 0) {
+          assertEquals(1000 + i, vector.get(i));
+        }
+        else {
+          assertTrue(vector.isNull(i));
+        }
+      }
+
+      /* we are potentially working with 4x the size of vector buffer
+       * that we initially started with. Now let's transfer the vector.
+       */
+
+      TransferPair transferPair = vector.getTransferPair(allocator);
+      transferPair.transfer();
+      NullableIntVector toVector = (NullableIntVector)transferPair.getTo();
+      /* value capacity of source and target vectors should be same after
+       * the transfer.
+       */
+      assertEquals(valueCapacity, toVector.getValueCapacity());
+
+      for (int i = 0; i < valueCapacity; i++) {
+        if ((i & 1) == 0) {
+          assertEquals(1000 + i, toVector.get(i));
+        }
+        else {
+          assertTrue(toVector.isNull(i));
+        }
+      }
 
       toVector.close();
     }
@@ -1199,31 +1382,33 @@ public class TestValueVector {
 
   @Test
   public void testReAllocNullableVariableWidthVector() {
-    // Create a new value vector for 1024 integers
     try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.allocateNew();
 
       int initialCapacity = vector.getValueCapacity();
+      assertEquals(4095, initialCapacity);
 
-      // Put values in indexes that fall within the initial allocation
-      m.setSafe(0, STR1, 0, STR1.length);
-      m.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
+      /* Put values in indexes that fall within the initial allocation */
+      vector.setSafe(0, STR1, 0, STR1.length);
+      vector.setSafe(initialCapacity - 1, STR2, 0, STR2.length);
 
-      // Now try to put values in space that falls beyond the initial allocation
-      m.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
+      /* the above set calls should NOT have triggered a realloc */
+      initialCapacity = vector.getValueCapacity();
+      assertEquals(4095, initialCapacity);
 
-      // Check valueCapacity is more than initial allocation
-      assertEquals((initialCapacity + 1) * 2 - 1, vector.getValueCapacity());
+      /* Now try to put values in space that falls beyond the initial allocation */
+      vector.setSafe(initialCapacity + 200, STR3, 0, STR3.length);
 
-      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(initialCapacity - 1));
-      assertArrayEquals(STR3, accessor.get(initialCapacity + 200));
+      /* Check valueCapacity is more than initial allocation */
+      assertEquals(((initialCapacity + 1) * 2) - 1, vector.getValueCapacity());
+
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(initialCapacity - 1));
+      assertArrayEquals(STR3, vector.get(initialCapacity + 200));
 
       // Set the valueCount to be more than valueCapacity of current allocation. This is possible for NullableValueVectors
       // as we don't call setSafe for null values, but we do call setValueCount when the current batch is processed.
-      m.setValueCount(vector.getValueCapacity() + 200);
+      vector.setValueCount(vector.getValueCapacity() + 200);
     }
   }
 
@@ -1232,10 +1417,18 @@ public class TestValueVector {
     try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
       vector.allocateNew();
 
-      vector.getMutator().setSafe(4094, "hello".getBytes(), 0, 5);
-      vector.getMutator().setValueCount(4095);
+      int initialCapacity = vector.getValueCapacity();
+      assertEquals(4095, initialCapacity);
+
+      vector.setSafe(4094, "hello".getBytes(), 0, 5);
+      /* the above set method should NOT have triggered a realloc */
+      initialCapacity = vector.getValueCapacity();
+      assertEquals(4095, initialCapacity);
 
-      assertEquals(4096 * 4, vector.getFieldBuffers().get(1).capacity());
+      vector.setValueCount(4095);
+      assertEquals(4096 * vector.OFFSET_WIDTH, vector.getFieldBuffers().get(1).capacity());
+      initialCapacity = vector.getValueCapacity();
+      assertEquals(4095, initialCapacity);
     }
   }
 
@@ -1243,42 +1436,129 @@ public class TestValueVector {
   public void testCopyFromWithNulls() {
     try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator);
          final NullableVarCharVector vector2 = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
+
       vector.allocateNew();
+      int capacity = vector.getValueCapacity();
+      assertEquals(4095, capacity);
 
       for (int i = 0; i < 4095; i++) {
         if (i % 3 == 0) {
           continue;
         }
         byte[] b = Integer.toString(i).getBytes();
-        vector.getMutator().setSafe(i, b, 0, b.length);
+        vector.setSafe(i, b, 0, b.length);
       }
 
-      vector.getMutator().setValueCount(4095);
+      /* NO reAlloc() should have happened in setSafe() */
+      capacity = vector.getValueCapacity();
+      assertEquals(4095, capacity);
+
+      vector.setValueCount(4095);
+
+      for (int i = 0; i < 4095; i++) {
+        if (i % 3 == 0) {
+          assertNull(vector.getObject(i));
+        } else {
+          assertEquals("unexpected value at index: " + i, Integer.toString(i), vector.getObject(i).toString());
+        }
+      }
 
       vector2.allocateNew();
+      capacity = vector2.getValueCapacity();
+      assertEquals(4095, capacity);
 
       for (int i = 0; i < 4095; i++) {
         vector2.copyFromSafe(i, i, vector);
+        if (i % 3 == 0) {
+          assertNull(vector2.getObject(i));
+        } else {
+          assertEquals("unexpected value at index: " + i, Integer.toString(i), vector2.getObject(i).toString());
+        }
       }
 
-      vector2.getMutator().setValueCount(4095);
+      /* NO reAlloc() should have happened in copyFrom */
+      capacity = vector2.getValueCapacity();
+      assertEquals(4095, capacity);
+
+      vector2.setValueCount(4095);
 
       for (int i = 0; i < 4095; i++) {
         if (i % 3 == 0) {
-          assertNull(vector2.getAccessor().getObject(i));
+          assertNull(vector2.getObject(i));
         } else {
-          assertEquals(Integer.toString(i), vector2.getAccessor().getObject(i).toString());
+          assertEquals("unexpected value at index: " + i, Integer.toString(i), vector2.getObject(i).toString());
         }
       }
     }
   }
 
   @Test
-  public void testSetLastSetUsage() {
-    try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
+  public void testCopyFromWithNulls1() {
+    try (final NullableVarCharVector vector = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator);
+         final NullableVarCharVector vector2 = newVector(NullableVarCharVector.class, EMPTY_SCHEMA_PATH, MinorType.VARCHAR, allocator)) {
 
-      final NullableVarCharVector.Mutator mutator = vector.getMutator();
+      vector.allocateNew();
+      int capacity = vector.getValueCapacity();
+      assertEquals(4095, capacity);
 
+      for (int i = 0; i < 4095; i++) {
+        if (i % 3 == 0) {
+          continue;
+        }
+        byte[] b = Integer.toString(i).getBytes();
+        vector.setSafe(i, b, 0, b.length);
+      }
+
+      /* NO reAlloc() should have happened in setSafe() */
+      capacity = vector.getValueCapacity();
+      assertEquals(4095, capacity);
+
+      vector.setValueCount(4095);
+
+      for (int i = 0; i < 4095; i++) {
+        if (i % 3 == 0) {
+          assertNull(vector.getObject(i));
+        } else {
+          assertEquals("unexpected value at index: " + i, Integer.toString(i), vector.getObject(i).toString());
+        }
+      }
+
+      /* set a smaller initial capacity than actually needed
+       * to trigger reallocs in copyFromSafe()
+       */
+      vector2.allocateNew(1024 * 10, 1024);
+
+      capacity = vector2.getValueCapacity();
+      assertEquals(1024, capacity);
+
+      for (int i = 0; i < 4095; i++) {
+        vector2.copyFromSafe(i, i, vector);
+        if (i % 3 == 0) {
+          assertNull(vector2.getObject(i));
+        } else {
+          assertEquals("unexpected value at index: " + i, Integer.toString(i), vector2.getObject(i).toString());
+        }
+      }
+
+      /* 2 reAllocs should have happened in copyFromSafe() */
+      capacity = vector2.getValueCapacity();
+      assertEquals(4096, capacity);
+
+      vector2.setValueCount(4095);
+
+      for (int i = 0; i < 4095; i++) {
+        if (i % 3 == 0) {
+          assertNull(vector2.getObject(i));
+        } else {
+          assertEquals("unexpected value at index: " + i, Integer.toString(i), vector2.getObject(i).toString());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSetLastSetUsage() {
+    try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
       vector.allocateNew(1024 * 10, 1024);
 
       setBytes(0, STR1, vector);
@@ -1289,32 +1569,102 @@ public class TestValueVector {
       setBytes(5, STR6, vector);
 
       /* Check current lastSet */
-      assertEquals(Integer.toString(-1), Integer.toString(mutator.getLastSet()));
+      assertEquals(Integer.toString(-1), Integer.toString(vector.getLastSet()));
 
       /* Check the vector output */
-      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(STR4, accessor.get(3));
-      assertArrayEquals(STR5, accessor.get(4));
-      assertArrayEquals(STR6, accessor.get(5));
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(STR4, vector.get(3));
+      assertArrayEquals(STR5, vector.get(4));
+      assertArrayEquals(STR6, vector.get(5));
 
       /*
        * If we don't do setLastSe(5) before setValueCount(), then the latter will corrupt
        * the value vector by filling in all positions [0,valuecount-1] will empty byte arrays.
        * Run the test by commenting out next line and we should see incorrect vector output.
        */
-      mutator.setLastSet(5);
-      mutator.setValueCount(20);
+      vector.setLastSet(5);
+      vector.setValueCount(20);
+
+      /* Check current lastSet */
+      assertEquals(Integer.toString(19), Integer.toString(vector.getLastSet()));
 
       /* Check the vector output again */
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(STR4, accessor.get(3));
-      assertArrayEquals(STR5, accessor.get(4));
-      assertArrayEquals(STR6, accessor.get(5));
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(STR4, vector.get(3));
+      assertArrayEquals(STR5, vector.get(4));
+      assertArrayEquals(STR6, vector.get(5));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(6)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(7)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(8)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(9)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(10)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(11)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(12)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(13)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(14)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(15)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(16)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(17)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(18)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(19)));
+
+      /* Check offsets */
+      assertEquals(Integer.toString(0),
+              Integer.toString(vector.offsetBuffer.getInt(0 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(6),
+              Integer.toString(vector.offsetBuffer.getInt(1 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(16),
+              Integer.toString(vector.offsetBuffer.getInt(2 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(21),
+              Integer.toString(vector.offsetBuffer.getInt(3 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(30),
+              Integer.toString(vector.offsetBuffer.getInt(4 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(34),
+              Integer.toString(vector.offsetBuffer.getInt(5 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(6 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(7 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(8 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(9 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(10 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(11 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(12 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(13 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(14 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(15 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(16 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(17 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(18 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(19 * vector.OFFSET_WIDTH)));
+
+      vector.set(19, STR6);
+      assertArrayEquals(STR6, vector.get(19));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(19 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(46),
+              Integer.toString(vector.offsetBuffer.getInt(20 * vector.OFFSET_WIDTH)));
     }
   }
 
@@ -1322,29 +1672,25 @@ public class TestValueVector {
   public void testVectorLoadUnload() {
 
     try (final NullableVarCharVector vector1 = new NullableVarCharVector("myvector", allocator)) {
-
-      final NullableVarCharVector.Mutator mutator1 = vector1.getMutator();
-
       vector1.allocateNew(1024 * 10, 1024);
 
-      mutator1.set(0, STR1);
-      mutator1.set(1, STR2);
-      mutator1.set(2, STR3);
-      mutator1.set(3, STR4);
-      mutator1.set(4, STR5);
-      mutator1.set(5, STR6);
-      assertEquals(Integer.toString(5), Integer.toString(mutator1.getLastSet()));
-      mutator1.setValueCount(15);
-      assertEquals(Integer.toString(14), Integer.toString(mutator1.getLastSet()));
+      vector1.set(0, STR1);
+      vector1.set(1, STR2);
+      vector1.set(2, STR3);
+      vector1.set(3, STR4);
+      vector1.set(4, STR5);
+      vector1.set(5, STR6);
+      assertEquals(Integer.toString(5), Integer.toString(vector1.getLastSet()));
+      vector1.setValueCount(15);
+      assertEquals(Integer.toString(14), Integer.toString(vector1.getLastSet()));
 
       /* Check the vector output */
-      final NullableVarCharVector.Accessor accessor1 = vector1.getAccessor();
-      assertArrayEquals(STR1, accessor1.get(0));
-      assertArrayEquals(STR2, accessor1.get(1));
-      assertArrayEquals(STR3, accessor1.get(2));
-      assertArrayEquals(STR4, accessor1.get(3));
-      assertArrayEquals(STR5, accessor1.get(4));
-      assertArrayEquals(STR6, accessor1.get(5));
+      assertArrayEquals(STR1, vector1.get(0));
+      assertArrayEquals(STR2, vector1.get(1));
+      assertArrayEquals(STR3, vector1.get(2));
+      assertArrayEquals(STR4, vector1.get(3));
+      assertArrayEquals(STR5, vector1.get(4));
+      assertArrayEquals(STR6, vector1.get(5));
 
       Field field = vector1.getField();
       String fieldName = field.getName();
@@ -1357,7 +1703,7 @@ public class TestValueVector {
 
       Schema schema = new Schema(fields);
 
-      VectorSchemaRoot schemaRoot1 = new VectorSchemaRoot(schema, fieldVectors, accessor1.getValueCount());
+      VectorSchemaRoot schemaRoot1 = new VectorSchemaRoot(schema, fieldVectors, vector1.getValueCount());
       VectorUnloader vectorUnloader = new VectorUnloader(schemaRoot1);
 
       try (
@@ -1370,24 +1716,21 @@ public class TestValueVector {
         vectorLoader.load(recordBatch);
 
         NullableVarCharVector vector2 = (NullableVarCharVector) schemaRoot2.getVector(fieldName);
-        NullableVarCharVector.Mutator mutator2 = vector2.getMutator();
-
         /*
          * lastSet would have internally been set by VectorLoader.load() when it invokes
          * loadFieldBuffers.
          */
-        assertEquals(Integer.toString(14), Integer.toString(mutator2.getLastSet()));
-        mutator2.setValueCount(25);
-        assertEquals(Integer.toString(24), Integer.toString(mutator2.getLastSet()));
+        assertEquals(Integer.toString(14), Integer.toString(vector2.getLastSet()));
+        vector2.setValueCount(25);
+        assertEquals(Integer.toString(24), Integer.toString(vector2.getLastSet()));
 
         /* Check the vector output */
-        final NullableVarCharVector.Accessor accessor2 = vector2.getAccessor();
-        assertArrayEquals(STR1, accessor2.get(0));
-        assertArrayEquals(STR2, accessor2.get(1));
-        assertArrayEquals(STR3, accessor2.get(2));
-        assertArrayEquals(STR4, accessor2.get(3));
-        assertArrayEquals(STR5, accessor2.get(4));
-        assertArrayEquals(STR6, accessor2.get(5));
+        assertArrayEquals(STR1, vector2.get(0));
+        assertArrayEquals(STR2, vector2.get(1));
+        assertArrayEquals(STR3, vector2.get(2));
+        assertArrayEquals(STR4, vector2.get(3));
+        assertArrayEquals(STR5, vector2.get(4));
+        assertArrayEquals(STR6, vector2.get(5));
       }
     }
   }
@@ -1396,8 +1739,6 @@ public class TestValueVector {
   public void testFillEmptiesUsage() {
     try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
 
-      final NullableVarCharVector.Mutator mutator = vector.getMutator();
-
       vector.allocateNew(1024 * 10, 1024);
 
       setBytes(0, STR1, vector);
@@ -1408,84 +1749,98 @@ public class TestValueVector {
       setBytes(5, STR6, vector);
 
       /* Check current lastSet */
-      assertEquals(Integer.toString(-1), Integer.toString(mutator.getLastSet()));
+      assertEquals(Integer.toString(-1), Integer.toString(vector.getLastSet()));
 
       /* Check the vector output */
-      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(STR4, accessor.get(3));
-      assertArrayEquals(STR5, accessor.get(4));
-      assertArrayEquals(STR6, accessor.get(5));
-
-      mutator.setLastSet(5);
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(STR4, vector.get(3));
+      assertArrayEquals(STR5, vector.get(4));
+      assertArrayEquals(STR6, vector.get(5));
+
+      vector.setLastSet(5);
       /* fill empty byte arrays from index [6, 9] */
-      mutator.fillEmpties(10);
+      vector.fillEmpties(10);
 
       /* Check current lastSet */
-      assertEquals(Integer.toString(9), Integer.toString(mutator.getLastSet()));
+      assertEquals(Integer.toString(9), Integer.toString(vector.getLastSet()));
 
       /* Check the vector output */
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(STR4, accessor.get(3));
-      assertArrayEquals(STR5, accessor.get(4));
-      assertArrayEquals(STR6, accessor.get(5));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(6)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(7)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(8)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(9)));
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(STR4, vector.get(3));
+      assertArrayEquals(STR5, vector.get(4));
+      assertArrayEquals(STR6, vector.get(5));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(6)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(7)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(8)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(9)));
 
       setBytes(10, STR1, vector);
       setBytes(11, STR2, vector);
 
-      mutator.setLastSet(11);
+      vector.setLastSet(11);
       /* fill empty byte arrays from index [12, 14] */
-      mutator.setValueCount(15);
+      vector.setValueCount(15);
 
       /* Check current lastSet */
-      assertEquals(Integer.toString(14), Integer.toString(mutator.getLastSet()));
+      assertEquals(Integer.toString(14), Integer.toString(vector.getLastSet()));
 
       /* Check the vector output */
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(STR4, accessor.get(3));
-      assertArrayEquals(STR5, accessor.get(4));
-      assertArrayEquals(STR6, accessor.get(5));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(6)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(7)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(8)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(9)));
-      assertArrayEquals(STR1, accessor.get(10));
-      assertArrayEquals(STR2, accessor.get(11));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(12)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(13)));
-      assertEquals(Integer.toString(0), Integer.toString(accessor.getValueLength(14)));
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(STR4, vector.get(3));
+      assertArrayEquals(STR5, vector.get(4));
+      assertArrayEquals(STR6, vector.get(5));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(6)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(7)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(8)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(9)));
+      assertArrayEquals(STR1, vector.get(10));
+      assertArrayEquals(STR2, vector.get(11));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(12)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(13)));
+      assertEquals(Integer.toString(0), Integer.toString(vector.getValueLength(14)));
 
       /* Check offsets */
-      final UInt4Vector.Accessor offsetAccessor = vector.values.offsetVector.getAccessor();
-      assertEquals(Integer.toString(0), Integer.toString(offsetAccessor.get(0)));
-      assertEquals(Integer.toString(6), Integer.toString(offsetAccessor.get(1)));
-      assertEquals(Integer.toString(16), Integer.toString(offsetAccessor.get(2)));
-      assertEquals(Integer.toString(21), Integer.toString(offsetAccessor.get(3)));
-      assertEquals(Integer.toString(30), Integer.toString(offsetAccessor.get(4)));
-      assertEquals(Integer.toString(34), Integer.toString(offsetAccessor.get(5)));
-
-      assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(6)));
-      assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(7)));
-      assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(8)));
-      assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(9)));
-      assertEquals(Integer.toString(40), Integer.toString(offsetAccessor.get(10)));
-
-      assertEquals(Integer.toString(46), Integer.toString(offsetAccessor.get(11)));
-      assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(12)));
-
-      assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(13)));
-      assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(14)));
-      assertEquals(Integer.toString(56), Integer.toString(offsetAccessor.get(15)));
+      assertEquals(Integer.toString(0),
+              Integer.toString(vector.offsetBuffer.getInt(0 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(6),
+              Integer.toString(vector.offsetBuffer.getInt(1 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(16),
+              Integer.toString(vector.offsetBuffer.getInt(2 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(21),
+              Integer.toString(vector.offsetBuffer.getInt(3 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(30),
+              Integer.toString(vector.offsetBuffer.getInt(4 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(34),
+              Integer.toString(vector.offsetBuffer.getInt(5 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(6 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(7 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(8 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(9 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(40),
+              Integer.toString(vector.offsetBuffer.getInt(10 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(46),
+              Integer.toString(vector.offsetBuffer.getInt(11 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(56),
+              Integer.toString(vector.offsetBuffer.getInt(12 * vector.OFFSET_WIDTH)));
+
+      assertEquals(Integer.toString(56),
+              Integer.toString(vector.offsetBuffer.getInt(13 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(56),
+              Integer.toString(vector.offsetBuffer.getInt(14 * vector.OFFSET_WIDTH)));
+      assertEquals(Integer.toString(56),
+              Integer.toString(vector.offsetBuffer.getInt(15 * vector.OFFSET_WIDTH)));
     }
   }
 
@@ -1493,29 +1848,25 @@ public class TestValueVector {
   public void testGetBufferAddress1() {
 
     try (final NullableVarCharVector vector = new NullableVarCharVector("myvector", allocator)) {
-
-      final NullableVarCharVector.Mutator mutator = vector.getMutator();
-      final NullableVarCharVector.Accessor accessor = vector.getAccessor();
-
       vector.allocateNew(1024 * 10, 1024);
 
       /* populate the vector */
-      mutator.set(0, STR1);
-      mutator.set(1, STR2);
-      mutator.set(2, STR3);
-      mutator.set(3, STR4);
-      mutator.set(4, STR5);
-      mutator.set(5, STR6);
+      vector.set(0, STR1);
+      vector.set(1, STR2);
+      vector.set(2, STR3);
+      vector.set(3, STR4);
+      vector.set(4, STR5);
+      vector.set(5, STR6);
 
-      mutator.setValueCount(15);
+      vector.setValueCount(15);
 
       /* check the vector output */
-      assertArrayEquals(STR1, accessor.get(0));
-      assertArrayEquals(STR2, accessor.get(1));
-      assertArrayEquals(STR3, accessor.get(2));
-      assertArrayEquals(STR4, accessor.get(3));
-      assertArrayEquals(STR5, accessor.get(4));
-      assertArrayEquals(STR6, accessor.get(5));
+      assertArrayEquals(STR1, vector.get(0));
+      assertArrayEquals(STR2, vector.get(1));
+      assertArrayEquals(STR3, vector.get(2));
+      assertArrayEquals(STR4, vector.get(3));
+      assertArrayEquals(STR5, vector.get(4));
+      assertArrayEquals(STR6, vector.get(5));
 
       List<ArrowBuf> buffers = vector.getFieldBuffers();
       long bitAddress = vector.getValidityBufferAddress();
@@ -1531,23 +1882,18 @@ public class TestValueVector {
 
   @Test /* NullableIntVector */
   public void testGetBufferAddress2() {
-
     try (final NullableIntVector vector = new NullableIntVector("myvector", allocator)) {
-
-      final NullableIntVector.Mutator mutator = vector.getMutator();
-      final NullableIntVector.Accessor accessor = vector.getAccessor();
       boolean error = false;
-
       vector.allocateNew(16);
 
       /* populate the vector */
       for(int i = 0; i < 16; i += 2) {
-        mutator.set(i, i+10);
+        vector.set(i, i+10);
       }
 
       /* check the vector output */
       for(int i = 0; i < 16; i += 2) {
-        assertEquals(i+10, accessor.get(i));
+        assertEquals(i+10, vector.get(i));
       }
 
       List<ArrowBuf> buffers = vector.getFieldBuffers();
@@ -1580,11 +1926,15 @@ public class TestValueVector {
     vectorAllocator.close();
   }
 
+  /* Test helper that writes a value directly into the vector's buffers, bypassing the
+   * vector's own set methods, which update lastSet. Tests of the lastSet property use it
+   * to load the vector without lastSet being set automatically.
+   */
   public static void setBytes(int index, byte[] bytes, NullableVarCharVector vector) {
-    final int currentOffset = vector.values.offsetVector.getAccessor().get(index);
+    final int currentOffset = vector.offsetBuffer.getInt(index * vector.OFFSET_WIDTH);
 
-    vector.bits.getMutator().setToOne(index);
-    vector.values.offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
-    vector.values.data.setBytes(currentOffset, bytes, 0, bytes.length);
+    BitVectorHelper.setValidityBitToOne(vector.validityBuffer, index);
+    vector.offsetBuffer.setInt((index + 1) * vector.OFFSET_WIDTH, currentOffset + bytes.length);
+    vector.valueBuffer.setBytes(currentOffset, bytes, 0, bytes.length);
   }
 }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java
index 4ac7536..531a46c 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorReAlloc.java
@@ -76,24 +76,23 @@ public class TestVectorReAlloc {
   @Test
   public void testNullableType() {
     try (final NullableVarCharVector vector = new NullableVarCharVector("", allocator)) {
-      final NullableVarCharVector.Mutator m = vector.getMutator();
       vector.setInitialCapacity(512);
       vector.allocateNew();
 
       assertEquals(512, vector.getValueCapacity());
 
       try {
-        m.set(512, "foo".getBytes(StandardCharsets.UTF_8));
+        vector.set(512, "foo".getBytes(StandardCharsets.UTF_8));
         Assert.fail("Expected out of bounds exception");
       } catch (Exception e) {
         // ok
       }
 
       vector.reAlloc();
-      assertEquals(1023, vector.getValueCapacity());
+      assertEquals(1024, vector.getValueCapacity());
 
-      m.set(512, "foo".getBytes(StandardCharsets.UTF_8));
-      assertEquals("foo", new String(vector.getAccessor().get(512), StandardCharsets.UTF_8));
+      vector.set(512, "foo".getBytes(StandardCharsets.UTF_8));
+      assertEquals("foo", new String(vector.get(512), StandardCharsets.UTF_8));
     }
   }
 
@@ -105,7 +104,7 @@ public class TestVectorReAlloc {
       vector.setInitialCapacity(512);
       vector.allocateNew();
 
-      assertEquals(1023, vector.getValueCapacity()); // TODO this doubles for some reason...
+      assertEquals(1023, vector.getValueCapacity());
 
       try {
         vector.getOffsetVector().getAccessor().get(2014);
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 7facf73..0b7928d 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -45,13 +45,21 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.*;
 
 public class TestVectorUnloadLoad {
 
-  static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+  private BufferAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void terminate() throws Exception {
+    allocator.close();
+  }
 
   @Test
   public void testUnloadLoad() throws IOException {
@@ -183,24 +191,40 @@ public class TestVectorUnloadLoad {
    * @throws IOException
    */
   @Test
-  public void testLoadEmptyValidityBuffer() throws IOException {
+  public void testLoadValidityBuffer() throws IOException {
     Schema schema = new Schema(asList(
         new Field("intDefined", FieldType.nullable(new ArrowType.Int(32, true)), Collections.<Field>emptyList()),
         new Field("intNull", FieldType.nullable(new ArrowType.Int(32, true)), Collections.<Field>emptyList())
     ));
     int count = 10;
-    ArrowBuf validity = allocator.buffer(10).slice(0, 0);
-    ArrowBuf[] values = new ArrowBuf[2];
-    for (int i = 0; i < values.length; i++) {
-      ArrowBuf arrowBuf = allocator.buffer(count * 4); // integers
-      values[i] = arrowBuf;
+    ArrowBuf[] values = new ArrowBuf[4];
+    for (int i = 0; i < 4; i+=2) {
+      ArrowBuf buf1 = allocator.buffer((int)Math.ceil(count / 8.0));
+      ArrowBuf buf2 = allocator.buffer(count * 4); // integers
+      values[i] = buf1;
+      values[i+1] = buf2;
       for (int j = 0; j < count; j++) {
-        arrowBuf.setInt(j * 4, j);
+        if (i == 2) {
+          BitVectorHelper.setValidityBit(buf1, j, 0);
+        } else {
+          BitVectorHelper.setValidityBitToOne(buf1, j);
+        }
+
+        buf2.setInt(j * 4, j);
       }
-      arrowBuf.writerIndex(count * 4);
+      buf1.writerIndex((int)Math.ceil(count / 8));
+      buf2.writerIndex(count * 4);
     }
+
+    /*
+     * values[0] - validity buffer for first vector
+     * values[1] - data buffer for first vector
+     * values[2] - validity buffer for second vector
+     * values[3] - data buffer for second vector
+     */
+
     try (
-        ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(validity, values[0], validity, values[1]));
+        ArrowRecordBatch recordBatch = new ArrowRecordBatch(count, asList(new ArrowFieldNode(count, 0), new ArrowFieldNode(count, count)), asList(values[0], values[1], values[2], values[3]));
         BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
         VectorSchemaRoot newRoot = VectorSchemaRoot.create(schema, finalVectorsAllocator);
     ) {
@@ -213,32 +237,31 @@ public class TestVectorUnloadLoad {
       NullableIntVector intDefinedVector = (NullableIntVector) newRoot.getVector("intDefined");
       NullableIntVector intNullVector = (NullableIntVector) newRoot.getVector("intNull");
       for (int i = 0; i < count; i++) {
-        assertFalse("#" + i, intDefinedVector.getAccessor().isNull(i));
-        assertEquals("#" + i, i, intDefinedVector.getAccessor().get(i));
-        assertTrue("#" + i, intNullVector.getAccessor().isNull(i));
+        assertFalse("#" + i, intDefinedVector.isNull(i));
+        assertEquals("#" + i, i, intDefinedVector.get(i));
+        assertTrue("#" + i, intNullVector.isNull(i));
       }
-      intDefinedVector.getMutator().setSafe(count + 10, 1234);
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 1));
+      intDefinedVector.setSafe(count + 10, 1234);
+      assertTrue(intDefinedVector.isNull(count + 1));
       // empty slots should still default to unset
-      intDefinedVector.getMutator().setSafe(count + 1, 789);
-      assertFalse(intDefinedVector.getAccessor().isNull(count + 1));
-      assertEquals(789, intDefinedVector.getAccessor().get(count + 1));
-      assertTrue(intDefinedVector.getAccessor().isNull(count));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 2));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 3));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 4));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 5));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 6));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 7));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 8));
-      assertTrue(intDefinedVector.getAccessor().isNull(count + 9));
-      assertFalse(intDefinedVector.getAccessor().isNull(count + 10));
-      assertEquals(1234, intDefinedVector.getAccessor().get(count + 10));
+      intDefinedVector.setSafe(count + 1, 789);
+      assertFalse(intDefinedVector.isNull(count + 1));
+      assertEquals(789, intDefinedVector.get(count + 1));
+      assertTrue(intDefinedVector.isNull(count));
+      assertTrue(intDefinedVector.isNull(count + 2));
+      assertTrue(intDefinedVector.isNull(count + 3));
+      assertTrue(intDefinedVector.isNull(count + 4));
+      assertTrue(intDefinedVector.isNull(count + 5));
+      assertTrue(intDefinedVector.isNull(count + 6));
+      assertTrue(intDefinedVector.isNull(count + 7));
+      assertTrue(intDefinedVector.isNull(count + 8));
+      assertTrue(intDefinedVector.isNull(count + 9));
+      assertFalse(intDefinedVector.isNull(count + 10));
+      assertEquals(1234, intDefinedVector.get(count + 10));
     } finally {
       for (ArrowBuf arrowBuf : values) {
         arrowBuf.release();
       }
-      validity.release();
     }
   }
 
@@ -258,11 +281,11 @@ public class TestVectorUnloadLoad {
         FieldVector vector = field.createVector(originalVectorsAllocator);
         vector.allocateNew();
         sources.add(vector);
-        NullableIntVector.Mutator mutator = (NullableIntVector.Mutator) vector.getMutator();
+        NullableIntVector intVector = (NullableIntVector)vector;
         for (int i = 0; i < count; i++) {
-          mutator.set(i, i);
+          intVector.set(i, i);
         }
-        mutator.setValueCount(count);
+        intVector.setValueCount(count);
       }
 
       try (VectorSchemaRoot root = new VectorSchemaRoot(schema.getFields(), sources, count)) {
@@ -277,8 +300,8 @@ public class TestVectorUnloadLoad {
           List<FieldVector> targets = newRoot.getFieldVectors();
           Assert.assertEquals(sources.size(), targets.size());
           for (int k = 0; k < sources.size(); k++) {
-            NullableIntVector.Accessor src = (NullableIntVector.Accessor) sources.get(k).getAccessor();
-            NullableIntVector.Accessor tgt = (NullableIntVector.Accessor) targets.get(k).getAccessor();
+            NullableIntVector src = (NullableIntVector) sources.get(k);
+            NullableIntVector tgt = (NullableIntVector) targets.get(k);
             Assert.assertEquals(src.getValueCount(), tgt.getValueCount());
             for (int i = 0; i < count; i++) {
               Assert.assertEquals(src.get(i), tgt.get(i));
@@ -296,9 +319,4 @@ public class TestVectorUnloadLoad {
     VectorSchemaRoot vsr = new VectorSchemaRoot(schema.getFields(), fields, valueCount);
     return new VectorUnloader(vsr);
   }
-
-  @AfterClass
-  public static void afterClass() {
-    allocator.close();
-  }
 }
\ No newline at end of file
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java b/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
index ba62de0..60009b0 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
@@ -97,8 +97,14 @@ public class BaseFileTest {
 
   protected void validateContent(int count, VectorSchemaRoot root) {
     for (int i = 0; i < count; i++) {
-      Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
-      Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
+      FieldVector fv = root.getVector("int");
+      if (fv instanceof NullableIntVector) {
+        Assert.assertEquals(i, fv.getObject(i));
+        Assert.assertEquals(Integer.valueOf(i), fv.getObject(i));
+      } else {
+        Assert.assertEquals(i, fv.getAccessor().getObject(i));
+        Assert.assertEquals(Long.valueOf(i), fv.getAccessor().getObject(i));
+      }
     }
   }
 
@@ -152,6 +158,7 @@ public class BaseFileTest {
     Assert.assertEquals(count, root.getRowCount());
     printVectors(root.getFieldVectors());
     for (int i = 0; i < count; i++) {
+
       Object intVal = root.getVector("int").getAccessor().getObject(i);
       if (i % 5 != 3) {
         Assert.assertEquals(i, intVal);
@@ -220,22 +227,20 @@ public class BaseFileTest {
     // Define dictionaries and add to provider
     NullableVarCharVector dictionary1Vector = newNullableVarCharVector("D1", bufferAllocator);
     dictionary1Vector.allocateNewSafe();
-    NullableVarCharVector.Mutator mutator = dictionary1Vector.getMutator();
-    mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
-    mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
-    mutator.set(2, "baz".getBytes(StandardCharsets.UTF_8));
-    mutator.setValueCount(3);
+    dictionary1Vector.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+    dictionary1Vector.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+    dictionary1Vector.set(2, "baz".getBytes(StandardCharsets.UTF_8));
+    dictionary1Vector.setValueCount(3);
 
     Dictionary dictionary1 = new Dictionary(dictionary1Vector, new DictionaryEncoding(1L, false, null));
     provider.put(dictionary1);
 
     NullableVarCharVector dictionary2Vector = newNullableVarCharVector("D2", bufferAllocator);
     dictionary2Vector.allocateNewSafe();
-    mutator = dictionary2Vector.getMutator();
-    mutator.set(0, "micro".getBytes(StandardCharsets.UTF_8));
-    mutator.set(1, "small".getBytes(StandardCharsets.UTF_8));
-    mutator.set(2, "large".getBytes(StandardCharsets.UTF_8));
-    mutator.setValueCount(3);
+    dictionary2Vector.set(0, "micro".getBytes(StandardCharsets.UTF_8));
+    dictionary2Vector.set(1, "small".getBytes(StandardCharsets.UTF_8));
+    dictionary2Vector.set(2, "large".getBytes(StandardCharsets.UTF_8));
+    dictionary2Vector.setValueCount(3);
 
     Dictionary dictionary2 = new Dictionary(dictionary2Vector, new DictionaryEncoding(2L, false, null));
     provider.put(dictionary2);
@@ -243,13 +248,12 @@ public class BaseFileTest {
     // Populate the vectors
     NullableVarCharVector vector1A = newNullableVarCharVector("varcharA", bufferAllocator);
     vector1A.allocateNewSafe();
-    mutator = vector1A.getMutator();
-    mutator.set(0, "foo".getBytes(StandardCharsets.UTF_8));
-    mutator.set(1, "bar".getBytes(StandardCharsets.UTF_8));
-    mutator.set(3, "baz".getBytes(StandardCharsets.UTF_8));
-    mutator.set(4, "bar".getBytes(StandardCharsets.UTF_8));
-    mutator.set(5, "baz".getBytes(StandardCharsets.UTF_8));
-    mutator.setValueCount(6);
+    vector1A.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+    vector1A.set(1, "bar".getBytes(StandardCharsets.UTF_8));
+    vector1A.set(3, "baz".getBytes(StandardCharsets.UTF_8));
+    vector1A.set(4, "bar".getBytes(StandardCharsets.UTF_8));
+    vector1A.set(5, "baz".getBytes(StandardCharsets.UTF_8));
+    vector1A.setValueCount(6);
 
     FieldVector encodedVector1A = (FieldVector) DictionaryEncoder.encode(vector1A, dictionary1);
     vector1A.close();  // Done with this vector after encoding
@@ -257,22 +261,20 @@ public class BaseFileTest {
     // Write this vector using indices instead of encoding
     NullableIntVector encodedVector1B = new NullableIntVector("varcharB", bufferAllocator);
     encodedVector1B.allocateNewSafe();
-    NullableIntVector.Mutator mutator1B = encodedVector1B.getMutator();
-    mutator1B.set(0, 2);  // "baz"
-    mutator1B.set(1, 1);  // "bar"
-    mutator1B.set(2, 2);  // "baz"
-    mutator1B.set(4, 1);  // "bar"
-    mutator1B.set(5, 0);  // "foo"
-    mutator1B.setValueCount(6);
+    encodedVector1B.set(0, 2);  // "baz"
+    encodedVector1B.set(1, 1);  // "bar"
+    encodedVector1B.set(2, 2);  // "baz"
+    encodedVector1B.set(4, 1);  // "bar"
+    encodedVector1B.set(5, 0);  // "foo"
+    encodedVector1B.setValueCount(6);
 
     NullableVarCharVector vector2 = newNullableVarCharVector("sizes", bufferAllocator);
     vector2.allocateNewSafe();
-    mutator = vector2.getMutator();
-    mutator.set(1, "large".getBytes(StandardCharsets.UTF_8));
-    mutator.set(2, "small".getBytes(StandardCharsets.UTF_8));
-    mutator.set(3, "small".getBytes(StandardCharsets.UTF_8));
-    mutator.set(4, "large".getBytes(StandardCharsets.UTF_8));
-    mutator.setValueCount(6);
+    vector2.set(1, "large".getBytes(StandardCharsets.UTF_8));
+    vector2.set(2, "small".getBytes(StandardCharsets.UTF_8));
+    vector2.set(3, "small".getBytes(StandardCharsets.UTF_8));
+    vector2.set(4, "large".getBytes(StandardCharsets.UTF_8));
+    vector2.setValueCount(6);
 
     FieldVector encodedVector2 = (FieldVector) DictionaryEncoder.encode(vector2, dictionary2);
     vector2.close();  // Done with this vector after encoding
@@ -355,8 +357,8 @@ public class BaseFileTest {
     // Define the dictionary and add to the provider
     NullableVarCharVector dictionaryVector = newNullableVarCharVector("D2", bufferAllocator);
     dictionaryVector.allocateNewSafe();
-    dictionaryVector.getMutator().set(0, "foo".getBytes(StandardCharsets.UTF_8));
-    dictionaryVector.getMutator().set(1, "bar".getBytes(StandardCharsets.UTF_8));
+    dictionaryVector.set(0, "foo".getBytes(StandardCharsets.UTF_8));
+    dictionaryVector.set(1, "bar".getBytes(StandardCharsets.UTF_8));
     dictionaryVector.getMutator().setValueCount(2);
 
     Dictionary dictionary = new Dictionary(dictionaryVector, new DictionaryEncoding(2L, false, null));
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index 81e5898..feae08e 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -581,7 +581,7 @@ public class TestArrowFile extends BaseFileTest {
         tuples.getMutator().setNotNull(i);
         floats.getMutator().set(i * 2, i + 0.1f);
         floats.getMutator().set(i * 2 + 1, i + 10.1f);
-        ints.getMutator().set(i, i);
+        ints.set(i, i);
       }
 
       parent.getMutator().setValueCount(10);

-- 
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <co...@arrow.apache.org>.