You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ja...@apache.org on 2016/02/17 13:39:39 UTC

[04/17] arrow git commit: ARROW-1: Initial Arrow Code Commit

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedListVector.java
new file mode 100644
index 0000000..778fe81
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedListVector.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex;
+
+import io.netty.buffer.ArrowBuf;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.vector.AddOrGetResult;
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorDescriptor;
+import org.apache.arrow.vector.complex.impl.NullReader;
+import org.apache.arrow.vector.complex.impl.RepeatedListReaderImpl;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.holders.ComplexHolder;
+import org.apache.arrow.vector.holders.RepeatedListHolder;
+import org.apache.arrow.vector.types.MaterializedField;
+import org.apache.arrow.vector.types.Types.DataMode;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.JsonStringArrayList;
+import org.apache.arrow.vector.util.TransferPair;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class RepeatedListVector extends AbstractContainerVector
+    implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
+
+  public final static MajorType TYPE = new MajorType(MinorType.LIST, DataMode.REPEATED);
+  private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
+  private final DelegateRepeatedVector delegate;
+
+  protected static class DelegateRepeatedVector extends BaseRepeatedValueVector {
+
+    private final RepeatedListAccessor accessor = new RepeatedListAccessor();
+    private final RepeatedListMutator mutator = new RepeatedListMutator();
+    private final EmptyValuePopulator emptyPopulator;
+    private transient DelegateTransferPair ephPair;
+
+    public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
+
+      @Override
+      public Object getObject(int index) {
+        final List<Object> list = new JsonStringArrayList<>();
+        final int start = offsets.getAccessor().get(index);
+        final int until = offsets.getAccessor().get(index+1);
+        for (int i = start; i < until; i++) {
+          list.add(vector.getAccessor().getObject(i));
+        }
+        return list;
+      }
+
+      public void get(int index, RepeatedListHolder holder) {
+        assert index <= getValueCapacity();
+        holder.start = getOffsetVector().getAccessor().get(index);
+        holder.end = getOffsetVector().getAccessor().get(index+1);
+      }
+
+      public void get(int index, ComplexHolder holder) {
+        final FieldReader reader = getReader();
+        reader.setPosition(index);
+        holder.reader = reader;
+      }
+
+      public void get(int index, int arrayIndex, ComplexHolder holder) {
+        final RepeatedListHolder listHolder = new RepeatedListHolder();
+        get(index, listHolder);
+        int offset = listHolder.start + arrayIndex;
+        if (offset >= listHolder.end) {
+          holder.reader = NullReader.INSTANCE;
+        } else {
+          FieldReader r = getDataVector().getReader();
+          r.setPosition(offset);
+          holder.reader = r;
+        }
+      }
+    }
+
+    /**
+     * Mutator for the enclosing {@link DelegateRepeatedVector}. Runs the empty-value populator
+     * before handing work to the base mutator.
+     */
+    public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator {
+
+      /**
+       * Grows the value at {@code index} by one inner slot.
+       *
+       * @param index position of the list value being extended
+       * @return the end offset that was stored at {@code index + 1} before it was incremented
+       */
+      public int add(int index) {
+        final int endSlot = index + 1;
+        final int previousEnd = getOffsetVector().getAccessor().get(endSlot);
+        getOffsetVector().getMutator().setSafe(endSlot, previousEnd + 1);
+        return previousEnd;
+      }
+
+      /**
+       * Populates empty values up to {@code index + 1}, then starts the new value through the
+       * base mutator.
+       */
+      @Override
+      public void startNewValue(int index) {
+        final int populateUpTo = index + 1;
+        emptyPopulator.populate(populateUpTo);
+        super.startNewValue(index);
+      }
+
+      /**
+       * Populates empty values up to {@code valueCount}, then sets the value count through the
+       * base mutator.
+       */
+      @Override
+      public void setValueCount(int valueCount) {
+        emptyPopulator.populate(valueCount);
+        super.setValueCount(valueCount);
+      }
+    }
+
+
+    /**
+     * Transfer pair that moves or copies the enclosing vector's offsets and data into a target
+     * {@link DelegateRepeatedVector}.
+     */
+    public class DelegateTransferPair implements TransferPair {
+      private final DelegateRepeatedVector target;
+      private final TransferPair[] children;
+
+      /**
+       * Builds the offset and data child pairs. When the target still holds the default data
+       * vector, a data vector matching the source's data field is added and allocated first.
+       *
+       * @param target destination vector; must not be null
+       */
+      public DelegateTransferPair(DelegateRepeatedVector target) {
+        this.target = Preconditions.checkNotNull(target);
+        final boolean targetHasDefaultData = target.getDataVector() == DEFAULT_DATA_VECTOR;
+        if (targetHasDefaultData) {
+          target.addOrGetVector(VectorDescriptor.create(getDataVector().getField()));
+          target.getDataVector().allocateNew();
+        }
+        final TransferPair offsetPair = getOffsetVector().makeTransferPair(target.getOffsetVector());
+        final TransferPair dataPair = getDataVector().makeTransferPair(target.getDataVector());
+        // children[0] handles offsets, children[1] handles data.
+        this.children = new TransferPair[] {offsetPair, dataPair};
+      }
+
+      /** Transfers the offsets and then the data through the child pairs. */
+      @Override
+      public void transfer() {
+        for (int i = 0; i < children.length; i++) {
+          children[i].transfer();
+        }
+      }
+
+      /** Returns the target vector this pair writes into. */
+      @Override
+      public ValueVector getTo() {
+        return target;
+      }
+
+      /**
+       * Allocates the target and copies {@code length} values starting at {@code startIndex}
+       * into target positions {@code 0..length-1}, one value at a time.
+       */
+      @Override
+      public void splitAndTransfer(int startIndex, int length) {
+        target.allocateNew();
+        int dest = 0;
+        while (dest < length) {
+          copyValueSafe(startIndex + dest, dest);
+          dest++;
+        }
+      }
+
+      /**
+       * Copies the list at {@code srcIndex} of the source into {@code destIndex} of the target,
+       * appending its inner values after the target's current offset for {@code destIndex}.
+       */
+      @Override
+      public void copyValueSafe(int srcIndex, int destIndex) {
+        final RepeatedListHolder range = new RepeatedListHolder();
+        getAccessor().get(srcIndex, range);
+        target.emptyPopulator.populate(destIndex + 1);
+        final TransferPair dataTransfer = children[1];
+        int writeIndex = target.getOffsetVector().getAccessor().get(destIndex);
+        //todo: make this a bulk copy.
+        int readIndex = range.start;
+        while (readIndex < range.end) {
+          dataTransfer.copyValueSafe(readIndex, writeIndex);
+          readIndex++;
+          writeIndex++;
+        }
+        // Record where the copied list ends in the target.
+        target.getOffsetVector().getMutator().setSafe(destIndex + 1, writeIndex);
+      }
+    }
+
+    /**
+     * Creates a delegate vector whose field is built from {@code path} and the repeated list
+     * {@code TYPE}.
+     *
+     * @param path      field path passed to {@code MaterializedField.create}
+     * @param allocator allocator handed to the field-based constructor
+     */
+    public DelegateRepeatedVector(String path, BufferAllocator allocator) {
+      this(MaterializedField.create(path, TYPE), allocator);
+    }
+
+    /**
+     * Creates a delegate vector for {@code field} and wires the empty-value populator to this
+     * vector's offset vector.
+     *
+     * @param field     field describing this vector
+     * @param allocator allocator passed to the base class
+     */
+    public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) {
+      super(field, allocator);
+      this.emptyPopulator = new EmptyValuePopulator(getOffsetVector());
+    }
+
+    /**
+     * Allocates buffers via {@link #allocateNewSafe()}.
+     *
+     * @throws OutOfMemoryException if {@code allocateNewSafe()} reports failure
+     */
+    @Override
+    public void allocateNew() throws OutOfMemoryException {
+      final boolean allocated = allocateNewSafe();
+      if (allocated) {
+        return;
+      }
+      throw new OutOfMemoryException();
+    }
+
+    /**
+     * Creates a new {@link DelegateRepeatedVector} named {@code ref} on {@code allocator} and
+     * returns a transfer pair targeting it.
+     */
+    @Override
+    public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
+      final DelegateRepeatedVector freshTarget = new DelegateRepeatedVector(ref, allocator);
+      return makeTransferPair(freshTarget);
+    }
+
+    /**
+     * Returns a {@link DelegateTransferPair} from this vector to {@code target}, which is cast to
+     * {@link DelegateRepeatedVector}.
+     */
+    @Override
+    public TransferPair makeTransferPair(ValueVector target) {
+      final DelegateRepeatedVector typedTarget = DelegateRepeatedVector.class.cast(target);
+      return new DelegateTransferPair(typedTarget);
+    }
+
+    /** Returns this vector's single {@link RepeatedListAccessor} instance. */
+    @Override
+    public RepeatedListAccessor getAccessor() {
+      return this.accessor;
+    }
+
+    /** Returns this vector's single {@link RepeatedListMutator} instance. */
+    @Override
+    public RepeatedListMutator getMutator() {
+      return this.mutator;
+    }
+
+    /**
+     * Not supported on the delegate vector.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public FieldReader getReader() {
+      throw new UnsupportedOperationException();
+    }
+
+    // Source vector the cached ephPair was created from. The pair's target is always this vector,
+    // so the source has to be tracked separately to validate the cache.
+    private transient DelegateRepeatedVector ephSource;
+
+    /**
+     * Copies the list at {@code fromIndex} of {@code from} into {@code thisIndex} of this vector.
+     *
+     * <p>The transfer pair is cached and reused while consecutive calls use the same source
+     * vector.
+     *
+     * @param fromIndex position of the value in {@code from}
+     * @param thisIndex destination position in this vector
+     * @param from      source vector
+     */
+    public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) {
+      // from.makeTransferPair(this) yields a pair whose target is this vector. Comparing that
+      // target against 'from' only matched when from == this, so the pair used to be rebuilt on
+      // practically every call. Key the cache on the source instead.
+      if (ephPair == null || ephSource != from) {
+        ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this));
+        ephSource = from;
+      }
+      ephPair.copyValueSafe(fromIndex, thisIndex);
+    }
+
+  }
+
+  /**
+   * Transfer pair for {@link RepeatedListVector} that forwards every operation to the transfer
+   * pair of the underlying delegate vector.
+   */
+  protected class RepeatedListTransferPair implements TransferPair {
+    private final TransferPair delegate;
+
+    /**
+     * @param delegate transfer pair of the underlying delegate vectors
+     */
+    public RepeatedListTransferPair(TransferPair delegate) {
+      this.delegate = delegate;
+    }
+
+    /** Forwards to the delegate pair. */
+    @Override
+    public void transfer() {
+      delegate.transfer();
+    }
+
+    /** Forwards to the delegate pair. */
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      delegate.splitAndTransfer(startIndex, length);
+    }
+
+    /**
+     * Wraps the delegate pair's target in a new {@link RepeatedListVector} that uses the
+     * enclosing vector's field, allocator and callback. A new wrapper is created on each call.
+     */
+    @Override
+    public ValueVector getTo() {
+      final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo());
+      return new RepeatedListVector(getField(), allocator, callBack, delegateVector);
+    }
+
+    /** Forwards to the delegate pair. */
+    @Override
+    public void copyValueSafe(int from, int to) {
+      delegate.copyValueSafe(from, to);
+    }
+  }
+
+  /**
+   * Creates a repeated list vector whose field is built from {@code path} and {@link #TYPE}.
+   *
+   * @param path      field path passed to {@code MaterializedField.create}
+   * @param allocator allocator for this vector
+   * @param callBack  callback forwarded to the field-based constructor
+   */
+  public RepeatedListVector(String path, BufferAllocator allocator, CallBack callBack) {
+    this(MaterializedField.create(path, TYPE), allocator, callBack);
+  }
+
+  /**
+   * Creates a repeated list vector backed by a fresh {@link DelegateRepeatedVector} built from
+   * the same field and allocator.
+   *
+   * @param field     field describing this vector
+   * @param allocator allocator for this vector and its delegate
+   * @param callBack  callback forwarded to the delegate-taking constructor
+   */
+  public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator));
+  }
+
+  /**
+   * Creates a repeated list vector around an existing delegate. If {@code field} already has
+   * children, the last one is registered as the data vector through {@code addOrGetVector}.
+   *
+   * @param field     field describing this vector
+   * @param allocator allocator passed to the base class
+   * @param callBack  callback passed to the base class
+   * @param delegate  backing vector; must not be null
+   */
+  protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) {
+    super(field, allocator, callBack);
+    this.delegate = Preconditions.checkNotNull(delegate);
+
+    final List<MaterializedField> childFields = Lists.newArrayList(field.getChildren());
+    final int childCount = childFields.size();
+    assert childCount < 3;
+    if (childCount > 0) {
+      // the last field is data field
+      final MaterializedField dataField = childFields.get(childCount - 1);
+      addOrGetVector(VectorDescriptor.create(dataField));
+    }
+  }
+
+
+  /** Returns the reader instance created together with this vector. */
+  @Override
+  public RepeatedListReaderImpl getReader() {
+    return this.reader;
+  }
+
+  /** Returns the accessor of the backing delegate vector. */
+  @Override
+  public DelegateRepeatedVector.RepeatedListAccessor getAccessor() {
+    final DelegateRepeatedVector.RepeatedListAccessor delegateAccessor = delegate.getAccessor();
+    return delegateAccessor;
+  }
+
+  /** Returns the mutator of the backing delegate vector. */
+  @Override
+  public DelegateRepeatedVector.RepeatedListMutator getMutator() {
+    final DelegateRepeatedVector.RepeatedListMutator delegateMutator = delegate.getMutator();
+    return delegateMutator;
+  }
+
+  /** Returns the offset vector of the backing delegate vector. */
+  @Override
+  public UInt4Vector getOffsetVector() {
+    final UInt4Vector delegateOffsets = delegate.getOffsetVector();
+    return delegateOffsets;
+  }
+
+  /** Returns the data vector of the backing delegate vector. */
+  @Override
+  public ValueVector getDataVector() {
+    final ValueVector delegateData = delegate.getDataVector();
+    return delegateData;
+  }
+
+  /**
+   * Allocates buffers by forwarding to the backing delegate vector.
+   *
+   * @throws OutOfMemoryException propagated from the delegate
+   */
+  @Override
+  public void allocateNew() throws OutOfMemoryException {
+    this.delegate.allocateNew();
+  }
+
+  /** Forwards to the backing delegate vector and returns its result. */
+  @Override
+  public boolean allocateNewSafe() {
+    final boolean allocated = delegate.allocateNewSafe();
+    return allocated;
+  }
+
+  /**
+   * Adds or fetches the data vector on the backing delegate. When the delegate reports that a
+   * vector was created and a callback is present, the callback is invoked. The field of this
+   * vector is then refreshed from the delegate.
+   *
+   * @param descriptor descriptor forwarded to the delegate
+   * @return the delegate's result
+   */
+  @Override
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+    final AddOrGetResult<T> outcome = delegate.addOrGetVector(descriptor);
+    final boolean shouldNotify = outcome.isCreated() && callBack != null;
+    if (shouldNotify) {
+      callBack.doWork();
+    }
+    this.field = delegate.getField();
+    return outcome;
+  }
+
+  /** Returns the size reported by the backing delegate vector. */
+  @Override
+  public int size() {
+    final int delegateSize = delegate.size();
+    return delegateSize;
+  }
+
+  /** Returns the buffer size reported by the backing delegate vector. */
+  @Override
+  public int getBufferSize() {
+    final int delegateBufferSize = delegate.getBufferSize();
+    return delegateBufferSize;
+  }
+
+  /** Returns the delegate vector's buffer size estimate for {@code valueCount} values. */
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+    final int estimate = delegate.getBufferSizeFor(valueCount);
+    return estimate;
+  }
+
+  /** Closes the backing delegate vector. */
+  @Override
+  public void close() {
+    this.delegate.close();
+  }
+
+  /** Clears the backing delegate vector. */
+  @Override
+  public void clear() {
+    this.delegate.clear();
+  }
+
+  /**
+   * Wraps the delegate's transfer pair for {@code allocator} in a
+   * {@link RepeatedListTransferPair}.
+   */
+  @Override
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    final TransferPair inner = delegate.getTransferPair(allocator);
+    return new RepeatedListTransferPair(inner);
+  }
+
+  /**
+   * Wraps the delegate's transfer pair for {@code ref} and {@code allocator} in a
+   * {@link RepeatedListTransferPair}.
+   */
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
+    final TransferPair inner = delegate.getTransferPair(ref, allocator);
+    return new RepeatedListTransferPair(inner);
+  }
+
+  /**
+   * Builds a transfer pair into {@code to}, which is cast to {@link RepeatedListVector}; the
+   * pair operates between the two vectors' delegates.
+   */
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    final RepeatedListVector destination = RepeatedListVector.class.cast(to);
+    final TransferPair inner = delegate.makeTransferPair(destination.delegate);
+    return new RepeatedListTransferPair(inner);
+  }
+
+  /** Returns the value capacity reported by the backing delegate vector. */
+  @Override
+  public int getValueCapacity() {
+    final int capacity = delegate.getValueCapacity();
+    return capacity;
+  }
+
+  /** Returns the delegate vector's buffers, forwarding the {@code clear} flag unchanged. */
+  @Override
+  public ArrowBuf[] getBuffers(boolean clear) {
+    final ArrowBuf[] buffers = delegate.getBuffers(clear);
+    return buffers;
+  }
+
+
+//  @Override
+//  public void load(SerializedField metadata, DrillBuf buf) {
+//    delegate.load(metadata, buf);
+//  }
+
+//  @Override
+//  public SerializedField getMetadata() {
+//    return delegate.getMetadata();
+//  }
+
+  /** Returns the iterator supplied by the backing delegate vector. */
+  @Override
+  public Iterator<ValueVector> iterator() {
+    final Iterator<ValueVector> delegateIterator = delegate.iterator();
+    return delegateIterator;
+  }
+
+  /** Forwards the requested initial capacity to the backing delegate vector. */
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    this.delegate.setInitialCapacity(numRecords);
+  }
+
+  /**
+   * Adds or fetches the data vector for {@code type}. The {@code name} and {@code clazz}
+   * arguments are not consulted.
+   *
+   * @deprecated
+   *   prefer using {@link #addOrGetVector(org.apache.arrow.vector.VectorDescriptor)} instead.
+   */
+  @Override
+  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+    final VectorDescriptor descriptor = VectorDescriptor.create(type);
+    final AddOrGetResult<T> outcome = addOrGetVector(descriptor);
+    return outcome.getVector();
+  }
+
+  /**
+   * Returns the delegate's data vector, passed through {@code typeify} with {@code clazz}, when
+   * {@code name} is null; any non-null name yields {@code null}.
+   */
+  @Override
+  public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
+    if (name == null) {
+      return typeify(delegate.getDataVector(), clazz);
+    }
+    return null;
+  }
+
+  /**
+   * Clears this vector, allocates {@code valueCount + 1} offsets and resets the mutator.
+   * {@code innerValueCount} is not used here.
+   */
+  @Override
+  public void allocateNew(int valueCount, int innerValueCount) {
+    clear();
+    final int offsetCount = valueCount + 1;
+    getOffsetVector().allocateNew(offsetCount);
+    getMutator().reset();
+  }
+
+  /**
+   * Returns the delegate's data vector paired with ordinal 0 when {@code name} is null; any
+   * non-null name yields {@code null}.
+   */
+  @Override
+  public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
+    if (name == null) {
+      return new VectorWithOrdinal(delegate.getDataVector(), 0);
+    }
+    return null;
+  }
+
+  /**
+   * Copies the value at {@code fromIndex} of {@code from} into {@code thisIndex} of this vector
+   * by forwarding to the two vectors' delegates.
+   */
+  public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
+    final DelegateRepeatedVector sourceDelegate = from.delegate;
+    delegate.copyFromSafe(fromIndex, thisIndex, sourceDelegate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedMapVector.java
new file mode 100644
index 0000000..e7eacd3
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedMapVector.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex;
+
+import io.netty.buffer.ArrowBuf;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.vector.AddOrGetResult;
+import org.apache.arrow.vector.AllocationHelper;
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorDescriptor;
+import org.apache.arrow.vector.complex.impl.NullReader;
+import org.apache.arrow.vector.complex.impl.RepeatedMapReaderImpl;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.holders.ComplexHolder;
+import org.apache.arrow.vector.holders.RepeatedMapHolder;
+import org.apache.arrow.vector.types.MaterializedField;
+import org.apache.arrow.vector.types.Types.DataMode;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.util.JsonStringArrayList;
+import org.apache.arrow.vector.util.TransferPair;
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class RepeatedMapVector extends AbstractMapVector
+    implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
+
+  public final static MajorType TYPE = new MajorType(MinorType.MAP, DataMode.REPEATED);
+
+  private final UInt4Vector offsets;   // offsets to start of each record (considering record indices are 0-indexed)
+  private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
+  private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
+  private final Mutator mutator = new Mutator();
+  private final EmptyValuePopulator emptyPopulator;
+
+  public RepeatedMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
+    super(field, allocator, callBack);
+    this.offsets = new UInt4Vector(BaseRepeatedValueVector.OFFSETS_FIELD, allocator);
+    this.emptyPopulator = new EmptyValuePopulator(offsets);
+  }
+
+  /** Returns the offsets vector owned by this repeated map. */
+  @Override
+  public UInt4Vector getOffsetVector() {
+    return this.offsets;
+  }
+
+  /**
+   * Not supported by the repeated map vector.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ValueVector getDataVector() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by the repeated map vector.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Sizes the offsets for {@code numRecords + 1} entries and every child for
+   * {@code numRecords * DEFAULT_REPEAT_PER_RECORD} values.
+   */
+  @Override
+  public void setInitialCapacity(int numRecords) {
+    offsets.setInitialCapacity(numRecords + 1);
+    final int childCapacity = numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD;
+    for (final ValueVector child : (Iterable<ValueVector>) this) {
+      child.setInitialCapacity(childCapacity);
+    }
+  }
+
+  /** Returns the reader instance created together with this vector. */
+  @Override
+  public RepeatedMapReaderImpl getReader() {
+    return this.reader;
+  }
+
+  /**
+   * Clears the vector, then allocates {@code groupCount + 1} offsets and each child through
+   * {@code AllocationHelper.allocatePrecomputedChildCount}. If an allocation runs out of memory
+   * the vector is cleared again and the exception is rethrown. On success the offsets are
+   * zeroed and the mutator is reset.
+   *
+   * @param groupCount      number of top-level values
+   * @param innerValueCount forwarded to the allocation helper for each child
+   */
+  @Override
+  public void allocateNew(int groupCount, int innerValueCount) {
+    clear();
+    try {
+      offsets.allocateNew(groupCount + 1);
+      for (final ValueVector child : getChildren()) {
+        AllocationHelper.allocatePrecomputedChildCount(child, groupCount, 50, innerValueCount);
+      }
+    } catch (OutOfMemoryException oom){
+      // Release whatever was allocated before the failure.
+      clear();
+      throw oom;
+    }
+    offsets.zeroVector();
+    mutator.reset();
+  }
+
+  /** Returns an iterator over the child field names of this map. */
+  public Iterator<String> fieldNameIterator() {
+    final Iterator<String> names = getChildFieldNames().iterator();
+    return names;
+  }
+
+  /** Returns the superclass's primitive vectors with this vector's offsets appended. */
+  @Override
+  public List<ValueVector> getPrimitiveVectors() {
+    final List<ValueVector> result = super.getPrimitiveVectors();
+    result.add(this.offsets);
+    return result;
+  }
+
+  /**
+   * Returns 0 when the vector holds no values; otherwise the sum of the offsets buffer size and
+   * every child's buffer size, narrowed to {@code int}.
+   */
+  @Override
+  public int getBufferSize() {
+    final boolean noValues = getAccessor().getValueCount() == 0;
+    if (noValues) {
+      return 0;
+    }
+    long total = offsets.getBufferSize();
+    for (final ValueVector child : (Iterable<ValueVector>) this) {
+      total += child.getBufferSize();
+    }
+    return (int) total;
+  }
+
+  /**
+   * Estimates the buffer size needed for {@code valueCount} values.
+   *
+   * <p>The estimate includes the offsets buffer, consistent with {@link #getBufferSize()} and
+   * {@link #getBuffers(boolean)}, which both account for offsets.
+   *
+   * @param valueCount number of top-level values
+   * @return 0 when {@code valueCount} is 0, otherwise the summed size narrowed to {@code int}
+   */
+  @Override
+  public int getBufferSizeFor(final int valueCount) {
+    if (valueCount == 0) {
+      return 0;
+    }
+
+    // Offsets hold one entry more than the number of values.
+    long bufferSize = offsets.getBufferSizeFor(valueCount + 1);
+    // NOTE(review): children are still sized with the top-level valueCount, as before; confirm
+    // whether the inner value count is the intended argument.
+    for (final ValueVector v : (Iterable<ValueVector>) this) {
+      bufferSize += v.getBufferSizeFor(valueCount);
+    }
+
+    return (int) bufferSize;
+  }
+
+  /** Closes the offsets vector first, then delegates to the superclass. */
+  @Override
+  public void close() {
+    this.offsets.close();
+    super.close();
+  }
+
+  /**
+   * Returns a transfer pair into a new repeated map vector that reuses this field's path and
+   * lives on {@code allocator}.
+   */
+  @Override
+  public TransferPair getTransferPair(BufferAllocator allocator) {
+    final String path = getField().getPath();
+    return new RepeatedMapTransferPair(this, path, allocator);
+  }
+
+  /** Returns a transfer pair into {@code to}, which is cast to {@link RepeatedMapVector}. */
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    final RepeatedMapVector destination = (RepeatedMapVector) to;
+    return new RepeatedMapTransferPair(this, destination);
+  }
+
+  MapSingleCopier makeSingularCopier(MapVector to) {
+    return new MapSingleCopier(this, to);
+  }
+
+  /**
+   * Copies individual inner entries of a {@link RepeatedMapVector} into a {@link MapVector},
+   * child by child.
+   */
+  protected static class MapSingleCopier {
+    private final TransferPair[] pairs;
+    public final RepeatedMapVector from;
+
+    /**
+     * Creates one transfer pair per child of {@code from}, adding the matching child to
+     * {@code to} and allocating it when the add increased {@code to}'s size.
+     */
+    public MapSingleCopier(RepeatedMapVector from, MapVector to) {
+      this.from = from;
+      this.pairs = new TransferPair[from.size()];
+
+      int slot = 0;
+      for (final String childName : from.getChildFieldNames()) {
+        final int sizeBefore = to.size();
+        final ValueVector source = from.getChild(childName);
+        if (source != null) {
+          final ValueVector destination = to.addOrGet(childName, source.getField().getType(), source.getClass());
+          // A size change means the child was newly added and still needs buffers.
+          if (to.size() != sizeBefore) {
+            destination.allocateNew();
+          }
+          pairs[slot] = source.makeTransferPair(destination);
+          slot++;
+        }
+      }
+    }
+
+    /** Copies entry {@code fromSubIndex} of every child into {@code toIndex} of the target. */
+    public void copySafe(int fromSubIndex, int toIndex) {
+      for (int k = 0; k < pairs.length; k++) {
+        pairs[k].copyValueSafe(fromSubIndex, toIndex);
+      }
+    }
+  }
+
+  /**
+   * Returns a {@link SingleMapTransferPair} from this vector into a new map vector created for
+   * {@code reference} on {@code allocator}.
+   */
+  public TransferPair getTransferPairToSingleMap(String reference, BufferAllocator allocator) {
+    final TransferPair pair = new SingleMapTransferPair(this, reference, allocator);
+    return pair;
+  }
+
+  /**
+   * Returns a transfer pair into a new repeated map vector created for {@code ref} on
+   * {@code allocator}.
+   */
+  @Override
+  public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
+    final TransferPair pair = new RepeatedMapTransferPair(this, ref, allocator);
+    return pair;
+  }
+
+  /**
+   * Allocates the offsets and all children without throwing on failure.
+   *
+   * @return {@code true} when every allocation succeeded; {@code false} otherwise, in which
+   *         case the vector has been cleared
+   */
+  @Override
+  public boolean allocateNewSafe() {
+    /* boolean to keep track if all the memory allocation were successful
+     * Used in the case of composite vectors when we need to allocate multiple
+     * buffers for multiple vectors. If one of the allocations failed we need to
+     * clear all the memory that we allocated
+     */
+    boolean success = false;
+    try {
+      if (!offsets.allocateNewSafe()) {
+        return false;
+      }
+      success =  super.allocateNewSafe();
+    } finally {
+      if (!success) {
+        clear();
+      }
+    }
+    // Only touch the offsets when they are still allocated; after a failure they were cleared.
+    if (success) {
+      offsets.zeroVector();
+    }
+    return success;
+  }
+
+  /**
+   * Transfer pair from a {@link RepeatedMapVector} into a non-repeated {@link MapVector}; only
+   * the children are paired, offsets are not carried over.
+   */
+  protected static class SingleMapTransferPair implements TransferPair {
+    private final TransferPair[] pairs;
+    private final RepeatedMapVector from;
+    private final MapVector to;
+    private static final MajorType MAP_TYPE = new MajorType(MinorType.MAP, DataMode.REQUIRED);
+
+    /** Targets a new map vector created for {@code path}; new children are not allocated. */
+    public SingleMapTransferPair(RepeatedMapVector from, String path, BufferAllocator allocator) {
+      this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), allocator, from.callBack), false);
+    }
+
+    /** Targets an existing map vector; children added to it are allocated. */
+    public SingleMapTransferPair(RepeatedMapVector from, MapVector to) {
+      this(from, to, true);
+    }
+
+    /**
+     * Creates one transfer pair per child of {@code from}.
+     *
+     * @param allocate whether children newly added to {@code to} get buffers allocated
+     */
+    public SingleMapTransferPair(RepeatedMapVector from, MapVector to, boolean allocate) {
+      this.from = from;
+      this.to = to;
+      this.pairs = new TransferPair[from.size()];
+      int slot = 0;
+      for (final String childName : from.getChildFieldNames()) {
+        final int sizeBefore = to.size();
+        final ValueVector source = from.getChild(childName);
+        if (source != null) {
+          final ValueVector destination = to.addOrGet(childName, source.getField().getType(), source.getClass());
+          if (allocate && to.size() != sizeBefore) {
+            destination.allocateNew();
+          }
+          pairs[slot] = source.makeTransferPair(destination);
+          slot++;
+        }
+      }
+    }
+
+
+    /**
+     * Transfers every child, sets the target's value count to the source's value count and
+     * clears the source.
+     */
+    @Override
+    public void transfer() {
+      for (int k = 0; k < pairs.length; k++) {
+        pairs[k].transfer();
+      }
+      final int sourceValueCount = from.getAccessor().getValueCount();
+      to.getMutator().setValueCount(sourceValueCount);
+      from.clear();
+    }
+
+    /** Returns the target map vector. */
+    @Override
+    public ValueVector getTo() {
+      return this.to;
+    }
+
+    /** Copies entry {@code from} of every child into entry {@code to} of the target child. */
+    @Override
+    public void copyValueSafe(int from, int to) {
+      for (int k = 0; k < pairs.length; k++) {
+        pairs[k].copyValueSafe(from, to);
+      }
+    }
+
+    /** Splits every child pair, then sets the target's value count to {@code length}. */
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      for (int k = 0; k < pairs.length; k++) {
+        pairs[k].splitAndTransfer(startIndex, length);
+      }
+      to.getMutator().setValueCount(length);
+    }
+  }
+
+  /**
+   * Transfer pair between two {@link RepeatedMapVector}s, covering the offsets and one child
+   * pair per child of the source.
+   */
+  private static class RepeatedMapTransferPair implements TransferPair{
+
+    private final TransferPair[] pairs;
+    private final RepeatedMapVector to;
+    private final RepeatedMapVector from;
+
+    /** Targets a new repeated map created for {@code path}; new children are not allocated. */
+    public RepeatedMapTransferPair(RepeatedMapVector from, String path, BufferAllocator allocator) {
+      this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false);
+    }
+
+    /** Targets an existing repeated map; children added to it are allocated. */
+    public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) {
+      this(from, to, true);
+    }
+
+    /**
+     * Creates one transfer pair per child of {@code from} and resets the target's cached
+     * copy pair.
+     *
+     * @param from     source vector
+     * @param to       target vector
+     * @param allocate whether children newly added to {@code to} get buffers allocated
+     */
+    public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to, boolean allocate) {
+      this.from = from;
+      this.to = to;
+      this.pairs = new TransferPair[from.size()];
+      this.to.ephPair = null;
+
+      int i = 0;
+      ValueVector vector;
+      for (final String child : from.getChildFieldNames()) {
+        final int preSize = to.size();
+        vector = from.getChild(child);
+        if (vector == null) {
+          continue;
+        }
+
+        final ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+        // Honor the allocate flag (it was previously ignored), as SingleMapTransferPair does.
+        if (allocate && to.size() != preSize) {
+          newVector.allocateNew();
+        }
+
+        pairs[i++] = vector.makeTransferPair(newVector);
+      }
+    }
+
+    /** Transfers the offsets and every child, then clears the source. */
+    @Override
+    public void transfer() {
+      from.offsets.transferTo(to.offsets);
+      for (TransferPair p : pairs) {
+        p.transfer();
+      }
+      from.clear();
+    }
+
+    /** Returns the target vector. */
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    /**
+     * Copies the group at {@code srcIndex} of the source into {@code destIndex} of the target,
+     * writing its inner entries starting at the target's current offset for {@code destIndex}.
+     */
+    @Override
+    public void copyValueSafe(int srcIndex, int destIndex) {
+      RepeatedMapHolder holder = new RepeatedMapHolder();
+      from.getAccessor().get(srcIndex, holder);
+      to.emptyPopulator.populate(destIndex + 1);
+      int newIndex = to.offsets.getAccessor().get(destIndex);
+      //todo: make these bulk copies
+      for (int i = holder.start; i < holder.end; i++, newIndex++) {
+        for (TransferPair p : pairs) {
+          p.copyValueSafe(i, newIndex);
+        }
+      }
+      // Record where the copied group ends in the target.
+      to.offsets.getMutator().setSafe(destIndex + 1, newIndex);
+    }
+
+    /**
+     * Transfers {@code groups} groups starting at {@code groupStart}. The target offsets are
+     * re-allocated and rewritten relative to the first copied inner entry, then each child
+     * splits the matching range of inner entries.
+     */
+    @Override
+    public void splitAndTransfer(final int groupStart, final int groups) {
+      final UInt4Vector.Accessor a = from.offsets.getAccessor();
+      final UInt4Vector.Mutator m = to.offsets.getMutator();
+
+      final int startPos = a.get(groupStart);
+      final int endPos = a.get(groupStart + groups);
+      final int valuesToCopy = endPos - startPos;
+
+      to.offsets.clear();
+      to.offsets.allocateNew(groups + 1);
+
+      int normalizedPos;
+      for (int i = 0; i < groups + 1; i++) {
+        // Shift each source offset so the first copied entry lands at 0.
+        normalizedPos = a.get(groupStart + i) - startPos;
+        m.set(i, normalizedPos);
+      }
+
+      m.setValueCount(groups + 1);
+      to.emptyPopulator.populate(groups);
+
+      for (final TransferPair p : pairs) {
+        p.splitAndTransfer(startPos, valuesToCopy);
+      }
+    }
+  }
+
+
+  transient private RepeatedMapTransferPair ephPair;
+
+  /**
+   * Copies the group at {@code fromIndex} of {@code from} into {@code thisIndex} of this vector.
+   * The transfer pair is cached and rebuilt only when the source vector changes.
+   */
+  public void copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from) {
+    final boolean cacheUsable = ephPair != null && ephPair.from == from;
+    if (!cacheUsable) {
+      ephPair = (RepeatedMapTransferPair) from.makeTransferPair(this);
+    }
+    ephPair.copyValueSafe(fromIndex, thisIndex);
+  }
+
+  /** Returns the offsets vector's capacity minus one, never less than 0. */
+  @Override
+  public int getValueCapacity() {
+    final int offsetCapacity = offsets.getValueCapacity();
+    return Math.max(offsetCapacity - 1, 0);
+  }
+
+  /** Returns this vector's single {@link RepeatedMapAccessor} instance. */
+  @Override
+  public RepeatedMapAccessor getAccessor() {
+    return this.accessor;
+  }
+
+  /**
+   * Returns the offsets buffers followed by the superclass's buffers, after checking that
+   * {@link #getBufferSize()} equals the superclass's buffer size plus the offsets buffer size.
+   *
+   * @param clear forwarded to both the offsets vector and the superclass
+   */
+  @Override
+  public ArrowBuf[] getBuffers(boolean clear) {
+    final int totalSize = getBufferSize();
+    final int childrenSize = super.getBufferSize();
+
+    final boolean sizesAgree = totalSize == childrenSize + offsets.getBufferSize();
+    Preconditions.checkArgument(sizesAgree);
+    return ArrayUtils.addAll(offsets.getBuffers(clear), super.getBuffers(clear));
+  }
+
+
+//  @Override
+//  public void load(SerializedField metadata, DrillBuf buffer) {
+//    final List<SerializedField> children = metadata.getChildList();
+//
+//    final SerializedField offsetField = children.get(0);
+//    offsets.load(offsetField, buffer);
+//    int bufOffset = offsetField.getBufferLength();
+//
+//    for (int i = 1; i < children.size(); i++) {
+//      final SerializedField child = children.get(i);
+//      final MaterializedField fieldDef = SerializedFieldHelper.create(child);
+//      ValueVector vector = getChild(fieldDef.getLastName());
+//      if (vector == null) {
+        // if we arrive here, we didn't have a matching vector.
+//        vector = BasicTypeHelper.getNewVector(fieldDef, allocator);
+//        putChild(fieldDef.getLastName(), vector);
+//      }
+//      final int vectorLength = child.getBufferLength();
+//      vector.load(child, buffer.slice(bufOffset, vectorLength));
+//      bufOffset += vectorLength;
+//    }
+//
+//    assert bufOffset == buffer.capacity();
+//  }
+//
+//
+//  @Override
+//  public SerializedField getMetadata() {
+//    SerializedField.Builder builder = getField() //
+//        .getAsBuilder() //
+//        .setBufferLength(getBufferSize()) //
+        // while we don't need to actually read this on load, we need it to make sure we don't skip deserialization of this vector
+//        .setValueCount(accessor.getValueCount());
+//    builder.addChild(offsets.getMetadata());
+//    for (final ValueVector child : getChildren()) {
+//      builder.addChild(child.getMetadata());
+//    }
+//    return builder.build();
+//  }
+
+  /** Returns this vector's single {@link Mutator} instance. */
+  @Override
+  public Mutator getMutator() {
+    return this.mutator;
+  }
+
+  /**
+   * Accessor for the enclosing {@link RepeatedMapVector}; reads group boundaries from the
+   * offsets vector.
+   */
+  public class RepeatedMapAccessor implements RepeatedAccessor {
+
+    /**
+     * Builds a list with one insertion-ordered map per inner entry of the group at
+     * {@code index}. Each map holds the non-null child values keyed by child field name; the
+     * offsets field is skipped.
+     */
+    @Override
+    public Object getObject(int index) {
+      final List<Object> entries = new JsonStringArrayList<>();
+      final int end = offsets.getAccessor().get(index+1);
+      for (int pos = offsets.getAccessor().get(index); pos < end; pos++) {
+        final Map<String, Object> entry = Maps.newLinkedHashMap();
+        for (final MaterializedField childField : getField().getChildren()) {
+          if (childField.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) {
+            continue;
+          }
+          final String childName = childField.getLastName();
+          final Object value = getChild(childName).getAccessor().getObject(pos);
+          if (value != null) {
+            entry.put(childName, value);
+          }
+        }
+        entries.add(entry);
+      }
+      return entries;
+    }
+
+    /** Returns the offsets vector's value count minus one, never less than 0. */
+    @Override
+    public int getValueCount() {
+      final int offsetCount = offsets.getAccessor().getValueCount();
+      return Math.max(offsetCount - 1, 0);
+    }
+
+    /** Returns the offset stored at the value count, or 0 when there are no values. */
+    @Override
+    public int getInnerValueCount() {
+      final int groupCount = getValueCount();
+      return groupCount == 0 ? 0 : offsets.getAccessor().get(groupCount);
+    }
+
+    /** Returns the difference between the offsets at {@code index + 1} and {@code index}. */
+    @Override
+    public int getInnerValueCountAt(int index) {
+      final int groupEnd = offsets.getAccessor().get(index+1);
+      final int groupStart = offsets.getAccessor().get(index);
+      return groupEnd - groupStart;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean isEmpty(int index) {
+      return false;
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean isNull(int index) {
+      return false;
+    }
+
+    /**
+     * Copies the start and end offsets of the group at {@code index} into {@code holder}.
+     * Asserts that {@code index} is below the value capacity.
+     */
+    public void get(int index, RepeatedMapHolder holder) {
+      assert index < getValueCapacity() :
+        String.format("Attempted to access index %d when value capacity is %d",
+            index, getValueCapacity());
+      final UInt4Vector.Accessor offsetReader = offsets.getAccessor();
+      holder.start = offsetReader.get(index);
+      holder.end = offsetReader.get(index + 1);
+    }
+
+    /** Positions the vector's reader at {@code index} and stores it in {@code holder}. */
+    public void get(int index, ComplexHolder holder) {
+      final FieldReader positioned = getReader();
+      positioned.setPosition(index);
+      holder.reader = positioned;
+    }
+
+    /**
+     * Stores in {@code holder} the vector's reader set to entry {@code arrayIndex} of the group
+     * at {@code index}, or {@link NullReader#INSTANCE} when that entry lies at or beyond the
+     * group's end.
+     */
+    public void get(int index, int arrayIndex, ComplexHolder holder) {
+      final RepeatedMapHolder bounds = new RepeatedMapHolder();
+      get(index, bounds);
+      final int offset = bounds.start + arrayIndex;
+
+      if (offset < bounds.end) {
+        reader.setSinglePosition(index, arrayIndex);
+        holder.reader = reader;
+      } else {
+        holder.reader = NullReader.INSTANCE;
+      }
+    }
+  }
+
+  /**
+   * Mutator for the enclosing {@link RepeatedMapVector}; maintains the offsets vector and
+   * propagates value counts to the children.
+   */
+  public class Mutator implements RepeatedMutator {
+
+    /**
+     * Populates empty values up to {@code index + 1}, then sets the offset at {@code index + 1}
+     * to the offset at {@code index}.
+     */
+    @Override
+    public void startNewValue(int index) {
+      final int endSlot = index + 1;
+      emptyPopulator.populate(endSlot);
+      offsets.getMutator().setSafe(endSlot, offsets.getAccessor().get(index));
+    }
+
+    /**
+     * Populates empty values up to {@code topLevelValueCount}, sets the offsets value count
+     * (0 for an empty vector, otherwise {@code topLevelValueCount + 1}) and sets every child's
+     * value count to the offset stored at {@code topLevelValueCount}.
+     */
+    @Override
+    public void setValueCount(int topLevelValueCount) {
+      emptyPopulator.populate(topLevelValueCount);
+      final int offsetCount;
+      if (topLevelValueCount == 0) {
+        offsetCount = 0;
+      } else {
+        offsetCount = topLevelValueCount + 1;
+      }
+      offsets.getMutator().setValueCount(offsetCount);
+      final int innerCount = offsets.getAccessor().get(topLevelValueCount);
+      for (final ValueVector child : getChildren()) {
+        child.getMutator().setValueCount(innerCount);
+      }
+    }
+
+    /** No-op. */
+    @Override
+    public void reset() {}
+
+    /** No-op. */
+    @Override
+    public void generateTestData(int values) {}
+
+    /**
+     * Grows the group at {@code index} by one inner slot.
+     *
+     * @return the end offset that was stored at {@code index + 1} before it was incremented
+     */
+    public int add(int index) {
+      final int endSlot = index + 1;
+      final int previousEnd = offsets.getAccessor().get(endSlot);
+      offsets.getMutator().setSafe(endSlot, previousEnd + 1);
+      return previousEnd;
+    }
+  }
+
+  /** Resets the mutator, then clears the offsets vector and every child vector. */
+  @Override
+  public void clear() {
+    getMutator().reset();
+
+    this.offsets.clear();
+    for (final ValueVector child : getChildren()) {
+      child.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java
new file mode 100644
index 0000000..99c0a0a
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedValueVector.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex;
+
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.ValueVector;
+
+/**
+ * An abstraction representing repeated value vectors.
+ *
+ * A repeated vector contains values that may either be flat or nested. A value consists of zero or more
+ * cells (inner values). The current design maintains a data vector and an offsets vector: each cell is
+ * stored in the data vector, and the offsets vector is used to determine the sequence of cells that
+ * belongs to an individual value.
+ */
+public interface RepeatedValueVector extends ValueVector, ContainerVectorLike {
+
+  // NOTE(review): presumably the assumed number of cells per value used when sizing
+  // allocations; no use is visible in this interface -- confirm against implementations.
+  final static int DEFAULT_REPEAT_PER_RECORD = 5;
+
+  /**
+   * Returns the underlying offset vector or null if none exists.
+   *
+   * TODO(DRILL-2995): eliminate exposing low-level interfaces.
+   */
+  UInt4Vector getOffsetVector();
+
+  /**
+   * Returns the underlying data vector or null if none exists.
+   */
+  ValueVector getDataVector();
+
+  /**
+   * Returns the accessor of this vector, narrowed to {@link RepeatedAccessor}.
+   */
+  @Override
+  RepeatedAccessor getAccessor();
+
+  /**
+   * Returns the mutator of this vector, narrowed to {@link RepeatedMutator}.
+   */
+  @Override
+  RepeatedMutator getMutator();
+
+  /**
+   * Read-side operations specific to repeated vectors.
+   */
+  interface RepeatedAccessor extends ValueVector.Accessor {
+    /**
+     * Returns the total number of cells that the vector contains.
+     *
+     * The result includes empty, null valued cells.
+     */
+    int getInnerValueCount();
+
+
+    /**
+     * Returns the number of cells that the value at the given index contains.
+     *
+     * @param index  value index
+     */
+    int getInnerValueCountAt(int index);
+
+    /**
+     * Returns true if the value at the given index is empty, false otherwise.
+     *
+     * @param index  value index
+     */
+    boolean isEmpty(int index);
+  }
+
+  /**
+   * Write-side operations specific to repeated vectors.
+   */
+  interface RepeatedMutator extends ValueVector.Mutator {
+    /**
+     * Starts a new value that is a container of cells.
+     *
+     * @param index  index of new value to start
+     */
+    void startNewValue(int index);
+
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedVariableWidthVectorLike.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedVariableWidthVectorLike.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedVariableWidthVectorLike.java
new file mode 100644
index 0000000..93b744e
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/RepeatedVariableWidthVectorLike.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex;
+
+/**
+ * Allocation and capacity operations for repeated vectors that hold variable-width data.
+ */
+public interface RepeatedVariableWidthVectorLike {
+  /**
+   * Allocate a new memory space for this vector.  Must be called prior to using the ValueVector.
+   *
+   * @param totalBytes   Desired size of the underlying data buffer.
+   * @param parentValueCount   Number of separate repeating groupings.
+   * @param childValueCount   Number of supported values in the vector.
+   */
+  void allocateNew(int totalBytes, int parentValueCount, int childValueCount);
+
+  /**
+   * Provide the maximum amount of variable width bytes that can be stored in this vector.
+   *
+   * @return the maximum number of variable-width bytes this vector can store
+   */
+  int getByteCapacity();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/StateTool.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/StateTool.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/StateTool.java
new file mode 100644
index 0000000..852c72c
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/StateTool.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex;
+
+import java.util.Arrays;
+
+/**
+ * Helper for asserting that a state machine is in one of a set of permitted states.
+ */
+public class StateTool {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StateTool.class);
+
+  /**
+   * Verifies that {@code currentState} is identical to one of {@code expectedStates}.
+   *
+   * @param currentState    the state to validate
+   * @param expectedStates  the states that are acceptable; when empty the check always fails
+   * @param <T>             the enum type describing the states
+   * @throws IllegalArgumentException if {@code currentState} is none of {@code expectedStates}
+   */
+  @SafeVarargs // the varargs array is only read, never stored or exposed
+  public static <T extends Enum<?>> void check(T currentState, T... expectedStates) {
+    for (T s : expectedStates) {
+      // Enum constants are singletons, so identity comparison is sufficient.
+      if (s == currentState) {
+        return;
+      }
+    }
+    throw new IllegalArgumentException(String.format("Expected to be in one of these states %s but was actually in state %s", Arrays.toString(expectedStates), currentState));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/VectorWithOrdinal.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/VectorWithOrdinal.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/VectorWithOrdinal.java
new file mode 100644
index 0000000..d04fc1c
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/VectorWithOrdinal.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex;
+
+import org.apache.arrow.vector.ValueVector;
+
+/**
+ * Immutable pairing of a {@link ValueVector} with an integer ordinal.
+ */
+public class VectorWithOrdinal {
+  /** The vector half of the pair. */
+  public final ValueVector vector;
+  /** The ordinal half of the pair. */
+  public final int ordinal;
+
+  /**
+   * Creates the pair.
+   *
+   * @param v        the vector to hold
+   * @param ordinal  the ordinal associated with {@code v}
+   */
+  public VectorWithOrdinal(ValueVector v, int ordinal) {
+    this.ordinal = ordinal;
+    this.vector = v;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java
new file mode 100644
index 0000000..264e241
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.impl;
+
+import java.util.Iterator;
+
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.holders.UnionHolder;
+import org.apache.arrow.vector.types.MaterializedField;
+import org.apache.arrow.vector.types.Types.DataMode;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+
+/**
+ * Base {@link FieldReader} that tracks a read position and rejects, with an
+ * {@link IllegalStateException}, the operations it does not support by default.
+ */
+abstract class AbstractBaseReader implements FieldReader{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseReader.class);
+  /** Type reported by the placeholder field returned from {@link #getField()}. */
+  private static final MajorType LATE_BIND_TYPE = new MajorType(MinorType.LATE, DataMode.OPTIONAL);
+
+  /** Current read position. */
+  private int index;
+
+  public AbstractBaseReader() {
+    super();
+  }
+
+  /**
+   * Moves the reader to the given position.
+   *
+   * @param index  the new read position
+   */
+  public void setPosition(int index){
+    this.index = index;
+  }
+
+  /** Returns the current read position. */
+  int idx(){
+    return this.index;
+  }
+
+  /** Moves the read position back to zero. */
+  @Override
+  public void reset() {
+    this.index = 0;
+  }
+
+  /** Unsupported by default; always throws {@link IllegalStateException}. */
+  @Override
+  public Iterator<String> iterator() {
+    throw new IllegalStateException("The current reader doesn't support reading as a map.");
+  }
+
+  /** Unsupported by default; always throws {@link IllegalStateException}. */
+  public MajorType getType(){
+    throw new IllegalStateException("The current reader doesn't support getting type information.");
+  }
+
+  /** Returns a newly created field named "unknown" carrying the late-bind type. */
+  @Override
+  public MaterializedField getField() {
+    return MaterializedField.create("unknown", LATE_BIND_TYPE);
+  }
+
+  /** Unsupported by default; always throws {@link IllegalStateException}. */
+  @Override
+  public boolean next() {
+    throw new IllegalStateException("The current reader doesn't support getting next information.");
+  }
+
+  /** Unsupported by default; always throws {@link IllegalStateException}. */
+  @Override
+  public int size() {
+    throw new IllegalStateException("The current reader doesn't support getting size information.");
+  }
+
+  /**
+   * Points the holder at this reader and records whether the reader is set
+   * (1 when {@code isSet()} is true, 0 otherwise).
+   *
+   * @param holder  the holder to fill in
+   */
+  @Override
+  public void read(UnionHolder holder) {
+    holder.reader = this;
+    if (this.isSet()) {
+      holder.isSet = 1;
+    } else {
+      holder.isSet = 0;
+    }
+  }
+
+  /** Unsupported by default; always throws {@link IllegalStateException}. */
+  @Override
+  public void read(int index, UnionHolder holder) {
+    throw new IllegalStateException("The current reader doesn't support reading union type");
+  }
+
+  /** Unsupported by default; always throws {@link IllegalStateException}. */
+  @Override
+  public void copyAsValue(UnionWriter writer) {
+    throw new IllegalStateException("The current reader doesn't support reading union type");
+  }
+
+  /**
+   * Copies this reader into the given list writer by delegating to {@code ComplexCopier}.
+   *
+   * @param writer  destination writer; it is cast to {@link FieldWriter}
+   */
+  @Override
+  public void copyAsValue(ListWriter writer) {
+    final FieldWriter target = (FieldWriter) writer;
+    ComplexCopier.copy(this, target);
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java
new file mode 100644
index 0000000..4e1e103
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseWriter.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.impl;
+
+import org.apache.arrow.vector.complex.writer.FieldWriter;
+
+
+/**
+ * Base {@link FieldWriter} that remembers its parent writer and its current write position.
+ */
+abstract class AbstractBaseWriter implements FieldWriter {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseWriter.class);
+
+  /** Parent writer; {@code null} marks a root writer (see {@link #isRoot()}). */
+  final FieldWriter parent;
+  /** Current write position. */
+  private int index;
+
+  /**
+   * @param parent  the parent writer, or {@code null} for a root writer
+   */
+  public AbstractBaseWriter(FieldWriter parent) {
+    this.parent = parent;
+  }
+
+  /** Returns the inherited description followed by the position and the parent. */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append(super.toString());
+    text.append("[index = ").append(index);
+    text.append(", parent = ").append(parent);
+    text.append("]");
+    return text.toString();
+  }
+
+  /** Returns the parent writer, which may be {@code null}. */
+  @Override
+  public FieldWriter getParent() {
+    return this.parent;
+  }
+
+  /** Returns true when this writer has no parent. */
+  public boolean isRoot() {
+    return null == parent;
+  }
+
+  /** Returns the current write position. */
+  int idx() {
+    return this.index;
+  }
+
+  /**
+   * Moves the writer to the given position.
+   *
+   * @param index  the new write position
+   */
+  @Override
+  public void setPosition(int index) {
+    this.index = index;
+  }
+
+  /** No-op. */
+  @Override
+  public void end() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
new file mode 100644
index 0000000..4e2051f
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.impl;
+
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StateTool;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.types.MaterializedField;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * {@link ComplexWriter} whose root is lazily bound to either a map writer or a list writer.
+ *
+ * The writer starts in {@code INIT} mode. The first call to {@link #rootAsMap()},
+ * {@link #directMap()} or {@link #rootAsList()} creates the corresponding root writer and fixes
+ * the mode; from then on position, value-count and clear calls are forwarded to that root.
+ */
+public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter {
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class);
+
+  private SingleMapWriter mapRoot;
+  private SingleListWriter listRoot;
+  private final MapVector container;
+
+  Mode mode = Mode.INIT;
+  private final String name;
+  private final boolean unionEnabled;
+
+  private enum Mode { INIT, MAP, LIST }
+
+  /**
+   * @param name          name of the root field created inside {@code container}; must be
+   *                      {@code null} if {@link #directMap()} is going to be used
+   * @param container     the vector that holds (or directly is) the root
+   * @param unionEnabled  flag handed to the map root writer when it is created
+   */
+  public ComplexWriterImpl(String name, MapVector container, boolean unionEnabled){
+    super(null);
+    this.name = name;
+    this.container = container;
+    this.unionEnabled = unionEnabled;
+  }
+
+  /**
+   * Same as {@link #ComplexWriterImpl(String, MapVector, boolean)} with {@code unionEnabled}
+   * set to {@code false}.
+   */
+  public ComplexWriterImpl(String name, MapVector container){
+    this(name, container, false);
+  }
+
+  /** Returns the field of the container vector. */
+  @Override
+  public MaterializedField getField() {
+    return container.getField();
+  }
+
+  /** Returns the value capacity of the container vector. */
+  @Override
+  public int getValueCapacity() {
+    return container.getValueCapacity();
+  }
+
+  /**
+   * Throws {@link IllegalArgumentException} unless the current mode is one of {@code modes}.
+   */
+  private void check(Mode... modes){
+    StateTool.check(mode, modes);
+  }
+
+  /** Moves this writer (and the active root, if any) back to position 0. */
+  @Override
+  public void reset(){
+    setPosition(0);
+  }
+
+  /**
+   * Clears the active root and closes whichever root writers have been created.
+   */
+  @Override
+  public void close() throws Exception {
+    clear();
+    // mapRoot only exists once a map root was requested; in INIT or LIST mode it is null
+    // and must not be dereferenced.
+    if (mapRoot != null) {
+      mapRoot.close();
+    }
+    if (listRoot != null) {
+      listRoot.close();
+    }
+  }
+
+  /** Clears the active root writer; does nothing while still in {@code INIT} mode. */
+  @Override
+  public void clear(){
+    switch(mode){
+    case MAP:
+      mapRoot.clear();
+      break;
+    case LIST:
+      listRoot.clear();
+      break;
+    }
+  }
+
+  /**
+   * Forwards the value count to the active root writer; does nothing while still in
+   * {@code INIT} mode.
+   */
+  @Override
+  public void setValueCount(int count){
+    switch(mode){
+    case MAP:
+      mapRoot.setValueCount(count);
+      break;
+    case LIST:
+      listRoot.setValueCount(count);
+      break;
+    }
+  }
+
+  /** Records the position on this writer and forwards it to the active root writer, if any. */
+  @Override
+  public void setPosition(int index){
+    super.setPosition(index);
+    switch(mode){
+    case MAP:
+      mapRoot.setPosition(index);
+      break;
+    case LIST:
+      listRoot.setPosition(index);
+      break;
+    }
+  }
+
+
+  /**
+   * Returns a map writer that writes straight into the container vector itself rather than
+   * into a named child of it.
+   *
+   * @throws IllegalArgumentException if this writer was created with a non-null name, or if
+   *         the root has already been bound as a list
+   */
+  public MapWriter directMap(){
+    Preconditions.checkArgument(name == null);
+
+    switch(mode){
+
+    case INIT:
+      mapRoot = new SingleMapWriter(container, this, unionEnabled);
+      mapRoot.setPosition(idx());
+      mode = Mode.MAP;
+      break;
+
+    case MAP:
+      break;
+
+    default:
+        check(Mode.INIT, Mode.MAP);
+    }
+
+    return mapRoot;
+  }
+
+  /**
+   * Returns the root as a map writer, creating (or fetching) on first use a required MAP
+   * child named {@code name} inside the container.
+   *
+   * @throws IllegalArgumentException if the root has already been bound as a list
+   */
+  @Override
+  public MapWriter rootAsMap() {
+    switch(mode){
+
+    case INIT:
+      MapVector map = container.addOrGet(name, Types.required(MinorType.MAP), MapVector.class);
+      mapRoot = new SingleMapWriter(map, this, unionEnabled);
+      mapRoot.setPosition(idx());
+      mode = Mode.MAP;
+      break;
+
+    case MAP:
+      break;
+
+    default:
+        check(Mode.INIT, Mode.MAP);
+    }
+
+    return mapRoot;
+  }
+
+
+  /** Allocates through whichever root writer exists; does nothing if none has been created. */
+  @Override
+  public void allocate() {
+    if(mapRoot != null) {
+      mapRoot.allocate();
+    } else if(listRoot != null) {
+      listRoot.allocate();
+    }
+  }
+
+  /**
+   * Returns the root as a list writer, creating it on first use.
+   *
+   * @throws IllegalArgumentException if the root has already been bound as a map
+   */
+  @Override
+  public ListWriter rootAsList() {
+    switch(mode){
+
+    case INIT:
+      listRoot = new SingleListWriter(name, container, this);
+      listRoot.setPosition(idx());
+      mode = Mode.LIST;
+      break;
+
+    case LIST:
+      break;
+
+    default:
+        // Only INIT and LIST are legal here. Validating against MAP would let a map-bound
+        // writer slip through and hand the caller a null list writer.
+        check(Mode.INIT, Mode.LIST);
+    }
+
+    return listRoot;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/MapOrListWriterImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/MapOrListWriterImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/MapOrListWriterImpl.java
new file mode 100644
index 0000000..f8a9d42
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/MapOrListWriterImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.impl;
+
+import org.apache.arrow.vector.complex.writer.BaseWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapOrListWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+
+public class MapOrListWriterImpl implements MapOrListWriter {
+
+  public final BaseWriter.MapWriter map;
+  public final BaseWriter.ListWriter list;
+
+  public MapOrListWriterImpl(final BaseWriter.MapWriter writer) {
+    this.map = writer;
+    this.list = null;
+  }
+
+  public MapOrListWriterImpl(final BaseWriter.ListWriter writer) {
+    this.map = null;
+    this.list = writer;
+  }
+
+  public void start() {
+    if (map != null) {
+      map.start();
+    } else {
+      list.startList();
+    }
+  }
+
+  public void end() {
+    if (map != null) {
+      map.end();
+    } else {
+      list.endList();
+    }
+  }
+
+  public MapOrListWriter map(final String name) {
+    assert map != null;
+    return new MapOrListWriterImpl(map.map(name));
+  }
+
+  public MapOrListWriter listoftmap(final String name) {
+    assert list != null;
+    return new MapOrListWriterImpl(list.map());
+  }
+
+  public MapOrListWriter list(final String name) {
+    assert map != null;
+    return new MapOrListWriterImpl(map.list(name));
+  }
+
+  public boolean isMapWriter() {
+    return map != null;
+  }
+
+  public boolean isListWriter() {
+    return list != null;
+  }
+
+  public VarCharWriter varChar(final String name) {
+    return (map != null) ? map.varChar(name) : list.varChar();
+  }
+
+  public IntWriter integer(final String name) {
+    return (map != null) ? map.integer(name) : list.integer();
+  }
+
+  public BigIntWriter bigInt(final String name) {
+    return (map != null) ? map.bigInt(name) : list.bigInt();
+  }
+
+  public Float4Writer float4(final String name) {
+    return (map != null) ? map.float4(name) : list.float4();
+  }
+
+  public Float8Writer float8(final String name) {
+    return (map != null) ? map.float8(name) : list.float8();
+  }
+
+  public BitWriter bit(final String name) {
+    return (map != null) ? map.bit(name) : list.bit();
+  }
+
+  public VarBinaryWriter binary(final String name) {
+    return (map != null) ? map.varBinary(name) : list.varBinary();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
new file mode 100644
index 0000000..ea62e02
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.complex.impl;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorDescriptor;
+import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.complex.AbstractMapVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.types.MaterializedField;
+import org.apache.arrow.vector.types.Types.DataMode;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.util.BasicTypeHelper;
+import org.apache.arrow.vector.util.TransferPair;
+
+/**
+ * This FieldWriter implementation delegates all FieldWriter API calls to an inner FieldWriter. This inner field writer
+ * can start as a specific type, and this class will promote the writer to a UnionWriter if a call is made that the specifically
+ * typed writer cannot handle. A new UnionVector is created, wrapping the original vector, and replaces the original vector
+ * in the parent vector, which can be either an AbstractMapVector or a ListVector.
+ */
+public class PromotableWriter extends AbstractPromotableFieldWriter {
+
+  // Exactly one of parentContainer / listVector is non-null, depending on the constructor used.
+  private final AbstractMapVector parentContainer;
+  private final ListVector listVector;
+  // Position remembered while no delegate writer exists yet (see setPosition).
+  private int position;
+
+  // UNTYPED: no delegate writer yet; SINGLE: delegating to one typed writer;
+  // UNION: delegating to a UnionWriter.
+  private enum State {
+    UNTYPED, SINGLE, UNION
+  }
+
+  // Minor type of the vector currently written to; assigned in setWriter.
+  private MinorType type;
+  private ValueVector vector;
+  private UnionVector unionVector;
+  private State state;
+  private FieldWriter writer;
+
+  /**
+   * Creates a writer for a vector whose parent is a map-like container.
+   *
+   * @param v                the vector to write to initially
+   * @param parentContainer  the container in which the vector is replaced on promotion
+   */
+  public PromotableWriter(ValueVector v, AbstractMapVector parentContainer) {
+    super(null);
+    this.parentContainer = parentContainer;
+    this.listVector = null;
+    init(v);
+  }
+
+  /**
+   * Creates a writer for a vector whose parent is a list vector.
+   *
+   * @param v           the vector to write to initially
+   * @param listVector  the list vector that creates the data vector and is promoted to union
+   */
+  public PromotableWriter(ValueVector v, ListVector listVector) {
+    super(null);
+    this.listVector = listVector;
+    this.parentContainer = null;
+    init(v);
+  }
+
+  /**
+   * Chooses the initial state from the runtime class of {@code v}: a UnionVector starts in
+   * UNION with a UnionWriter, a ZeroVector starts UNTYPED with no writer, and anything else
+   * starts in SINGLE via {@link #setWriter(ValueVector)}.
+   */
+  private void init(ValueVector v) {
+    if (v instanceof UnionVector) {
+      state = State.UNION;
+      unionVector = (UnionVector) v;
+      writer = new UnionWriter(unionVector);
+    } else if (v instanceof ZeroVector) {
+      state = State.UNTYPED;
+    } else {
+      setWriter(v);
+    }
+  }
+
+  /**
+   * Switches to SINGLE state and reflectively builds the delegate writer for {@code v}.
+   *
+   * The writer class comes from BasicTypeHelper, except that SingleListWriter is replaced by
+   * UnionListWriter. A public three-argument constructor is preferred when the class has one;
+   * otherwise the (vector class, AbstractFieldWriter) constructor is used.
+   *
+   * @throws RuntimeException wrapping any reflective failure
+   */
+  private void setWriter(ValueVector v) {
+    state = State.SINGLE;
+    vector = v;
+    type = v.getField().getType().getMinorType();
+    Class writerClass = BasicTypeHelper
+        .getWriterImpl(v.getField().getType().getMinorType(), v.getField().getDataMode());
+    if (writerClass.equals(SingleListWriter.class)) {
+      writerClass = UnionListWriter.class;
+    }
+    Class vectorClass = BasicTypeHelper.getValueVectorClass(v.getField().getType().getMinorType(), v.getField()
+        .getDataMode());
+    try {
+      Constructor constructor = null;
+      // No break: if several three-argument constructors exist, the last one enumerated wins.
+      for (Constructor c : writerClass.getConstructors()) {
+        if (c.getParameterTypes().length == 3) {
+          constructor = c;
+        }
+      }
+      if (constructor == null) {
+        constructor = writerClass.getConstructor(vectorClass, AbstractFieldWriter.class);
+        writer = (FieldWriter) constructor.newInstance(vector, null);
+      } else {
+        // NOTE(review): assumes the three-argument form is (vector, parent, boolean) -- confirm
+        // against the writer classes.
+        writer = (FieldWriter) constructor.newInstance(vector, null, true);
+      }
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Records the position and forwards it to the delegate writer; when no delegate exists yet
+   * the position is stored so it can be applied once one is created.
+   */
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    FieldWriter w = getWriter();
+    if (w == null) {
+      position = index;
+    } else {
+      w.setPosition(index);
+    }
+  }
+
+  /**
+   * Returns the writer to use for values of the given minor type.
+   *
+   * In UNION state the union writer is returned as is. In UNTYPED state a null type yields
+   * null; otherwise a vector of the requested type is created through the list vector and a
+   * typed writer is installed at the remembered position. If the requested type differs from
+   * the current single type, the writer is promoted to a union writer.
+   *
+   * @param type  the requested minor type; may be null
+   * @return the delegate writer, or null while untyped and no type is requested
+   */
+  protected FieldWriter getWriter(MinorType type) {
+    if (state == State.UNION) {
+      return writer;
+    }
+    if (state == State.UNTYPED) {
+      if (type == null) {
+        return null;
+      }
+      // NOTE(review): listVector is null when the AbstractMapVector constructor was used, so
+      // this line would throw NullPointerException if a ZeroVector is ever paired with a map
+      // parent -- confirm callers never do that.
+      ValueVector v = listVector.addOrGetVector(new VectorDescriptor(new MajorType(type, DataMode.OPTIONAL))).getVector();
+      v.allocateNew();
+      setWriter(v);
+      writer.setPosition(position);
+    }
+    // Reference comparison of the requested type against the current single type.
+    if (type != this.type) {
+      return promoteToUnion();
+    }
+    return writer;
+  }
+
+  /**
+   * Delegates to the current writer.
+   *
+   * NOTE(review): reads the writer field directly, which is never assigned while the state is
+   * UNTYPED -- confirm this is not called before a type is established.
+   */
+  @Override
+  public boolean isEmptyMap() {
+    return writer.isEmptyMap();
+  }
+
+  /** Returns the writer for the current type; null while untyped. */
+  protected FieldWriter getWriter() {
+    return getWriter(type);
+  }
+
+  /**
+   * Replaces the single typed vector with a union vector that contains it.
+   *
+   * The current vector's contents are transferred to a new vector named after its lowercased
+   * minor type, the union vector is obtained from the parent (map container or list vector),
+   * the transferred vector is added to it, and every index below the current position is
+   * tagged with the original minor type.
+   *
+   * @return the new union writer, positioned at the current index
+   */
+  private FieldWriter promoteToUnion() {
+    String name = vector.getField().getLastName();
+    // NOTE(review): toLowerCase() uses the default locale -- confirm this matches how the
+    // union vector names its children.
+    TransferPair tp = vector.getTransferPair(vector.getField().getType().getMinorType().name().toLowerCase(), vector.getAllocator());
+    tp.transfer();
+    if (parentContainer != null) {
+      unionVector = parentContainer.addOrGet(name, new MajorType(MinorType.UNION, DataMode.OPTIONAL), UnionVector.class);
+    } else if (listVector != null) {
+      unionVector = listVector.promoteToUnion();
+    }
+    unionVector.addVector(tp.getTo());
+    writer = new UnionWriter(unionVector);
+    writer.setPosition(idx());
+    for (int i = 0; i < idx(); i++) {
+      unionVector.getMutator().setType(i, vector.getField().getType().getMinorType());
+    }
+    vector = null;
+    state = State.UNION;
+    return writer;
+  }
+
+  // The methods below forward to the delegate returned by getWriter().
+  // NOTE(review): getWriter() returns null while the state is UNTYPED, in which case these
+  // calls would throw NullPointerException -- confirm they are only used once typed.
+
+  @Override
+  public void allocate() {
+    getWriter().allocate();
+  }
+
+  @Override
+  public void clear() {
+    getWriter().clear();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return getWriter().getField();
+  }
+
+  @Override
+  public int getValueCapacity() {
+    return getWriter().getValueCapacity();
+  }
+
+  @Override
+  public void close() throws Exception {
+    getWriter().close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedListReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedListReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedListReaderImpl.java
new file mode 100644
index 0000000..dd1a152
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedListReaderImpl.java
@@ -0,0 +1,145 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.complex.impl;
+
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.RepeatedListVector;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.holders.RepeatedListHolder;
+import org.apache.arrow.vector.types.Types.DataMode;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+public class RepeatedListReaderImpl extends AbstractFieldReader{
+  private static final int NO_VALUES = Integer.MAX_VALUE - 1;
+  private static final MajorType TYPE = new MajorType(MinorType.LIST, DataMode.REPEATED);
+  private final String name;
+  private final RepeatedListVector container;
+  private FieldReader reader;
+
+  public RepeatedListReaderImpl(String name, RepeatedListVector container) {
+    super();
+    this.name = name;
+    this.container = container;
+  }
+
+  @Override
+  public MajorType getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void copyAsValue(ListWriter writer) {
+    if (currentOffset == NO_VALUES) {
+      return;
+    }
+    RepeatedListWriter impl = (RepeatedListWriter) writer;
+    impl.container.copyFromSafe(idx(), impl.idx(), container);
+  }
+
+  @Override
+  public void copyAsField(String name, MapWriter writer) {
+    if (currentOffset == NO_VALUES) {
+      return;
+    }
+    RepeatedListWriter impl = (RepeatedListWriter) writer.list(name);
+    impl.container.copyFromSafe(idx(), impl.idx(), container);
+  }
+
+  private int currentOffset;
+  private int maxOffset;
+
+  @Override
+  public void reset() {
+    super.reset();
+    currentOffset = 0;
+    maxOffset = 0;
+    if (reader != null) {
+      reader.reset();
+    }
+    reader = null;
+  }
+
+  @Override
+  public int size() {
+    return maxOffset - currentOffset;
+  }
+
+  @Override
+  public void setPosition(int index) {
+    if (index < 0 || index == NO_VALUES) {
+      currentOffset = NO_VALUES;
+      return;
+    }
+
+    super.setPosition(index);
+    RepeatedListHolder h = new RepeatedListHolder();
+    container.getAccessor().get(index, h);
+    if (h.start == h.end) {
+      currentOffset = NO_VALUES;
+    } else {
+      currentOffset = h.start-1;
+      maxOffset = h.end;
+      if(reader != null) {
+        reader.setPosition(currentOffset);
+      }
+    }
+  }
+
+  @Override
+  public boolean next() {
+    if (currentOffset +1 < maxOffset) {
+      currentOffset++;
+      if (reader != null) {
+        reader.setPosition(currentOffset);
+      }
+      return true;
+    } else {
+      currentOffset = NO_VALUES;
+      return false;
+    }
+  }
+
+  @Override
+  public Object readObject() {
+    return container.getAccessor().getObject(idx());
+  }
+
+  @Override
+  public FieldReader reader() {
+    if (reader == null) {
+      ValueVector child = container.getChild(name);
+      if (child == null) {
+        reader = NullReader.INSTANCE;
+      } else {
+        reader = child.getReader();
+      }
+      reader.setPosition(currentOffset);
+    }
+    return reader;
+  }
+
+  public boolean isSet() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedMapReaderImpl.java
new file mode 100644
index 0000000..09a831d
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -0,0 +1,192 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.complex.impl;
+
+import java.util.Map;
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.RepeatedMapVector;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.holders.RepeatedMapHolder;
+import org.apache.arrow.vector.types.Types.MajorType;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("unused")
+public class RepeatedMapReaderImpl extends AbstractFieldReader{
+  private static final int NO_VALUES = Integer.MAX_VALUE - 1;
+
+  private final RepeatedMapVector vector;
+  private final Map<String, FieldReader> fields = Maps.newHashMap();
+
+  public RepeatedMapReaderImpl(RepeatedMapVector vector) {
+    this.vector = vector;
+  }
+
+  private void setChildrenPosition(int index) {
+    for (FieldReader r : fields.values()) {
+      r.setPosition(index);
+    }
+  }
+
+  @Override
+  public FieldReader reader(String name) {
+    FieldReader reader = fields.get(name);
+    if (reader == null) {
+      ValueVector child = vector.getChild(name);
+      if (child == null) {
+        reader = NullReader.INSTANCE;
+      } else {
+        reader = child.getReader();
+      }
+      fields.put(name, reader);
+      reader.setPosition(currentOffset);
+    }
+    return reader;
+  }
+
+  @Override
+  public FieldReader reader() {
+    if (currentOffset == NO_VALUES) {
+      return NullReader.INSTANCE;
+    }
+
+    setChildrenPosition(currentOffset);
+    return new SingleLikeRepeatedMapReaderImpl(vector, this);
+  }
+
+  private int currentOffset;
+  private int maxOffset;
+
+  @Override
+  public void reset() {
+    super.reset();
+    currentOffset = 0;
+    maxOffset = 0;
+    for (FieldReader reader:fields.values()) {
+      reader.reset();
+    }
+    fields.clear();
+  }
+
+  @Override
+  public int size() {
+    if (isNull()) {
+      return 0;
+    }
+    return maxOffset - (currentOffset < 0 ? 0 : currentOffset);
+  }
+
+  @Override
+  public void setPosition(int index) {
+    if (index < 0 || index == NO_VALUES) {
+      currentOffset = NO_VALUES;
+      return;
+    }
+
+    super.setPosition(index);
+    RepeatedMapHolder h = new RepeatedMapHolder();
+    vector.getAccessor().get(index, h);
+    if (h.start == h.end) {
+      currentOffset = NO_VALUES;
+    } else {
+      currentOffset = h.start-1;
+      maxOffset = h.end;
+      setChildrenPosition(currentOffset);
+    }
+  }
+
+  public void setSinglePosition(int index, int childIndex) {
+    super.setPosition(index);
+    RepeatedMapHolder h = new RepeatedMapHolder();
+    vector.getAccessor().get(index, h);
+    if (h.start == h.end) {
+      currentOffset = NO_VALUES;
+    } else {
+      int singleOffset = h.start + childIndex;
+      assert singleOffset < h.end;
+      currentOffset = singleOffset;
+      maxOffset = singleOffset + 1;
+      setChildrenPosition(singleOffset);
+    }
+  }
+
+  @Override
+  public boolean next() {
+    if (currentOffset +1 < maxOffset) {
+      setChildrenPosition(++currentOffset);
+      return true;
+    } else {
+      currentOffset = NO_VALUES;
+      return false;
+    }
+  }
+
+  public boolean isNull() {
+    return currentOffset == NO_VALUES;
+  }
+
+  @Override
+  public Object readObject() {
+    return vector.getAccessor().getObject(idx());
+  }
+
+  @Override
+  public MajorType getType() {
+    return vector.getField().getType();
+  }
+
+  @Override
+  public java.util.Iterator<String> iterator() {
+    return vector.fieldNameIterator();
+  }
+
+  @Override
+  public boolean isSet() {
+    return true;
+  }
+
+  @Override
+  public void copyAsValue(MapWriter writer) {
+    if (currentOffset == NO_VALUES) {
+      return;
+    }
+    RepeatedMapWriter impl = (RepeatedMapWriter) writer;
+    impl.container.copyFromSafe(idx(), impl.idx(), vector);
+  }
+
+  public void copyAsValueSingle(MapWriter writer) {
+    if (currentOffset == NO_VALUES) {
+      return;
+    }
+    SingleMapWriter impl = (SingleMapWriter) writer;
+    impl.container.copyFromSafe(currentOffset, impl.idx(), vector);
+  }
+
+  @Override
+  public void copyAsField(String name, MapWriter writer) {
+    if (currentOffset == NO_VALUES) {
+      return;
+    }
+    RepeatedMapWriter impl = (RepeatedMapWriter) writer.map(name);
+    impl.container.copyFromSafe(idx(), impl.idx(), vector);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleLikeRepeatedMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleLikeRepeatedMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleLikeRepeatedMapReaderImpl.java
new file mode 100644
index 0000000..086d26e
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleLikeRepeatedMapReaderImpl.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.complex.impl;
+
+import java.util.Iterator;
+
+import org.apache.arrow.vector.complex.RepeatedMapVector;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+/**
+ * Presents one entry of a repeated map through a single-map style reader by forwarding to a
+ * {@link RepeatedMapReaderImpl}. Iteration-related calls ({@link #size()}, {@link #next()}) are
+ * rejected.
+ */
+public class SingleLikeRepeatedMapReaderImpl extends AbstractFieldReader{
+
+  /** The repeated map reader that does the actual work. */
+  private final RepeatedMapReaderImpl delegate;
+
+  /**
+   * @param vector   not used by this constructor
+   * @param delegate reader to forward to; cast to {@link RepeatedMapReaderImpl}
+   */
+  public SingleLikeRepeatedMapReaderImpl(RepeatedMapVector vector, FieldReader delegate) {
+    final RepeatedMapReaderImpl repeated = (RepeatedMapReaderImpl) delegate;
+    this.delegate = repeated;
+  }
+
+  /** Not supported on this reader. */
+  @Override
+  public int size() {
+    throw new UnsupportedOperationException("You can't call size on a single map reader.");
+  }
+
+  /** Not supported on this reader. */
+  @Override
+  public boolean next() {
+    throw new UnsupportedOperationException("You can't call next on a single map reader.");
+  }
+
+  /** @return the type produced by {@code Types.required(MinorType.MAP)} */
+  @Override
+  public MajorType getType() {
+    final MajorType requiredMap = Types.required(MinorType.MAP);
+    return requiredMap;
+  }
+
+  /** Copies the delegate's current entry via {@code copyAsValueSingle}. */
+  @Override
+  public void copyAsValue(MapWriter writer) {
+    this.delegate.copyAsValueSingle(writer);
+  }
+
+  /** Copies the delegate's current entry via {@code copyAsValueSingle}. */
+  public void copyAsValueSingle(MapWriter writer){
+    this.delegate.copyAsValueSingle(writer);
+  }
+
+  @Override
+  public FieldReader reader(String name) {
+    final FieldReader child = this.delegate.reader(name);
+    return child;
+  }
+
+  /** Repositions the delegate; this reader keeps no position of its own here. */
+  @Override
+  public void setPosition(int index) {
+    this.delegate.setPosition(index);
+  }
+
+  @Override
+  public Object readObject() {
+    final Object value = this.delegate.readObject();
+    return value;
+  }
+
+  @Override
+  public Iterator<String> iterator() {
+    final Iterator<String> names = this.delegate.iterator();
+    return names;
+  }
+
+  /** @return {@code true} unless the delegate reports {@code isNull()} */
+  @Override
+  public boolean isSet() {
+    if (this.delegate.isNull()) {
+      return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleListReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleListReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleListReaderImpl.java
new file mode 100644
index 0000000..f16f628
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleListReaderImpl.java
@@ -0,0 +1,88 @@
+
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.complex.impl;
+
+
+import org.apache.arrow.vector.complex.AbstractContainerVector;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+/**
+ * Reader for a list-typed child, looked up by name inside an {@link AbstractContainerVector}.
+ * The child reader is created lazily and kept at the same position as this reader.
+ */
+@SuppressWarnings("unused")
+public class SingleListReaderImpl extends AbstractFieldReader{
+
+  private static final MajorType TYPE = Types.optional(MinorType.LIST);
+  private final String name;
+  private final AbstractContainerVector container;
+  /** Child reader, created on the first call to {@link #reader()}. */
+  private FieldReader reader;
+
+  /**
+   * @param name      name used to look up the child vector inside {@code container}
+   * @param container the vector that holds the child
+   */
+  public SingleListReaderImpl(String name, AbstractContainerVector container) {
+    super();
+    this.name = name;
+    this.container = container;
+  }
+
+  @Override
+  public MajorType getType() {
+    return TYPE;
+  }
+
+
+  /** Moves this reader and, if it already exists, the child reader to {@code index}. */
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    if (reader != null) {
+      reader.setPosition(index);
+    }
+  }
+
+  /**
+   * Reads the object at the current position through the child reader.
+   *
+   * @return whatever the child reader's {@code readObject()} returns
+   */
+  @Override
+  public Object readObject() {
+    // Go through reader() so the child reader is created on first use; reading the field directly
+    // throws a NullPointerException when reader() has not been called yet.
+    return reader().readObject();
+  }
+
+  /**
+   * Returns the child reader, creating it on first use and aligning it with the current position.
+   * {@code NullReader.INSTANCE} is used when the container has no child named {@code name}, as the
+   * other readers in this package do.
+   */
+  @Override
+  public FieldReader reader() {
+    if (reader == null) {
+      final org.apache.arrow.vector.ValueVector child = container.getChild(name);
+      if (child == null) {
+        reader = NullReader.INSTANCE;
+      } else {
+        reader = child.getReader();
+      }
+      setPosition(idx());
+    }
+    return reader;
+  }
+
+  /** Always reports the value as not set. */
+  @Override
+  public boolean isSet() {
+    return false;
+  }
+
+  /** Not supported: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public void copyAsValue(ListWriter writer) {
+    throw new UnsupportedOperationException("Generic list copying not yet supported.  Please resolve to typed list.");
+  }
+
+  /** Not supported: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public void copyAsField(String name, MapWriter writer) {
+    throw new UnsupportedOperationException("Generic list copying not yet supported.  Please resolve to typed list.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java
new file mode 100644
index 0000000..84b9980
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java
@@ -0,0 +1,108 @@
+
+
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.complex.impl;
+
+
+import java.util.Map;
+
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.types.Types.MajorType;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Reader over a {@link MapVector} that hands out per-field child readers and keeps them at the
+ * same position as this reader.
+ */
+@SuppressWarnings("unused")
+public class SingleMapReaderImpl extends AbstractFieldReader{
+
+  private final MapVector vector;
+  /** Child readers created so far, keyed by field name. */
+  private final Map<String, FieldReader> fields = Maps.newHashMap();
+
+  /**
+   * @param vector the map vector to read from
+   */
+  public SingleMapReaderImpl(MapVector vector) {
+    this.vector = vector;
+  }
+
+  /** Moves every cached child reader to {@code index}. */
+  private void setChildrenPosition(int index){
+    for (FieldReader child : fields.values()) {
+      child.setPosition(index);
+    }
+  }
+
+  /**
+   * Returns the reader for the field {@code name}, creating and caching it on first request and
+   * positioning the new reader at this reader's index. Unknown fields get
+   * {@code NullReader.INSTANCE}.
+   */
+  @Override
+  public FieldReader reader(String name){
+    final FieldReader cached = fields.get(name);
+    if (cached != null) {
+      return cached;
+    }
+
+    final ValueVector child = vector.getChild(name);
+    final FieldReader created;
+    if (child != null) {
+      created = child.getReader();
+    } else {
+      created = NullReader.INSTANCE;
+    }
+    fields.put(name, created);
+    created.setPosition(idx());
+    return created;
+  }
+
+  /** Moves this reader and all cached child readers to {@code index}. */
+  @Override
+  public void setPosition(int index){
+    super.setPosition(index);
+    setChildrenPosition(index);
+  }
+
+  @Override
+  public Object readObject() {
+    final int index = idx();
+    return vector.getAccessor().getObject(index);
+  }
+
+  /** Always reports the value as set. */
+  @Override
+  public boolean isSet() {
+    return true;
+  }
+
+  @Override
+  public MajorType getType(){
+    return vector.getField().getType();
+  }
+
+  @Override
+  public java.util.Iterator<String> iterator(){
+    return vector.fieldNameIterator();
+  }
+
+  /**
+   * Copies the map at the current index into the given writer's container. The writer is cast to
+   * {@code SingleMapWriter}.
+   */
+  @Override
+  public void copyAsValue(MapWriter writer){
+    final SingleMapWriter target = (SingleMapWriter) writer;
+    target.container.copyFromSafe(idx(), target.idx(), vector);
+  }
+
+  /**
+   * Copies the map at the current index into the map field {@code name} of the given writer.
+   */
+  @Override
+  public void copyAsField(String name, MapWriter writer){
+    final SingleMapWriter target = (SingleMapWriter) writer.map(name);
+    target.container.copyFromSafe(idx(), target.idx(), vector);
+  }
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/arrow/blob/fa5f0299/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java
new file mode 100644
index 0000000..9b54d02
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/UnionListReader.java
@@ -0,0 +1,98 @@
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.arrow.vector.complex.impl;
+
+import org.apache.arrow.vector.UInt4Vector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.holders.UnionHolder;
+import org.apache.arrow.vector.types.Types.DataMode;
+import org.apache.arrow.vector.types.Types.MajorType;
+import org.apache.arrow.vector.types.Types.MinorType;
+
+public class UnionListReader extends AbstractFieldReader {
+
+  private ListVector vector;
+  private ValueVector data;
+  private UInt4Vector offsets;
+
+  public UnionListReader(ListVector vector) {
+    this.vector = vector;
+    this.data = vector.getDataVector();
+    this.offsets = vector.getOffsetVector();
+  }
+
+  @Override
+  public boolean isSet() {
+    return true;
+  }
+
+  MajorType type = new MajorType(MinorType.LIST, DataMode.OPTIONAL);
+
+  public MajorType getType() {
+    return type;
+  }
+
+  private int currentOffset;
+  private int maxOffset;
+
+  @Override
+  public void setPosition(int index) {
+    super.setPosition(index);
+    currentOffset = offsets.getAccessor().get(index) - 1;
+    maxOffset = offsets.getAccessor().get(index + 1);
+  }
+
+  @Override
+  public FieldReader reader() {
+    return data.getReader();
+  }
+
+  @Override
+  public Object readObject() {
+    return vector.getAccessor().getObject(idx());
+  }
+
+  @Override
+  public void read(int index, UnionHolder holder) {
+    setPosition(idx());
+    for (int i = -1; i < index; i++) {
+      next();
+    }
+    holder.reader = data.getReader();
+    holder.isSet = data.getReader().isSet() ? 1 : 0;
+  }
+
+  @Override
+  public boolean next() {
+    if (currentOffset + 1 < maxOffset) {
+      data.getReader().setPosition(++currentOffset);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void copyAsValue(ListWriter writer) {
+    ComplexCopier.copy(this, (FieldWriter) writer);
+  }
+}