You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/12 04:48:29 UTC

[07/10] Add support for RepeatedMapVector, MapVector and RepeatedListVector.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 396834c..4ff3708 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -18,12 +18,14 @@
 package org.apache.drill.exec.record;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 
 import java.util.List;
 
-import io.netty.buffer.CompositeByteBuf;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import javax.jdo.metadata.FieldMetadata;
+
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -63,7 +65,7 @@ public class WritableBatch {
     Preconditions.checkState(!cleared,
         "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
     if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
-      
+
       CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
 
       /* Copy data from each buffer into the compound buffer */
@@ -71,8 +73,7 @@ public class WritableBatch {
         cbb.addComponent(buf);
       }
 
-
-      List<FieldMetadata> fields = def.getFieldList();
+      List<SerializedField> fields = def.getFieldList();
 
       int bufferOffset = 0;
 
@@ -82,7 +83,7 @@ public class WritableBatch {
       int vectorIndex = 0;
 
       for (VectorWrapper<?> vv : container) {
-        FieldMetadata fmd = fields.get(vectorIndex);
+        SerializedField fmd = fields.get(vectorIndex);
         ValueVector v = vv.getValueVector();
         ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength());
 //        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -127,7 +128,7 @@ public class WritableBatch {
 
   public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
     List<ByteBuf> buffers = Lists.newArrayList();
-    List<FieldMetadata> metadata = Lists.newArrayList();
+    List<SerializedField> metadata = Lists.newArrayList();
 
     for (ValueVector vv : vectors) {
       metadata.add(vv.getMetadata());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 872052c..04a9768 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -44,15 +44,15 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
   public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
     this(name, context, fs, storageConfig, new JSONFormatConfig());
   }
-  
+
   public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
     super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json");
   }
-  
+
   @Override
-  public RecordReader getRecordReader(FragmentContext context, FileWork fileWork, 
+  public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
       List<SchemaPath> columns) throws ExecutionSetupException {
-    return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
+    return new JSONRecordReader2(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
   }
 
   @Override
@@ -78,6 +78,6 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
         return true;
       return false;
     }
-    
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
deleted file mode 100644
index 1c8539c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.json;
-
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.holders.NullableBitHolder;
-import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
-import org.apache.drill.exec.expr.holders.NullableIntHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.DiffSchema;
-import org.apache.drill.exec.schema.Field;
-import org.apache.drill.exec.schema.NamedField;
-import org.apache.drill.exec.schema.ObjectSchema;
-import org.apache.drill.exec.schema.RecordSchema;
-import org.apache.drill.exec.schema.SchemaIdGenerator;
-import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.VectorHolder;
-import org.apache.drill.exec.vector.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class JSONRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
-  private static final int DEFAULT_LENGTH = 4000;
-  public static final Charset UTF_8 = Charset.forName("UTF-8");
-
-  private final Map<String, VectorHolder> valueVectorMap;
-  private final FileSystem fileSystem;
-  private final Path hadoopPath;
-
-  private JsonParser parser;
-  private SchemaIdGenerator generator;
-  private DiffSchema diffSchema;
-  private RecordSchema currentSchema;
-  private List<Field> removedFields;
-  private OutputMutator outputMutator;
-  private int batchSize;
-  private final List<SchemaPath> columns;
-
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
-                          List<SchemaPath> columns) throws OutOfMemoryException {
-    this.hadoopPath = new Path(inputPath);
-    this.fileSystem = fileSystem;
-    this.batchSize = batchSize;
-    valueVectorMap = Maps.newHashMap();
-    this.columns = columns;
-  }
-
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
-                          List<SchemaPath> columns) throws OutOfMemoryException {
-    this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, columns);
-  }
-
-  private JsonParser getParser() {
-    return parser;
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-    outputMutator = output;
-    output.removeAllFields();
-    currentSchema = new ObjectSchema();
-    diffSchema = new DiffSchema();
-    removedFields = Lists.newArrayList();
-
-    try {
-      JsonFactory factory = new JsonFactory();
-      parser = factory.createJsonParser(fileSystem.open(hadoopPath));
-      parser.nextToken(); // Read to the first START_OBJECT token
-      generator = new SchemaIdGenerator();
-    } catch (IOException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  @Override
-  public int next() {
-    if (parser.isClosed() || !parser.hasCurrentToken()) {
-      return 0;
-    }
-
-    resetBatch();
-
-    int nextRowIndex = 0;
-
-    try {
-      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
-        parser.nextToken(); // Read to START_OBJECT token
-
-        if (!parser.hasCurrentToken()) {
-          parser.close();
-          break;
-        }
-      }
-
-      parser.nextToken();
-
-      if (!parser.hasCurrentToken()) {
-        parser.close();
-      }
-
-      // Garbage collect fields never referenced in this batch
-      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
-        diffSchema.addRemovedField(field);
-        outputMutator.removeField(field.getAsMaterializedField());
-      }
-
-      if (diffSchema.isChanged()) {
-        outputMutator.setNewSchema();
-      }
-
-
-    } catch (IOException | SchemaChangeException e) {
-      logger.error("Error reading next in Json reader", e);
-      throw new DrillRuntimeException(e);
-    }
-
-    for (VectorHolder holder : valueVectorMap.values()) {
-      if (holder.isRepeated()) {
-        holder.setGroupCount(nextRowIndex);
-      }
-      holder.getValueVector().getMutator().setValueCount(nextRowIndex);
-    }
-
-    return nextRowIndex;
-  }
-
-  private void resetBatch() {
-    for (VectorHolder value : valueVectorMap.values()) {
-      value.reset();
-    }
-
-    currentSchema.resetMarkedFields();
-    diffSchema.reset();
-    removedFields.clear();
-  }
-
-  @Override
-  public void cleanup() {
-    try {
-      parser.close();
-    } catch (IOException e) {
-      logger.warn("Error closing Json parser", e);
-    }
-  }
-
-
-  private RecordSchema getCurrentSchema() {
-    return currentSchema;
-  }
-
-  private void setCurrentSchema(RecordSchema schema) {
-    currentSchema = schema;
-  }
-
-  private List<Field> getRemovedFields() {
-    return removedFields;
-  }
-
-  private boolean fieldSelected(String field){
-
-    SchemaPath sp = SchemaPath.getCompoundPath(field.split("\\."));
-    if (this.columns != null && this.columns.size() > 0){
-      for (SchemaPath expr : this.columns){
-        if ( sp.equals(expr)){
-          return true;
-        }
-      }
-      return false;
-    }
-    return true;
-  }
-
-  public static enum ReadType {
-    ARRAY(END_ARRAY) {
-      @Override
-      public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
-        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-      }
-
-      @Override
-      public RecordSchema createSchema() throws IOException {
-        return new ObjectSchema();
-      }
-    },
-    OBJECT(END_OBJECT) {
-      @Override
-      public Field createField(RecordSchema parentSchema,
-                               String prefixFieldName,
-                               String fieldName,
-                               MajorType fieldType,
-                               int index) {
-        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-      }
-
-      @Override
-      public RecordSchema createSchema() throws IOException {
-        return new ObjectSchema();
-      }
-    };
-
-    private final JsonToken endObject;
-
-    ReadType(JsonToken endObject) {
-      this.endObject = endObject;
-    }
-
-    public JsonToken getEndObject() {
-      return endObject;
-    }
-
-    @SuppressWarnings("ConstantConditions")
-    public boolean readRecord(JSONRecordReader reader,
-                              String prefixFieldName,
-                              int rowIndex,
-                              int groupCount) throws IOException, SchemaChangeException {
-      JsonParser parser = reader.getParser();
-      JsonToken token = parser.nextToken();
-      JsonToken endObject = getEndObject();
-      int colIndex = 0;
-      boolean isFull = false;
-      while (token != endObject) {
-        if (token == FIELD_NAME) {
-          token = parser.nextToken();
-          continue;
-        }
-
-        String fieldName = parser.getCurrentName();
-        if ( fieldName != null && ! reader.fieldSelected(fieldName)){
-          // this field was not requested in the query
-          token = parser.nextToken();
-          colIndex += 1;
-          continue;
-        }
-        MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
-        ReadType readType = null;
-        switch (token) {
-          case START_ARRAY:
-            readType = ReadType.ARRAY;
-            groupCount++;
-            break;
-          case START_OBJECT:
-            readType = ReadType.OBJECT;
-            groupCount = 0;
-            break;
-        }
-
-        if (fieldType != null) { // Including nulls
-          boolean currentFieldFull = !recordData(
-              readType,
-              reader,
-              fieldType,
-              prefixFieldName,
-              fieldName,
-              rowIndex,
-              colIndex,
-              groupCount);
-          if(readType == ReadType.ARRAY) {
-            groupCount--;
-          }
-          isFull = isFull || currentFieldFull;
-        }
-        token = parser.nextToken();
-        colIndex += 1;
-      }
-      return !isFull;
-    }
-
-    private void removeChildFields(List<Field> removedFields, Field field) {
-      RecordSchema schema = field.getAssignedSchema();
-      if (schema == null) {
-        return;
-      }
-      for (Field childField : schema.getFields()) {
-        removedFields.add(childField);
-        if (childField.hasSchema()) {
-          removeChildFields(removedFields, childField);
-        }
-      }
-    }
-
-    private boolean recordData(JSONRecordReader.ReadType readType,
-                               JSONRecordReader reader,
-                               MajorType fieldType,
-                               String prefixFieldName,
-                               String fieldName,
-                               int rowIndex,
-                               int colIndex,
-                               int groupCount) throws IOException, SchemaChangeException {
-      RecordSchema currentSchema = reader.getCurrentSchema();
-      Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
-      boolean isFieldFound = field != null;
-      List<Field> removedFields = reader.getRemovedFields();
-      boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
-
-      if (isFieldFound && !field.getFieldType().equals(fieldType)) {
-        boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
-
-        if (newFieldLateBound && !existingFieldLateBound) {
-          fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
-        } else if (!newFieldLateBound && existingFieldLateBound) {
-          field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
-        } else if (!newFieldLateBound && !existingFieldLateBound) {
-          if (field.hasSchema()) {
-            removeChildFields(removedFields, field);
-          }
-          removedFields.add(field);
-          currentSchema.removeField(field, colIndex);
-
-          isFieldFound = false;
-        }
-      }
-
-      if (!isFieldFound) {
-        field = createField(
-            currentSchema,
-            prefixFieldName,
-            fieldName,
-            fieldType,
-            colIndex
-        );
-
-        reader.recordNewField(field);
-        currentSchema.addField(field);
-      }
-
-      field.setRead(true);
-
-      VectorHolder holder = getOrCreateVectorHolder(reader, field);
-      if (readType != null) {
-        RecordSchema fieldSchema = field.getAssignedSchema();
-        RecordSchema newSchema = readType.createSchema();
-
-        if (readType != ReadType.ARRAY) {
-          reader.setCurrentSchema(fieldSchema);
-          if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
-        } else {
-          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
-        }
-
-        reader.setCurrentSchema(currentSchema);
-
-      } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
-        return addValueToVector(
-            rowIndex,
-            holder,
-            JacksonHelper.getValueFromFieldType(
-                reader.getParser(),
-                fieldType.getMinorType()
-            ),
-            fieldType.getMinorType(),
-            groupCount
-        );
-      }
-
-      return true;
-    }
-
-    private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
-      switch (minorType) {
-        case BIGINT: {
-          holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableBigIntVector int4 = (NullableBigIntVector) holder.getValueVector();
-              NullableBigIntVector.Mutator m = int4.getMutator();
-              m.set(index, (Long) val);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated int is not supported.");
-            }
-
-            RepeatedBigIntVector repeatedInt4 = (RepeatedBigIntVector) holder.getValueVector();
-            RepeatedBigIntVector.Mutator m = repeatedInt4.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Long) val);
-          }
-
-          return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
-        }
-        case FLOAT4: {
-          holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
-              NullableFloat4Vector.Mutator m = float4.getMutator();
-              m.set(index, (Float) val);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated float is not supported.");
-            }
-
-            RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
-            RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Float) val);
-          }
-          return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
-        }
-        case VARCHAR: {
-          if (val == null) {
-            return (index + 1) * 4 <= holder.getLength();
-          } else {
-            byte[] bytes = ((String) val).getBytes(UTF_8);
-            int length = bytes.length;
-            holder.incAndCheckLength(length);
-            if (groupCount == 0) {
-              NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
-              NullableVarCharVector.Mutator m = varLen4.getMutator();
-              m.set(index, bytes);
-            } else {
-              RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
-              RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
-              holder.setGroupCount(index);
-              m.add(index, bytes);
-            }
-            return holder.hasEnoughSpace(length + 4 + 1);
-          }
-        }
-        case BIT: {
-          holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableBitVector bit = (NullableBitVector) holder.getValueVector();
-              NullableBitVector.Mutator m = bit.getMutator();
-              m.set(index, (Boolean) val ? 1 : 0);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
-            }
-
-            RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
-            RepeatedBitVector.Mutator m = repeatedBit.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Boolean) val ? 1 : 0);
-          }
-          return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
-        }
-        default:
-          throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
-      }
-    }
-
-    private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
-      return reader.getOrCreateVectorHolder(field);
-    }
-
-    public abstract RecordSchema createSchema() throws IOException;
-
-    public abstract Field createField(RecordSchema parentSchema,
-                                      String prefixFieldName,
-                                      String fieldName,
-                                      MajorType fieldType,
-                                      int index);
-  }
-
-  private void recordNewField(Field field) {
-    diffSchema.recordNewField(field);
-  }
-
-  private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
-    String fullFieldName = field.getFullFieldName();
-    VectorHolder holder = valueVectorMap.get(fullFieldName);
-
-    if (holder == null) {
-      MajorType type = field.getFieldType();
-      MinorType minorType = type.getMinorType();
-
-      if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
-        return null;
-      }
-
-      MaterializedField f = MaterializedField.create(SchemaPath.getCompoundPath(fullFieldName.split("\\.")), type);
-
-      ValueVector v = outputMutator.addField(f, TypeHelper.getValueVectorClass(minorType, type.getMode()));
-      AllocationHelper.allocate(v, batchSize, 50);
-      holder = new VectorHolder(v);
-      valueVectorMap.put(fullFieldName, holder);
-      return holder;
-    }
-    return holder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
new file mode 100644
index 0000000..bb52a20
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;
+import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.hive12.common.collect.Lists;
+
+public class JSONRecordReader2 implements RecordReader{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class);
+
+  private OutputMutator mutator;
+  private VectorContainerWriter writer;
+  private Path hadoopPath;
+  private FileSystem fileSystem;
+  private InputStream stream;
+  private JsonReader jsonReader;
+
+  public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
+                          List<SchemaPath> columns) throws OutOfMemoryException {
+    this.hadoopPath = new Path(inputPath);
+    this.fileSystem = fileSystem;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    try{
+      stream = fileSystem.open(hadoopPath);
+      JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
+      this.writer = new VectorContainerWriter(output);
+      this.mutator = output;
+      jsonReader = new JsonReader(splitter);
+    }catch(IOException e){
+      throw new ExecutionSetupException("Failure reading JSON file.", e);
+    }
+  }
+
+  @Override
+  public int next() {
+    writer.allocate();
+    writer.reset();
+
+    int i =0;
+
+    try{
+      outside: while(true){
+        writer.setPosition(i);
+
+        switch(jsonReader.write(writer)){
+        case WRITE_SUCCEED:
+          i++;
+          break;
+
+        case NO_MORE:
+//          System.out.println("no more records - main loop");
+          break outside;
+
+        case WRITE_FAILED:
+//          System.out.println("==== hit bounds at " + i);
+          break outside;
+        };
+      }
+
+
+      writer.setValueCount(i);
+      mutator.setNewSchema();
+      return i;
+
+    }catch(IOException | SchemaChangeException e){
+      throw new DrillRuntimeException("Failure while reading JSON file.", e);
+    }
+
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      stream.close();
+    } catch (IOException e) {
+      logger.warn("Failure while closing stream.", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 25931db..9e01268 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -224,7 +224,7 @@ public class HiveRecordReader implements RecordReader {
         PrimitiveCategory pCat = primitiveCategories.get(i);
         MajorType type = getMajorType(pCat);
         MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type);
-        ValueVector vv = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        ValueVector vv = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
         vectors.add(vv);
       }
       for (int i = 0; i < selectedPartitionNames.size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index eb9e7a6..5c07dc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -76,7 +76,7 @@ public class MockRecordReader implements RecordReader {
 
       for (int i = 0; i < config.getTypes().length; i++) {
         MajorType type = config.getTypes()[i].getMajorType();
-        valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
       }
       output.setNewSchema();
     } catch (SchemaChangeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 75cd799..5d28456 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -41,10 +42,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.*;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -263,7 +260,7 @@ public class ParquetRecordReader implements RecordReader {
 
         //convertedTypes.put()
         fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+        v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
         if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
           createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
             convertedType);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 86aec44..f7a74c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -19,11 +19,11 @@ package org.apache.drill.exec.vector;
 
 public class AllocationHelper {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
-  
+
   public static void allocate(ValueVector v, int valueCount, int bytesPerValue){
     allocate(v, valueCount, bytesPerValue, 5);
   }
-  
+
   public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){
     if(v instanceof FixedWidthVector){
       ((FixedWidthVector) v).allocateNew(valueCount);
@@ -34,7 +34,7 @@ public class AllocationHelper {
     }else if(v instanceof RepeatedVariableWidthVector){
       ((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue * repeatedPerTop, valueCount, valueCount * repeatedPerTop);
     }else{
-      throw new UnsupportedOperationException();
+      v.allocateNew();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ddddab1..9641e6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -17,21 +17,25 @@
  */
 package org.apache.drill.exec.vector;
 
+import java.util.Iterator;
+
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 
+import com.google.hive12.common.collect.Iterators;
+
 public abstract class BaseDataValueVector extends BaseValueVector{
 
   protected ByteBuf data = DeadBuf.DEAD_BUFFER;
   protected int valueCount;
-  
+
   public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
     super(field, allocator);
-    
+
   }
 
   /**
@@ -46,7 +50,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{
     }
   }
 
-  
+
   @Override
   public ByteBuf[] getBuffers(){
     ByteBuf[] out;
@@ -60,18 +64,24 @@ public abstract class BaseDataValueVector extends BaseValueVector{
     clear();
     return out;
   }
-  
+
   public int getBufferSize() {
     if(valueCount == 0) return 0;
     return data.writerIndex();
   }
 
   @Override
-  public abstract FieldMetadata getMetadata();
+  public abstract SerializedField getMetadata();
 
   public ByteBuf getData(){
     return data;
   }
-  
-  
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 7cc1adf..7a61475 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -17,14 +17,21 @@
  */
 package org.apache.drill.exec.vector;
 
+import java.util.Iterator;
+
 import org.apache.drill.common.expression.FieldReference;
+
 import io.netty.buffer.ByteBuf;
+
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 
+import com.google.hive12.common.collect.Iterators;
+
 public abstract class BaseValueVector implements ValueVector{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
-  
+
   protected final BufferAllocator allocator;
   protected final MaterializedField field;
 
@@ -32,21 +39,24 @@ public abstract class BaseValueVector implements ValueVector{
     this.allocator = allocator;
     this.field = field;
   }
-  
+
   @Override
   public void close() {
     clear();
   }
-  
+
   @Override
   public MaterializedField getField() {
     return field;
   }
-  
+
   public MaterializedField getField(FieldReference ref){
     return getField().clone(ref);
   }
-  
+
+  protected SerializedField.Builder getMetadataBuilder(){
+    return getField().getAsBuilder();
+  }
 
   abstract public ByteBuf getData();
 
@@ -54,12 +64,15 @@ public abstract class BaseValueVector implements ValueVector{
     public abstract int getValueCount();
     public void reset(){}
   }
-  
+
   abstract class BaseMutator implements Mutator{
     public void reset(){}
   }
-  
-  
-  
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return Iterators.emptyIterator();
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 63384a3..597b0f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -24,9 +24,12 @@ import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.NullableBitHolder;
 import org.apache.drill.exec.memory.AccountingByteBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.impl.BitReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 /**
  * Bit implements a vector of bit-width values. Elements in the vector are accessed by position from the logical start
@@ -49,11 +52,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
-  public FieldMetadata getMetadata() {
-    return FieldMetadata.newBuilder()
-        .setDef(getField().getDef())
-        .setValueCount(valueCount)
-        .setBufferLength( (int) Math.ceil(valueCount / 8.0))
+  public SerializedField getMetadata() {
+    return field.getAsBuilder() //
+        .setValueCount(valueCount) //
+        .setBufferLength( (int) Math.ceil(valueCount / 8.0)) //
         .build();
   }
 
@@ -66,13 +68,26 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   public void allocateNew() {
+    if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+  }
+
+  public boolean allocateNewSafe() {
     clear();
     if (allocationMonitor > 5) {
       allocationValueCount = Math.min(1, (int)(allocationValueCount * 0.9));
     } else if (allocationMonitor < -5) {
       allocationValueCount = (int) (allocationValueCount * 1.1);
     }
-    allocateNew(allocationValueCount);
+
+    clear();
+    valueCapacity = allocationValueCount;
+    int valueSize = getSizeFromCount(allocationValueCount);
+    data = allocator.buffer(valueSize);
+    if(data == null) return false;
+    for (int i = 0; i < valueSize; i++) {
+      data.setByte(i, 0);
+    }
+    return true;
   }
 
   /**
@@ -112,8 +127,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   }
 
   @Override
-  public void load(FieldMetadata metadata, ByteBuf buffer) {
-    assert this.field.getDef().equals(metadata.getDef());
+  public void load(SerializedField metadata, ByteBuf buffer) {
+    assert this.field.matches(metadata);
     int loaded = load(metadata.getValueCount(), buffer);
     assert metadata.getBufferLength() == loaded;
   }
@@ -177,9 +192,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     }
   }
 
-  private void copyTo(int startIndex, int length, BitVector target) {
-
-  }
 
   private class TransferImpl implements TransferPair {
     BitVector to;
@@ -205,8 +217,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     }
 
     @Override
-    public void copyValue(int fromIndex, int toIndex) {
-      to.copyFrom(fromIndex, toIndex, BitVector.this);
+    public boolean copyValueSafe(int fromIndex, int toIndex) {
+      return to.copyFromSafe(fromIndex, toIndex, BitVector.this);
     }
   }
 
@@ -233,7 +245,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     }
 
     @Override
-    public final Object getObject(int index) {
+    public final Boolean getObject(int index) {
       return new Boolean(get(index) != 0);
     }
 
@@ -245,9 +257,15 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
       holder.value = get(index);
     }
 
-    final void get(int index, NullableBitHolder holder) {
+    public final void get(int index, NullableBitHolder holder) {
+      holder.isSet = 1;
       holder.value = get(index);
     }
+
+    @Override
+    public FieldReader getReader() {
+      return new BitReaderImpl(BitVector.this);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
index 8e097e4..ad2ba1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
@@ -18,5 +18,5 @@
 package org.apache.drill.exec.vector;
 
 public interface RepeatedMutator extends ValueVector.Mutator {
-  public void startNewGroup(int index);
+  public boolean startNewGroup(int index);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 258b354..8b871fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -22,24 +22,34 @@ import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
 /**
  * ValueVectorTypes defines a set of template-generated classes which implement type-specific value vectors. The
  * template approach was chosen due to the lack of multiple inheritence. It is also important that all related logic be
  * as efficient as possible.
  */
-public interface ValueVector extends Closeable {
+public interface ValueVector extends Closeable, Iterable<ValueVector> {
+
 
   /**
    * Allocate new buffers. ValueVector implements logic to determine how much to allocate.
+   * @throws OutOfMemoryRuntimeException Thrown if no memory can be allocated.
    */
-  public void allocateNew();
+  public void allocateNew() throws OutOfMemoryRuntimeException;
+
+  /**
+   * Allocates new buffers. ValueVector implements logic to determine how much to allocate.
+   * @return Returns true if allocation was succesful.
+   */
+  public boolean allocateNewSafe();
 
   public int getBufferSize();
-  
+
   /**
    * Alternative to clear(). Allows use as closeable in try-with-resources.
    */
@@ -52,27 +62,26 @@ public interface ValueVector extends Closeable {
 
   /**
    * Get information about how this field is materialized.
-   * 
+   *
    * @return
    */
   public MaterializedField getField();
 
   /**
-   * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the same
-   * type. Will also generate a second instance of this vector class that is connected through the TransferPair.
-   * 
-   * @return 
+   * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the
+   * same type. Will also generate a second instance of this vector class that is connected through the TransferPair.
+   *
+   * @return
    */
   public TransferPair getTransferPair();
 
   public TransferPair makeTransferPair(ValueVector to);
-  
-  
+
   public TransferPair getTransferPair(FieldReference ref);
 
   /**
    * Given the current buffer allocation, return the maximum number of values that this buffer can contain.
-   * 
+   *
    * @return Maximum values buffer can contain. In the case of a Repeated field, this is the number of atoms, not
    *         repeated groups.
    */
@@ -80,37 +89,40 @@ public interface ValueVector extends Closeable {
 
   /**
    * Get Accessor to read value vector data.
-   * 
+   *
    * @return
    */
   public abstract Accessor getAccessor();
 
   /**
-   * Return the underlying buffers associated with this vector. Note that this doesn't impact the
-   * reference counts for this buffer so it only should be used for in-context access. Also note
-   * that this buffer changes regularly thus external classes shouldn't hold a reference to
-   * it (unless they change it).
+   * Return the underlying buffers associated with this vector. Note that this doesn't impact the reference counts for
+   * this buffer so it only should be used for in-context access. Also note that this buffer changes regularly thus
+   * external classes shouldn't hold a reference to it (unless they change it).
    *
    * @return The underlying ByteBuf.
    */
   public abstract ByteBuf[] getBuffers();
-  
+
   /**
-   * Load the data provided in the buffer.  Typically used when deserializing from the wire.
-   * @param metadata Metadata used to decode the incoming buffer.
-   * @param buffer The buffer that contains the ValueVector.
+   * Load the data provided in the buffer. Typically used when deserializing from the wire.
+   *
+   * @param metadata
+   *          Metadata used to decode the incoming buffer.
+   * @param buffer
+   *          The buffer that contains the ValueVector.
    */
-  public void load(FieldMetadata metadata, ByteBuf buffer);
-  
+  public void load(SerializedField metadata, ByteBuf buffer);
+
   /**
-   * Get the metadata for this field.  Used in serialization
+   * Get the metadata for this field. Used in serialization
+   *
    * @return FieldMetadata for this field.
    */
-  public FieldMetadata getMetadata();
-  
+  public SerializedField getMetadata();
+
   /**
    * Get a Mutator to update this vectors data.
-   * 
+   *
    * @return
    */
   public abstract Mutator getMutator();
@@ -125,23 +137,25 @@ public interface ValueVector extends Closeable {
 
     /**
      * Get the Java Object representation of the element at the specified position. Useful for testing.
-     * 
+     *
      * @param index
      *          Index of the value to get
      */
     public abstract Object getObject(int index);
 
     public int getValueCount();
-    
+
     public boolean isNull(int index);
 
     public void reset();
+
+    public FieldReader getReader();
   }
 
   public interface Mutator {
     /**
      * Set the top number values (optional/required) or number of value groupings (repeated) in this vector.
-     * 
+     *
      * @param valueCount
      */
     public void setValueCount(int valueCount);
@@ -150,4 +164,5 @@ public interface ValueVector extends Closeable {
 
     public void generateTestData(int values);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
index 32f08b0..adee171 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
@@ -40,6 +40,6 @@ public class GenericAccessor extends AbstractSqlAccessor {
 
   @Override
   TypeProtos.MajorType getType() {
-    return v.getMetadata().getDef().getMajorType();
+    return v.getMetadata().getMajorType();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
new file mode 100644
index 0000000..ab1d270
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractContainerVector implements ValueVector{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
+
+  public abstract <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz);
+  public abstract <T extends ValueVector> T get(String name, Class<T> clazz);
+  public abstract int size();
+
+  protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz){
+    if(clazz.isAssignableFrom(v.getClass())){
+      return (T) v;
+    }else{
+      throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s].  Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+    }
+  }
+
+  public abstract VectorWithOrdinal getVectorWithOrdinal(String name);
+
+
+  public TypedFieldId getFieldIdIfMatches(TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg){
+    if(seg == null){
+      if(addToBreadCrumb) builder.intermediateType(this.getField().getType());
+      return builder.finalType(this.getField().getType()).build();
+    }
+
+    if(seg.isArray()){
+
+      if(seg.isLastPath()){
+        if(addToBreadCrumb) builder.intermediateType(this.getField().getType());
+        return builder //
+          .remainder(seg) //
+          .finalType(this.getField().getType()) //
+          .withIndex() //
+          .build();
+      }else{
+        if(addToBreadCrumb){
+          addToBreadCrumb = false;
+          builder.remainder(seg);
+        }
+        // this is a complex array reference, which means it doesn't correspond directly to a vector by itself.
+        seg = seg.getChild();
+
+      }
+
+    }else{
+      // name segment.
+    }
+
+    VectorWithOrdinal vord = getVectorWithOrdinal(seg.isArray() ? null : seg.getNameSegment().getPath());
+    if(vord == null) return null;
+
+
+    if(addToBreadCrumb){
+      builder.intermediateType(this.getField().getType());
+      builder.addId(vord.ordinal);
+    }
+
+    ValueVector v = vord.vector;
+
+    if(v instanceof AbstractContainerVector){
+      // we're looking for a multi path.
+      AbstractContainerVector c = (AbstractContainerVector) v;
+      return c.getFieldIdIfMatches(builder, addToBreadCrumb, seg.getChild());
+    }else{
+      if(seg.isLastPath()){
+        if(addToBreadCrumb) builder.intermediateType(v.getField().getType());
+        return builder.finalType(v.getField().getType()).build();
+      }else{
+        logger.warn("You tried to request a complex type inside a scalar object.");
+        return null;
+      }
+    }
+
+  }
+
+  protected boolean supportsDirectRead(){
+    return false;
+  }
+
+  protected class VectorWithOrdinal{
+    final ValueVector vector;
+    final int ordinal;
+
+    public VectorWithOrdinal(ValueVector v, int ordinal){
+      this.vector = v;
+      this.ordinal = ordinal;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
new file mode 100644
index 0000000..f126e5c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+public class AbstractMapVector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMapVector.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
new file mode 100644
index 0000000..91c0be5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier;
+import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.hive12.common.collect.Lists;
+
+public class MapVector extends AbstractContainerVector {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
+
+  public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REQUIRED).build();
+
+  final HashMap<String, ValueVector> vectors = Maps.newHashMap();
+  private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
+  private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
+  private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
+  private final Accessor accessor = new Accessor();
+  private final Mutator mutator = new Mutator();
+  private final BufferAllocator allocator;
+  private MaterializedField field;
+  private int valueCount;
+
+  public MapVector(String path, BufferAllocator allocator){
+    this.field = MaterializedField.create(SchemaPath.getSimplePath(path), TYPE);
+    this.allocator = allocator;
+  }
+  public MapVector(MaterializedField field, BufferAllocator allocator){
+    this.field = field;
+    this.allocator = allocator;
+  }
+
+  public int size(){
+    return vectors.size();
+  }
+
+  transient private MapTransferPair ephPair;
+  transient private MapSingleCopier ephPair2;
+
+  public boolean copyFromSafe(int fromIndex, int thisIndex, MapVector from){
+    if(ephPair == null || ephPair.from != from){
+      ephPair = (MapTransferPair) from.makeTransferPair(this);
+    }
+    return ephPair.copyValueSafe(fromIndex, thisIndex);
+  }
+
+  public boolean copyFromSafe(int fromSubIndex, int thisIndex, RepeatedMapVector from){
+    if(ephPair2 == null || ephPair2.from != from){
+      ephPair2 = from.makeSingularCopier(this);
+    }
+    return ephPair2.copySafe(fromSubIndex, thisIndex);
+  }
+
+  @Override
+  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+    ValueVector v = vectors.get(name);
+
+    if(v == null){
+      v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+      Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type));
+      put(name, v);
+    }
+    return typeify(v, clazz);
+
+  }
+
+  protected void put(String name, ValueVector vv){
+    int ordinal = vectors.size();
+    if(vectors.put(name, vv) != null){
+      throw new IllegalStateException();
+    }
+    vectorIds.put(name, new VectorWithOrdinal(vv, ordinal));
+    vectorsById.put(ordinal, vv);
+    field.addChild(vv.getField());
+  }
+
+
+  @Override
+  protected boolean supportsDirectRead() {
+    return true;
+  }
+
+  public Iterator<String> fieldNameIterator(){
+    return vectors.keySet().iterator();
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    for(ValueVector v : vectors.values()){
+      if(!v.allocateNewSafe()) return false;
+    }
+    return true;
+  }
+
+  @Override
+  public <T extends ValueVector> T get(String name, Class<T> clazz) {
+    ValueVector v = vectors.get(name);
+    if(v == null) throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name));
+    return typeify(v, clazz);
+  }
+
+  @Override
+  public int getBufferSize() {
+    if(valueCount == 0 || vectors.isEmpty()) return 0;
+    long buffer = 0;
+    for(ValueVector v : this){
+      buffer += v.getBufferSize();
+    }
+
+    return (int) buffer;
+  }
+
+  @Override
+  public void close() {
+    for(ValueVector v : this){
+      v.close();
+    }
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return vectors.values().iterator();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+
+  @Override
+  public TransferPair getTransferPair() {
+    return new MapTransferPair(field.getPath());
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    return new MapTransferPair( (MapVector) to);
+  }
+
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    return new MapTransferPair(ref);
+  }
+
+  private class MapTransferPair implements TransferPair{
+    private MapVector from = MapVector.this;
+    private TransferPair[] pairs;
+    private MapVector to;
+
+    public MapTransferPair(SchemaPath path){
+      MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator);
+      pairs = new TransferPair[vectors.size()];
+      int i =0;
+      for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+        TransferPair otherSide = e.getValue().getTransferPair();
+        v.put(e.getKey(), otherSide.getTo());
+        pairs[i++] = otherSide;
+      }
+      this.to = v;
+    }
+
+    public MapTransferPair(MapVector to){
+      this.to = to;
+      pairs = new TransferPair[vectors.size()];
+      int i =0;
+      for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+        int preSize = to.vectors.size();
+        ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
+        if(to.vectors.size() != preSize) v.allocateNew();
+        pairs[i++] = e.getValue().makeTransferPair(v);
+      }
+    }
+
+
+    @Override
+    public void transfer() {
+      for(TransferPair p : pairs){
+        p.transfer();
+      }
+      to.valueCount = valueCount;
+      clear();
+    }
+
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    @Override
+    public boolean copyValueSafe(int from, int to) {
+      for(TransferPair p : pairs){
+        if(!p.copyValueSafe(from, to)) return false;
+      }
+      return true;
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  @Override
+  public int getValueCapacity() {
+    if(this.vectors.isEmpty()) return 0;
+    return vectors.values().iterator().next().getValueCapacity();
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  @Override
+  public ByteBuf[] getBuffers() {
+    List<ByteBuf> bufs = Lists.newArrayList();
+    for(ValueVector v : vectors.values()){
+      for(ByteBuf b : v.getBuffers()){
+        bufs.add(b);
+      }
+    }
+    return bufs.toArray(new ByteBuf[bufs.size()]);
+  }
+
+  @Override
+  public void load(SerializedField metadata, ByteBuf buf) {
+    List<SerializedField> fields = metadata.getChildList();
+
+    int bufOffset = 0;
+    for (SerializedField fmd : fields) {
+      MaterializedField fieldDef = MaterializedField.create(fmd);
+
+      ValueVector v = vectors.get(fieldDef.getLastName());
+      if(v == null) {
+        // if we arrive here, we didn't have a matching vector.
+
+        v = TypeHelper.getNewVector(fieldDef, allocator);
+      }
+      if (fmd.getValueCount() == 0){
+        v.clear();
+      } else {
+        v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+      }
+      bufOffset += fmd.getBufferLength();
+      put(fieldDef.getLastName(), v);
+    }
+  }
+
+  @Override
+  public SerializedField getMetadata() {
+    SerializedField.Builder b = getField() //
+        .getAsBuilder() //
+        .setBufferLength(getBufferSize()) //
+        .setValueCount(valueCount);
+
+
+    for(ValueVector v : vectors.values()){
+      b.addChild(v.getMetadata());
+    }
+    return b.build();
+  }
+
+  @Override
+  public Mutator getMutator() {
+    return mutator;
+  }
+
+  public class Accessor implements ValueVector.Accessor{
+
+    @Override
+    public Object getObject(int index) {
+      Map<String, Object> vv = Maps.newHashMap();
+      for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+        ValueVector v = e.getValue();
+        String k = e.getKey();
+        Object value = v.getAccessor().getObject(index);
+        if(value != null){
+          vv.put(k, value);
+        }
+      }
+      return vv;
+    }
+
+    public void get(int index, ComplexHolder holder){
+      reader.setPosition(index);
+      holder.reader = reader;
+    }
+
+    @Override
+    public int getValueCount() {
+      return valueCount;
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return false;
+    }
+
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public FieldReader getReader() {
+      return new SingleMapReaderImpl(MapVector.this);
+    }
+
+  }
+
+  public ValueVector getVectorById(int id){
+    return vectorsById.get(id);
+  }
+
+  public class Mutator implements ValueVector.Mutator{
+
+    @Override
+    public void setValueCount(int valueCount) {
+      for(ValueVector v : vectors.values()){
+        v.getMutator().setValueCount(valueCount);
+      }
+      MapVector.this.valueCount = valueCount;
+    }
+
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public void generateTestData(int values) {
+    }
+
+  }
+
+  @Override
+  public void clear() {
+    for(ValueVector v : vectors.values()){
+      v.clear();;
+    }
+  }
+
+  public VectorWithOrdinal getVectorWithOrdinal(String name){
+    return vectorIds.get(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
new file mode 100644
index 0000000..6d86a64
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+public interface Positionable {
+  public void setPosition(int index);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdc5daed/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
new file mode 100644
index 0000000..93930b5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.expr.holders.RepeatedListHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import com.google.common.collect.Lists;
+import com.google.hive12.common.base.Preconditions;
+
+
+public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
+
+  private final UInt4Vector offsets;   // offsets to start of each record
+  private final BufferAllocator allocator;
+  private final Mutator mutator = new Mutator();
+  private final Accessor accessor = new Accessor();
+  private ValueVector vector;
+  private final MaterializedField field;
+  private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
+  private int allocationValueCount = 4000;
+  private int allocationMonitor = 0;
+
+  private int lastSet = 0;
+
+  private int valueCount;
+
+  public static MajorType TYPE = Types.repeated(MinorType.LIST);
+
+  public RepeatedListVector(MaterializedField field, BufferAllocator allocator){
+    this.allocator = allocator;
+    this.offsets = new UInt4Vector(null, allocator);
+    this.field = field;
+  }
+
+  public int size(){
+    return vector != null ? 1 : 0;
+  }
+
+  public RepeatedListVector(SchemaPath path, BufferAllocator allocator){
+    this(MaterializedField.create(path, TYPE), allocator);
+  }
+
+  transient private RepeatedListTransferPair ephPair;
+
+  public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from){
+    if(ephPair == null || ephPair.from != from){
+      ephPair = (RepeatedListTransferPair) from.makeTransferPair(this);
+    }
+    return ephPair.copyValueSafe(fromIndex, thisIndex);
+  }
+
+  public Mutator getMutator(){
+    return mutator;
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    if(!offsets.allocateNewSafe()) return false;
+
+    if(vector != null){
+      return vector.allocateNewSafe();
+    }else{
+      return true;
+    }
+
+  }
+
+  public class Mutator implements ValueVector.Mutator{
+
+    public void startNewGroup(int index) {
+      offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
+    }
+
+    public int add(int index){
+      int endOffset = index+1;
+      int currentChildOffset = offsets.getAccessor().get(endOffset);
+      int newChildOffset = currentChildOffset + 1;
+      boolean success = offsets.getMutator().setSafe(endOffset, newChildOffset);
+      lastSet = index;
+      if(!success) return -1;
+
+      // this is done at beginning so return the currentChildOffset, not the new offset.
+      return currentChildOffset;
+
+    }
+
+    @Override
+    public void setValueCount(int groupCount) {
+      populateEmpties(groupCount);
+      offsets.getMutator().setValueCount(groupCount+1);
+
+      if(vector != null){
+        int valueCount = offsets.getAccessor().get(groupCount);
+        vector.getMutator().setValueCount(valueCount);
+      }
+    }
+
+    @Override
+    public void reset() {
+      lastSet = 0;
+    }
+
+    @Override
+    public void generateTestData(int values) {
+    }
+
+  }
+
+  public class Accessor implements ValueVector.Accessor {
+
+    @Override
+    public Object getObject(int index) {
+      List<Object> l = Lists.newArrayList();
+      int end = offsets.getAccessor().get(index+1);
+      for(int i =  offsets.getAccessor().get(index); i < end; i++){
+        l.add(vector.getAccessor().getObject(i));
+      }
+      return l;
+    }
+
+    @Override
+    public int getValueCount() {
+      return offsets.getAccessor().getValueCount() - 1;
+    }
+
+    public void get(int index, RepeatedListHolder holder){
+      assert index <= getValueCapacity();
+      holder.start = offsets.getAccessor().get(index);
+      holder.end = offsets.getAccessor().get(index+1);
+    }
+
+    public void get(int index, ComplexHolder holder){
+      FieldReader reader = getReader();
+      reader.setPosition(index);
+      holder.reader = reader;
+    }
+
+    public void get(int index, int arrayIndex, ComplexHolder holder){
+      RepeatedListHolder h = new RepeatedListHolder();
+      get(index, h);
+      int offset = h.start + arrayIndex;
+
+      if(offset >= h.end){
+        holder.reader = NullReader.INSTANCE;
+      }else{
+        FieldReader r = vector.getAccessor().getReader();
+        r.setPosition(offset);
+        holder.reader = r;
+      }
+
+    }
+
+    @Override
+    public boolean isNull(int index) {
+      return false;
+    }
+
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public FieldReader getReader() {
+      return reader;
+    }
+
+  }
+
+  @Override
+  public int getBufferSize() {
+    return offsets.getBufferSize() + vector.getBufferSize();
+  }
+
+  @Override
+  public void close() {
+    offsets.close();
+    if(vector != null) vector.close();
+  }
+
+  @Override
+  public void clear() {
+    lastSet = 0;
+    offsets.clear();
+    if(vector != null) vector.clear();
+  }
+
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+
+  @Override
+  public TransferPair getTransferPair() {
+    return new RepeatedListTransferPair(field.getPath());
+  }
+
+
+  public class RepeatedListTransferPair implements TransferPair{
+    private final RepeatedListVector from = RepeatedListVector.this;
+    private final RepeatedListVector to;
+    private final TransferPair vectorTransfer;
+
+    private RepeatedListTransferPair(RepeatedListVector to){
+      this.to = to;
+      if(to.vector == null){
+        to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass());
+        to.vector.allocateNew();
+      }
+      this.vectorTransfer = vector.makeTransferPair(to.vector);
+    }
+
+    private RepeatedListTransferPair(SchemaPath path){
+      this.to = new RepeatedListVector(path, allocator);
+      vectorTransfer = vector.getTransferPair();
+      this.to.vector = vectorTransfer.getTo();
+    }
+
+    @Override
+    public void transfer() {
+      offsets.transferTo(to.offsets);
+      vectorTransfer.transfer();
+      to.valueCount = valueCount;
+      clear();
+    }
+
+    @Override
+    public ValueVector getTo() {
+      return to;
+    }
+
+    @Override
+    public void splitAndTransfer(int startIndex, int length) {
+      throw new UnsupportedOperationException();
+    }
+
+
+    @Override
+    public boolean copyValueSafe(int from, int to) {
+      RepeatedListHolder holder = new RepeatedListHolder();
+      accessor.get(from, holder);
+      int newIndex = this.to.offsets.getAccessor().get(to);
+      //todo: make this a bulk copy.
+      for(int i = holder.start; i < holder.end; i++, newIndex++){
+        if(!vectorTransfer.copyValueSafe(i, newIndex)) return false;
+      }
+      if(!this.to.offsets.getMutator().setSafe(to, newIndex)) return false;
+
+      return true;
+    }
+
+  }
+
+  @Override
+  public TransferPair makeTransferPair(ValueVector to) {
+    if(!(to instanceof RepeatedListVector ) ) throw new IllegalArgumentException("You can't make a transfer pair from an incompatible .");
+    return new RepeatedListTransferPair( (RepeatedListVector) to);
+  }
+
+  @Override
+  public TransferPair getTransferPair(FieldReference ref) {
+    return new RepeatedListTransferPair(ref);
+  }
+
+  @Override
+  public int getValueCapacity() {
+    if(vector == null) return offsets.getValueCapacity() - 1;
+    return  Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity());
+  }
+
+  @Override
+  public Accessor getAccessor() {
+    return accessor;
+  }
+
+  @Override
+  public ByteBuf[] getBuffers() {
+    return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers());
+  }
+
+  private void setVector(ValueVector v){
+    field.addChild(v.getField());
+    this.vector = v;
+  }
+
+  @Override
+  public void load(SerializedField metadata, ByteBuf buf) {
+    SerializedField childField = metadata.getChildList().get(0);
+
+    int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
+
+    MaterializedField fieldDef = MaterializedField.create(childField);
+    if(vector == null) {
+      setVector(TypeHelper.getNewVector(fieldDef, allocator));
+    }
+
+    if (childField.getValueCount() == 0){
+      vector.clear();
+    } else {
+      vector.load(childField, buf.slice(bufOffset, childField.getBufferLength()));
+    }
+  }
+
+  @Override
+  public SerializedField getMetadata() {
+    return getField() //
+        .getAsBuilder() //
+        .setBufferLength(getBufferSize()) //
+        .setValueCount(accessor.getValueCount()) //
+        .addChild(vector.getMetadata()) //
+        .build();
+  }
+
+  private void populateEmpties(int groupCount){
+    int previousEnd = offsets.getAccessor().get(lastSet + 1);
+    for(int i = lastSet + 2; i <= groupCount; i++){
+      offsets.getMutator().setSafe(i, previousEnd);
+    }
+    lastSet = groupCount - 1;
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return Collections.singleton(vector).iterator();
+  }
+
+  @Override
+  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+    Preconditions.checkArgument(name == null);
+
+    if(vector == null){
+      vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(), type), allocator);
+    }
+    return typeify(vector, clazz);
+  }
+
+  @Override
+  public <T extends ValueVector> T get(String name, Class<T> clazz) {
+    if(name != null) return null;
+    return typeify(vector, clazz);
+  }
+
+  @Override
+  public void allocateNew(int parentValueCount, int childValueCount) {
+    clear();
+    offsets.allocateNew(parentValueCount+1);
+    mutator.reset();
+    accessor.reset();
+  }
+
+  @Override
+  public int load(int parentValueCount, int childValueCount, ByteBuf buf) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public VectorWithOrdinal getVectorWithOrdinal(String name) {
+    if(name != null) return null;
+    return new VectorWithOrdinal(vector, 0);
+  }
+
+
+}