You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/01/13 23:30:12 UTC

[1/3] drill git commit: DRILL-1885: fix a problem regarding ordinal to vector mapping that report incorrect result or fails a query & refactor code, eliminiate redundancy support case insensitive vector lookup yet case sensitive result reporting rely on

Repository: drill
Updated Branches:
  refs/heads/master 5b012bf28 -> 69db15ebb


DRILL-1885: fix a problem regarding ordinal to vector mapping that report incorrect result or fails a query & refactor code, eliminiate redundancy support case insensitive vector lookup yet case sensitive result reporting rely on fields in the order that they show up in the schema while copying vectors


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a2190fa0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a2190fa0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a2190fa0

Branch: refs/heads/master
Commit: a2190fa0f2376958930edfd90f253a8ef4b5a34a
Parents: 5b012bf
Author: Hanifi Gunes <hg...@maprtech.com>
Authored: Wed Dec 31 14:44:17 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 13 14:28:51 2015 -0800

----------------------------------------------------------------------
 .../common/collections/MapWithOrdinal.java      | 248 ++++++++++++++++
 .../drill/exec/physical/impl/ScanBatch.java     |   4 +-
 .../drill/exec/record/VectorContainer.java      |  14 +-
 .../vector/complex/AbstractContainerVector.java | 268 +++++++++++++++--
 .../exec/vector/complex/AbstractMapVector.java  |  22 --
 .../drill/exec/vector/complex/MapVector.java    | 241 ++++-----------
 .../exec/vector/complex/RepeatedListVector.java |  50 +---
 .../exec/vector/complex/RepeatedMapVector.java  | 295 +++++++------------
 .../exec/vector/complex/VectorWithOrdinal.java  |  30 ++
 .../vector/complex/impl/ComplexWriterImpl.java  |   2 +-
 .../complex/impl/RepeatedListReaderImpl.java    |   3 +-
 .../complex/impl/RepeatedMapReaderImpl.java     |   2 +-
 .../complex/impl/SingleListReaderImpl.java      |   3 +-
 .../complex/impl/SingleMapReaderImpl.java       |   2 +-
 .../complex/impl/VectorContainerWriter.java     |   2 +-
 .../exec/store/parquet/TestParquetComplex.java  |   5 +-
 .../fn/TestJsonReaderWithSparseFiles.java       |   3 -
 .../vector/complex/writer/TestRepeated.java     |   2 +-
 18 files changed, 700 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
new file mode 100644
index 0000000..5e54b2d
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/collections/MapWithOrdinal.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.collections;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.IntObjectMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of map that supports constant time look-up by a generic key or an ordinal.
+ *
+ * This class extends the functionality a regular {@link Map} with ordinal lookup support.
+ * Upon insertion an unused ordinal is assigned to the inserted (key, value) tuple.
+ * Upon update the same ordinal id is re-used while value is replaced.
+ * Upon deletion of an existing item, its corresponding ordinal is recycled and could be used by another item.
+ *
+ * For any instance with N items, this implementation guarantees that ordinals are in the range of [0, N). However,
+ * the ordinal assignment is dynamic and may change after an insertion or deletion. Consumers of this class are
+ * responsible for explicitly checking the ordinal corresponding to a key via
+ * {@link org.apache.drill.common.collections.MapWithOrdinal#getOrdinal(Object)} before attempting to execute a lookup
+ * with an ordinal.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+
+public class MapWithOrdinal<K, V> implements Map<K, V> {
+  private final static Logger logger = LoggerFactory.getLogger(MapWithOrdinal.class);
+
+  private final Map<K, Entry<Integer, V>> primary = Maps.newLinkedHashMap();
+  private final IntObjectHashMap<V> secondary = new IntObjectHashMap<>();
+
+  private final Map<K, V> delegate = new Map<K, V>() {
+    @Override
+    public boolean isEmpty() {
+      return size() == 0;
+    }
+
+    @Override
+    public int size() {
+      return primary.size();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+      return primary.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+      return primary.containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+      Entry<Integer, V> pair = primary.get(key);
+      if (pair != null) {
+        return pair.getValue();
+      }
+      return null;
+    }
+
+    @Override
+    public V put(K key, V value) {
+      final Entry<Integer, V> oldPair = primary.get(key);
+      // if key exists try replacing otherwise, assign a new ordinal identifier
+      final int ordinal = oldPair == null ? primary.size():oldPair.getKey();
+      primary.put(key, new AbstractMap.SimpleImmutableEntry<>(ordinal, value));
+      secondary.put(ordinal, value);
+      return oldPair==null ? null:oldPair.getValue();
+    }
+
+    @Override
+    public V remove(Object key) {
+      final Entry<Integer, V> oldPair = primary.remove(key);
+      if (oldPair!=null) {
+        final int lastOrdinal = secondary.size();
+        final V last = secondary.get(lastOrdinal);
+        // normalize mappings so that all numbers until primary.size() is assigned
+        // swap the last element with the deleted one
+        secondary.put(oldPair.getKey(), last);
+        primary.put((K) key, new AbstractMap.SimpleImmutableEntry<>(oldPair.getKey(), last));
+      }
+      return oldPair==null ? null:oldPair.getValue();
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+      primary.clear();
+      secondary.clear();
+    }
+
+    @Override
+    public Set<K> keySet() {
+      return primary.keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+      return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function<IntObjectMap.Entry<V>, V>() {
+        @Override
+        public V apply(IntObjectMap.Entry<V> entry) {
+          return Preconditions.checkNotNull(entry).value();
+        }
+      }));
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+      return Sets.newHashSet(Iterables.transform(primary.entrySet(), new Function<Entry<K, Entry<Integer, V>>, Entry<K, V>>() {
+        @Override
+        public Entry<K, V> apply(Entry<K, Entry<Integer, V>> entry) {
+          return new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), entry.getValue().getValue());
+        }
+      }));
+    }
+  };
+
+  /**
+   * Returns the value corresponding to the given ordinal
+   *
+   * @param id ordinal value for lookup
+   * @return an instance of V
+   */
+  public V getByOrdinal(int id) {
+    return secondary.get(id);
+  }
+
+  /**
+   * Returns the ordinal corresponding to the given key.
+   *
+   * @param key key for ordinal lookup
+   * @return ordinal value corresponding to key if it exists or -1
+   */
+  public int getOrdinal(K key) {
+    Entry<Integer, V> pair = primary.get(key);
+    if (pair != null) {
+      return pair.getKey();
+    }
+    return -1;
+  }
+
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public V get(Object key) {
+    return delegate.get(key);
+  }
+
+  /**
+   * Inserts the tuple (key, value) into the map extending the semantics of {@link Map#put} with automatic ordinal
+   * assignment. A new ordinal is assigned if key does not exists. Otherwise the same ordinal is re-used but the value
+   * is replaced.
+   *
+   * {@see java.util.Map#put}
+   */
+  @Override
+  public V put(K key, V value) {
+    return delegate.put(key, value);
+  }
+
+  @Override
+  public Collection<V> values() {
+    return delegate.values();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return delegate.containsKey(key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return delegate.containsValue(value);
+  }
+
+  /**
+   * Removes the element corresponding to the key if exists extending the semantics of {@link Map#remove} with ordinal
+   * re-cycling. The ordinal corresponding to the given key may be re-assigned to another tuple. It is important that
+   * consumer checks the ordinal value via {@link #getOrdinal(Object)} before attempting to look-up by ordinal.
+   *
+   * {@see java.util.Map#remove}
+   */
+  @Override
+  public V remove(Object key) {
+    return delegate.remove(key);
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> m) {
+    delegate.putAll(m);
+  }
+
+  @Override
+  public void clear() {
+    delegate.clear();
+  }
+
+  @Override
+  public Set<K> keySet() {
+    return delegate.keySet();
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet() {
+    return delegate.entrySet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 23833b6..29bf68e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -282,14 +282,14 @@ public class ScanBatch implements RecordBatch {
         if (!clazz.isAssignableFrom(v.getClass())) {
           throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
         }
-        container.add(v);
 
         ValueVector old = fieldVectorMap.put(field.key(), v);
         if(old != null){
-          container.remove(old);
           old.clear();
+          container.remove(old);
         }
 
+        container.add(v);
         // Adding new vectors to the container mark that the schema has changed
         schemaChange = true;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index d50760a..7b772cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -29,15 +29,13 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractMapVector;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class VectorContainer extends AbstractMapVector implements Iterable<VectorWrapper<?>>, VectorAccessible {
+public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
 
   protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
@@ -54,16 +52,6 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
     this.oContext = oContext;
   }
 
-  // public VectorContainer(List<ValueVector> vectors, List<ValueVector[]> hyperVectors) {
-  // assert !vectors.isEmpty() || !hyperVectors.isEmpty();
-  //
-  // addCollection(vectors);
-  //
-  // for (ValueVector[] vArr : hyperVectors) {
-  // add(vArr);
-  // }
-  // }
-
   public boolean isSchemaChanged() {
     return schemaChanged;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
index 1210d90..d633154 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -17,33 +17,255 @@
  */
 package org.apache.drill.exec.vector.complex;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.collections.MapWithOrdinal;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.util.CallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-public abstract class AbstractContainerVector implements ValueVector{
+/**
+ * Base class for composite vectors.
+ *
+ * This class implements common functionality of composite vectors.
+ */
+public abstract class AbstractContainerVector implements ValueVector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
 
-  public abstract <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz);
-  public abstract <T extends ValueVector> T get(String name, Class<T> clazz);
-  public abstract int size();
+  private final MapWithOrdinal<String, ValueVector> vectors =  new MapWithOrdinal<>();
+  private final MaterializedField field;
+  protected final BufferAllocator allocator;
+  protected final CallBack callBack;
+
+  protected AbstractContainerVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+    this.field = Preconditions.checkNotNull(field);
+    this.allocator = allocator;
+    this.callBack = callBack;
+  }
+
+  @Override
+  public void allocateNew() throws OutOfMemoryRuntimeException {
+    if (!allocateNewSafe()) {
+      throw new OutOfMemoryRuntimeException();
+    }
+  }
+
+  @Override
+  public boolean allocateNewSafe() {
+    for (ValueVector v : vectors.values()) {
+      if (!v.allocateNewSafe()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the field definition of this instance.
+   */
+  @Override
+  public MaterializedField getField() {
+    return field;
+  }
+
+  /**
+   * Adds a new field with the given parameters or replaces the existing one and consequently returns the resultant
+   * {@link org.apache.drill.exec.vector.ValueVector}.
+   *
+   * Execution takes place in the following order:
+   * <ul>
+   *   <li>
+   *     if field is new, create and insert a new vector of desired type.
+   *   </li>
+   *   <li>
+   *     if field exists and existing vector is of desired vector type, return the vector.
+   *   </li>
+   *   <li>
+   *     if field exists and null filled, clear the existing vector; create and insert a new vector of desired type.
+   *   </li>
+   *   <li>
+   *     otherwise, throw an {@link java.lang.IllegalStateException}
+   *   </li>
+   * </ul>
+   *
+   * @param name name of the field
+   * @param type type of the field
+   * @param clazz class of expected vector type
+   * @param <T> class type of expected vector type
+   * @throws java.lang.IllegalStateException raised if there is a hard schema change
+   *
+   * @return resultant {@link org.apache.drill.exec.vector.ValueVector}
+   */
+  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+    final ValueVector existing = getChild(name);
+    boolean create = false;
+    if (existing == null) {
+      create = true;
+    } else if (clazz.isAssignableFrom(existing.getClass())) {
+      return (T)existing;
+    } else if (nullFilled(existing)) {
+      existing.clear();
+      create = true;
+    }
+    if (create) {
+      final T vector = (T)TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+      putChild(name, vector);
+      if (callBack!=null) {
+        callBack.doWork();
+      }
+      return vector;
+    }
+    final String message = "Drill does not support schema change yet. Existing[{}] and desired[{}] vector types mismatch";
+    throw new IllegalStateException(String.format(message, existing.getClass().getSimpleName(), clazz.getSimpleName()));
+  }
+
+  private boolean nullFilled(ValueVector vector) {
+    for (int r=0; r<vector.getAccessor().getValueCount(); r++) {
+      if (!vector.getAccessor().isNull(r)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns a {@link org.apache.drill.exec.vector.ValueVector} corresponding to the given ordinal identifier.
+   */
+  public ValueVector getChildByOrdinal(int id) {
+    return vectors.getByOrdinal(id);
+  }
+
+  /**
+   * Returns a {@link org.apache.drill.exec.vector.ValueVector} corresponding to the given field name if exists or null.
+   */
+  public ValueVector getChild(String name) {
+    return getChild(name, ValueVector.class);
+  }
+
+  /**
+   * Returns a {@link org.apache.drill.exec.vector.ValueVector} instance of subtype of <T> corresponding to the given
+   * field name if exists or null.
+   */
+  public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
+    ValueVector v = vectors.get(name.toLowerCase());
+    if (v == null) {
+      return null;
+    }
+    return typeify(v, clazz);
+  }
+
+  /**
+   * Inserts the vector with the given name if it does not exist else replaces it with the new value.
+   *
+   * Note that this method does not enforce any vector type check nor throws a schema change exception.
+   */
+  protected void putChild(String name, ValueVector vector) {
+    ValueVector old = vectors.put(
+        Preconditions.checkNotNull(name, "field name cannot be null").toLowerCase(),
+        Preconditions.checkNotNull(vector, "vector cannot be null")
+    );
+    if (old != null && old != vector) {
+      logger.debug("Field [%s] mutated from [%s] to [%s]", name, old.getClass().getSimpleName(),
+          vector.getClass().getSimpleName());
+    }
+
+    field.addChild(vector.getField());
+  }
+
+  /**
+   * Returns a sequence of field names in the order that they show up in the schema.
+   */
+  protected Collection<String> getChildFieldNames() {
+    return Sets.newLinkedHashSet(Iterables.transform(field.getChildren(), new Function<MaterializedField, String>() {
+      @Nullable
+      @Override
+      public String apply(MaterializedField field) {
+        return Preconditions.checkNotNull(field).getLastName();
+      }
+    }));
+  }
+
+  /**
+   * Returns a sequence of underlying child vectors.
+   */
+  protected Collection<ValueVector> getChildren() {
+    return vectors.values();
+  }
+
+  /**
+   * Returns the number of underlying child vectors.
+   */
+  public int size() {
+    return vectors.size();
+  }
+
+  /**
+   * Clears out all underlying child vectors.
+   */
+ @Override
+  public void close() {
+    for (ValueVector vector:this) {
+      vector.close();
+    }
+  }
+
+  @Override
+  public Iterator<ValueVector> iterator() {
+    return vectors.values().iterator();
+  }
 
   protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz) {
     if (clazz.isAssignableFrom(v.getClass())) {
       return (T) v;
-    } else {
-      throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s].  Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
     }
+    throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s].  Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
   }
 
-  public abstract List<ValueVector> getPrimitiveVectors();
+  /**
+   * Returns a list of scalar child vectors recursing the entire vector hierarchy.
+   */
+  public List<ValueVector> getPrimitiveVectors() {
+    List<ValueVector> primitiveVectors = Lists.newArrayList();
+    for (ValueVector v : vectors.values()) {
+      if (v instanceof AbstractContainerVector) {
+        AbstractContainerVector av = (AbstractContainerVector) v;
+        primitiveVectors.addAll(av.getPrimitiveVectors());
+      } else {
+        primitiveVectors.add(v);
+      }
+    }
+    return primitiveVectors;
+  }
 
-  public abstract VectorWithOrdinal getVectorWithOrdinal(String name);
+  /**
+   * Returns a vector with its corresponding ordinal mapping if field exists or null.
+   */
+  public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
+    final int ordinal = vectors.getOrdinal(name.toLowerCase());
+    if (ordinal < 0) {
+      return null;
+    }
+    final ValueVector vector = vectors.getByOrdinal(ordinal);
+    return new VectorWithOrdinal(vector, ordinal);
+  }
 
   public TypedFieldId getFieldIdIfMatches(TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg) {
     if (seg == null) {
@@ -78,7 +300,7 @@ public abstract class AbstractContainerVector implements ValueVector{
       // name segment.
     }
 
-    VectorWithOrdinal vord = getVectorWithOrdinal(seg.isArray() ? null : seg.getNameSegment().getPath());
+    VectorWithOrdinal vord = getChildVectorWithOrdinal(seg.isArray() ? null : seg.getNameSegment().getPath());
     if (vord == null) {
       return null;
     }
@@ -125,7 +347,7 @@ public abstract class AbstractContainerVector implements ValueVector{
   private MajorType getLastPathType() {
     if((this.getField().getType().getMinorType() == MinorType.LIST  &&
         this.getField().getType().getMode() == DataMode.REPEATED)) {  // Use Repeated scalar type instead of Required List.
-      VectorWithOrdinal vord = getVectorWithOrdinal(null);
+      VectorWithOrdinal vord = getChildVectorWithOrdinal(null);
       ValueVector v = vord.vector;
       if (! (v instanceof  AbstractContainerVector)) {
         return v.getField().getType();
@@ -138,18 +360,26 @@ public abstract class AbstractContainerVector implements ValueVector{
     return this.getField().getType();
   }
 
-  protected boolean supportsDirectRead() {
-    return false;
+  @Override
+  public DrillBuf[] getBuffers(boolean clear) {
+    List<DrillBuf> buffers = Lists.newArrayList();
+    int expectedBufSize = getBufferSize();
+    int actualBufSize = 0 ;
+
+    for (ValueVector v : vectors.values()) {
+      for (DrillBuf buf : v.getBuffers(clear)) {
+        buffers.add(buf);
+        actualBufSize += buf.writerIndex();
+      }
+    }
+
+    Preconditions.checkArgument(actualBufSize == expectedBufSize);
+    return buffers.toArray(new DrillBuf[buffers.size()]);
   }
 
-  protected class VectorWithOrdinal{
-    final ValueVector vector;
-    final int ordinal;
 
-    public VectorWithOrdinal(ValueVector v, int ordinal) {
-      this.vector = v;
-      this.ordinal = ordinal;
-    }
+  protected boolean supportsDirectRead() {
+    return false;
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
deleted file mode 100644
index f126e5c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.complex;
-
-public class AbstractMapVector {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMapVector.class);
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index cc3d24c..62a5140 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -22,7 +22,6 @@ import com.google.common.primitives.Ints;
 
 import io.netty.buffer.DrillBuf;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +36,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -48,56 +46,24 @@ import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier;
 import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 public class MapVector extends AbstractContainerVector {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
 
   public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REQUIRED).build();
 
-  final HashMap<String, ValueVector> vectors = Maps.newLinkedHashMap();
-  private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
-  private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
   private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
   private final Accessor accessor = new Accessor();
   private final Mutator mutator = new Mutator();
-  private final BufferAllocator allocator;
-  private MaterializedField field;
   private int valueCount;
-  private CallBack callBack;
 
   public MapVector(String path, BufferAllocator allocator, CallBack callBack){
-    this.field = MaterializedField.create(SchemaPath.getSimplePath(path), TYPE);
-    this.allocator = allocator;
-    this.callBack = callBack;
-  }
-  public MapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
-    this.field = field;
-    this.allocator = allocator;
-    this.callBack = callBack;
-  }
-  @Override
-  public int size() {
-    return vectors.size();
+    this(MaterializedField.create(SchemaPath.getSimplePath(path), TYPE), allocator, callBack);
   }
 
-  @Override
-  public List<ValueVector> getPrimitiveVectors() {
-    List<ValueVector> primitiveVectors = Lists.newArrayList();
-    for (ValueVector v : this.vectors.values()) {
-      if (v instanceof AbstractContainerVector) {
-        AbstractContainerVector av = (AbstractContainerVector) v;
-        for (ValueVector vv : av.getPrimitiveVectors()) {
-          primitiveVectors.add(vv);
-        }
-      } else {
-        primitiveVectors.add(v);
-      }
-    }
-    return primitiveVectors;
+  public MapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
+    super(field, allocator, callBack);
   }
 
   transient private MapTransferPair ephPair;
@@ -118,86 +84,17 @@ public class MapVector extends AbstractContainerVector {
   }
 
   @Override
-  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
-    while (true) {
-      ValueVector vector = vectors.get(name);
-      if (vector == null) {
-        vector = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
-        Preconditions.checkNotNull(vector, String.format("Failure to create vector of type %s.", type));
-        put(name, vector);
-        if (callBack != null) {
-          callBack.doWork();
-        }
-      }
-      if (clazz.isAssignableFrom(vector.getClass())) {
-        return (T)vector;
-      } else {
-        boolean allNulls = true;
-        for (int i=0; i<vector.getAccessor().getValueCount(); i++) {
-          if (!vector.getAccessor().isNull(i)) {
-            allNulls = false;
-            break;
-          }
-        }
-        if (allNulls) {
-          vector.clear();
-          vectors.remove(name);
-        } else {
-          throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s].  Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), vector.getClass().getSimpleName()));
-        }
-      }
-    }
-  }
-
-  protected void put(String name, ValueVector vv) {
-    int ordinal = vectors.size();
-    if (vectors.put(name, vv) != null) {
-      throw new IllegalStateException();
-    }
-    vectorIds.put(name.toLowerCase(), new VectorWithOrdinal(vv, ordinal));
-    vectorsById.put(ordinal, vv);
-    field.addChild(vv.getField());
-  }
-
-
-  @Override
   protected boolean supportsDirectRead() {
     return true;
   }
 
   public Iterator<String> fieldNameIterator() {
-    return vectors.keySet().iterator();
-  }
-
-  @Override
-  public void allocateNew() throws OutOfMemoryRuntimeException {
-    if (!allocateNewSafe()) {
-      throw new OutOfMemoryRuntimeException();
-    }
-  }
-
-  @Override
-  public boolean allocateNewSafe() {
-    for (ValueVector v : vectors.values()) {
-      if (!v.allocateNewSafe()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public <T extends ValueVector> T get(String name, Class<T> clazz) {
-    ValueVector v = vectors.get(name);
-    if (v == null) {
-      throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name));
-    }
-    return typeify(v, clazz);
+    return getChildFieldNames().iterator();
   }
 
   @Override
   public int getBufferSize() {
-    if (valueCount == 0 || vectors.isEmpty()) {
+    if (valueCount == 0 || size() == 0) {
       return 0;
     }
     long buffer = 0;
@@ -209,65 +106,51 @@ public class MapVector extends AbstractContainerVector {
   }
 
   @Override
-  public void close() {
-    for (ValueVector v : this) {
-      v.close();
-    }
-  }
-
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return vectors.values().iterator();
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
-  }
-
-  @Override
   public TransferPair getTransferPair() {
-    return new MapTransferPair(field.getPath());
+    return new MapTransferPair(this, getField().getPath());
   }
 
   @Override
   public TransferPair makeTransferPair(ValueVector to) {
-    return new MapTransferPair( (MapVector) to);
+    return new MapTransferPair(this, (MapVector)to);
   }
 
   @Override
   public TransferPair getTransferPair(FieldReference ref) {
-    return new MapTransferPair(ref);
+    return new MapTransferPair(this, ref);
   }
 
-  private class MapTransferPair implements TransferPair{
-    private MapVector from = MapVector.this;
-    private TransferPair[] pairs;
-    private MapVector to;
-
-    public MapTransferPair(SchemaPath path) {
-      MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator, callBack);
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        TransferPair otherSide = e.getValue().getTransferPair();
-        v.put(e.getKey(), otherSide.getTo());
-        pairs[i++] = otherSide;
-      }
-      this.to = v;
+  protected static class MapTransferPair implements TransferPair{
+    private final TransferPair[] pairs;
+    private final MapVector from;
+    private final MapVector to;
+
+    public MapTransferPair(MapVector from, SchemaPath path) {
+      this(from, new MapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false);
     }
 
-    public MapTransferPair(MapVector to) {
+    public MapTransferPair(MapVector from, MapVector to) {
+      this(from, to, true);
+    }
+
+    protected MapTransferPair(MapVector from, MapVector to, boolean allocate) {
+      this.from = from;
       this.to = to;
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        int preSize = to.vectors.size();
-        ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
-        if (to.vectors.size() != preSize) {
-          v.allocateNew();
+      this.pairs = new TransferPair[from.size()];
+
+      int i = 0;
+      ValueVector vector;
+      for (String child:from.getChildFieldNames()) {
+        int preSize = to.size();
+        vector = from.getChild(child);
+        if (vector == null) {
+          continue;
         }
-        pairs[i++] = e.getValue().makeTransferPair(v);
+        ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+        if (allocate && to.size() != preSize) {
+          newVector.allocateNew();
+        }
+        pairs[i++] = vector.makeTransferPair(newVector);
       }
     }
 
@@ -277,8 +160,8 @@ public class MapVector extends AbstractContainerVector {
       for (TransferPair p : pairs) {
         p.transfer();
       }
-      to.valueCount = valueCount;
-      clear();
+      to.valueCount = from.valueCount;
+      from.clear();
     }
 
     @Override
@@ -308,7 +191,7 @@ public class MapVector extends AbstractContainerVector {
 
   @Override
   public int getValueCapacity() {
-    if (this.vectors.isEmpty()) {
+    if (size() == 0) {
       return Integer.MAX_VALUE;
     }
 
@@ -322,7 +205,7 @@ public class MapVector extends AbstractContainerVector {
       }
     };
 
-    return natural.min(vectors.values()).getValueCapacity();
+    return natural.min(getChildren()).getValueCapacity();
   }
 
   @Override
@@ -331,17 +214,6 @@ public class MapVector extends AbstractContainerVector {
   }
 
   @Override
-  public DrillBuf[] getBuffers(boolean clear) {
-    List<DrillBuf> bufs = Lists.newArrayList();
-    for (ValueVector v : vectors.values()) {
-      for (DrillBuf b : v.getBuffers(clear)) {
-        bufs.add(b);
-      }
-    }
-    return bufs.toArray(new DrillBuf[bufs.size()]);
-  }
-
-  @Override
   public void load(SerializedField metadata, DrillBuf buf) {
     List<SerializedField> fields = metadata.getChildList();
     valueCount = metadata.getValueCount();
@@ -350,11 +222,11 @@ public class MapVector extends AbstractContainerVector {
     for (SerializedField fmd : fields) {
       MaterializedField fieldDef = MaterializedField.create(fmd);
 
-      ValueVector v = vectors.get(fieldDef.getLastName());
+      ValueVector v = getChild(fieldDef.getLastName());
       if (v == null) {
         // if we arrive here, we didn't have a matching vector.
         v = TypeHelper.getNewVector(fieldDef, allocator);
-        put(fieldDef.getLastName(), v);
+        putChild(fieldDef.getLastName(), v);
       }
       if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) {
         v.clear();
@@ -375,7 +247,7 @@ public class MapVector extends AbstractContainerVector {
         .setValueCount(valueCount);
 
 
-    for(ValueVector v : vectors.values()) {
+    for(ValueVector v : getChildren()) {
       b.addChild(v.getMetadata());
     }
     return b.build();
@@ -391,12 +263,10 @@ public class MapVector extends AbstractContainerVector {
     @Override
     public Object getObject(int index) {
       Map<String, Object> vv = new JsonStringHashMap();
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        ValueVector v = e.getValue();
-        String k = e.getKey();
-        Object value = v.getAccessor().getObject(index);
+      for (String child:getChildFieldNames()) {
+        Object value = getChild(child).getAccessor().getObject(index);
         if (value != null) {
-          vv.put(k, value);
+          vv.put(child, value);
         }
       }
       return vv;
@@ -429,40 +299,31 @@ public class MapVector extends AbstractContainerVector {
   }
 
   public ValueVector getVectorById(int id) {
-    return vectorsById.get(id);
+    return getChildByOrdinal(id);
   }
 
   public class Mutator implements ValueVector.Mutator{
 
     @Override
     public void setValueCount(int valueCount) {
-      for (ValueVector v : vectors.values()) {
+      for (ValueVector v : getChildren()) {
         v.getMutator().setValueCount(valueCount);
       }
       MapVector.this.valueCount = valueCount;
     }
 
     @Override
-    public void reset() {
-    }
+    public void reset() { }
 
     @Override
-    public void generateTestData(int values) {
-    }
-
+    public void generateTestData(int values) { }
   }
 
   @Override
   public void clear() {
     valueCount = 0;
-    for (ValueVector v : vectors.values()) {
-      v.clear();;
+    for (ValueVector v : getChildren()) {
+      v.clear();
     }
   }
-
-  @Override
-  public VectorWithOrdinal getVectorWithOrdinal(String name) {
-    return vectorIds.get(name.toLowerCase());
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index 362d806..58eb546 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -54,16 +54,10 @@ import com.google.common.collect.Lists;
 public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
 
   private final UInt4Vector offsets;   // offsets to start of each record
-  private final BufferAllocator allocator;
   private final Mutator mutator = new Mutator();
   private final RepeatedListAccessor accessor = new RepeatedListAccessor();
   private ValueVector vector;
-  private final MaterializedField field;
   private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
-  private int allocationValueCount = 4000;
-  private int allocationMonitor = 0;
-  private CallBack callBack;
-
   private int lastSet = 0;
 
   private int valueCount;
@@ -71,10 +65,8 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   public static MajorType TYPE = Types.repeated(MinorType.LIST);
 
   public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
-    this.allocator = allocator;
+    super(field, allocator, callBack);
     this.offsets = new UInt4Vector(null, allocator);
-    this.field = field;
-    this.callBack = callBack;
   }
 
   public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){
@@ -86,19 +78,6 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     return vector != null ? 1 : 0;
   }
 
-  @Override
-  public List<ValueVector> getPrimitiveVectors() {
-    List<ValueVector> primitiveVectors = Lists.newArrayList();
-    if (vector instanceof AbstractContainerVector) {
-      for (ValueVector v : ((AbstractContainerVector) vector).getPrimitiveVectors()) {
-        primitiveVectors.add(v);
-      }
-    } else {
-      primitiveVectors.add(vector);
-    }
-    primitiveVectors.add(offsets);
-    return primitiveVectors;
-  }
 
   transient private RepeatedListTransferPair ephPair;
 
@@ -114,13 +93,6 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   }
 
   @Override
-  public void allocateNew() throws OutOfMemoryRuntimeException {
-    if (!allocateNewSafe()) {
-      throw new OutOfMemoryRuntimeException();
-    }
-  }
-
-  @Override
   public boolean allocateNewSafe() {
     if (!offsets.allocateNewSafe()) {
       return false;
@@ -132,7 +104,6 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     } else {
       return true;
     }
-
   }
 
   public class Mutator implements ValueVector.Mutator, RepeatedMutator{
@@ -277,9 +248,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   @Override
   public void close() {
     offsets.close();
-    if (vector != null) {
-      vector.close();
-    }
+    super.close();
   }
 
   @Override
@@ -292,13 +261,8 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   }
 
   @Override
-  public MaterializedField getField() {
-    return field;
-  }
-
-  @Override
   public TransferPair getTransferPair() {
-    return new RepeatedListTransferPair(field.getPath());
+    return new RepeatedListTransferPair(getField().getPath());
   }
 
   public class RepeatedListTransferPair implements TransferPair{
@@ -391,7 +355,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   }
 
   private void setVector(ValueVector v) {
-    field.addChild(v.getField());
+    getField().addChild(v.getField());
     this.vector = v;
   }
 
@@ -441,7 +405,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     Preconditions.checkArgument(name == null);
 
     if(vector == null){
-      vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(), type), allocator, callBack);
+      vector = TypeHelper.getNewVector(MaterializedField.create(getField().getPath().getUnindexedArrayChild(), type), allocator, callBack);
       if (callBack != null) {
         callBack.doWork();
       }
@@ -450,7 +414,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   }
 
   @Override
-  public <T extends ValueVector> T get(String name, Class<T> clazz) {
+  public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
     if (name != null) {
       return null;
     }
@@ -471,7 +435,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
   }
 
   @Override
-  public VectorWithOrdinal getVectorWithOrdinal(String name) {
+  public VectorWithOrdinal getChildVectorWithOrdinal(String name) {
     if (name != null) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index e140c8b..43a3881 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -33,7 +34,6 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.ComplexHolder;
 import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
@@ -48,9 +48,7 @@ import org.apache.drill.exec.vector.complex.impl.NullReader;
 import org.apache.drill.exec.vector.complex.impl.RepeatedMapReaderImpl;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class RepeatedMapVector extends AbstractContainerVector implements RepeatedFixedWidthVector {
@@ -60,22 +58,14 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
 
   private final UInt4Vector offsets;   // offsets to start of each record (considering record indices are 0-indexed)
-  private final Map<String, ValueVector> vectors = Maps.newLinkedHashMap();
-  private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
   private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
-  private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
   private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
   private final Mutator mutator = new Mutator();
-  private final BufferAllocator allocator;
-  private final MaterializedField field;
   private int lastPopulatedValueIndex = -1;
-  private CallBack callBack;
 
   public RepeatedMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
-    this.field = field;
-    this.allocator = allocator;
+    super(field, allocator, callBack);
     this.offsets = new UInt4Vector(null, allocator);
-    this.callBack = callBack;
   }
 
   @Override
@@ -83,7 +73,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     clear();
     offsets.allocateNew(topLevelValueCount+1);
     offsets.zeroVector();
-    for (ValueVector v : vectors.values()) {
+    for (ValueVector v : getChildren()) {
       AllocationHelper.allocatePrecomputedChildCount(v, topLevelValueCount, 50, childValueCount);
     }
     mutator.reset();
@@ -91,56 +81,17 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   }
 
   public Iterator<String> fieldNameIterator() {
-    return vectors.keySet().iterator();
-  }
-
-  @Override
-  public int size() {
-    return vectors.size();
+    return getChildFieldNames().iterator();
   }
 
   @Override
   public List<ValueVector> getPrimitiveVectors() {
-    List<ValueVector> primitiveVectors = Lists.newArrayList();
-    for (ValueVector v : this.vectors.values()) {
-      if (v instanceof AbstractContainerVector) {
-        AbstractContainerVector av = (AbstractContainerVector) v;
-        for (ValueVector vv : av.getPrimitiveVectors()) {
-          primitiveVectors.add(vv);
-        }
-      } else {
-        primitiveVectors.add(v);
-      }
-    }
+    List<ValueVector> primitiveVectors = super.getPrimitiveVectors();
     primitiveVectors.add(offsets);
     return primitiveVectors;
   }
 
   @Override
-  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
-    ValueVector v = vectors.get(name);
-
-    if (v == null) {
-      v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
-      Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type));
-      put(name, v);
-      if (callBack != null) {
-        callBack.doWork();
-      }
-    }
-    return typeify(v, clazz);
-  }
-
-  @Override
-  public <T extends ValueVector> T get(String name, Class<T> clazz) {
-    ValueVector v = vectors.get(name);
-    if (v == null) {
-      throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name));
-    }
-    return typeify(v, clazz);
-  }
-
-  @Override
   public int getBufferSize() {
     if (accessor.getGroupCount() == 0) {
       return 0;
@@ -154,49 +105,45 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
   @Override
   public void close() {
-    for (ValueVector v : this) {
-      v.close();
-    }
-  }
-
-  @Override
-  public Iterator<ValueVector> iterator() {
-    return vectors.values().iterator();
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
+    super.close();
+    offsets.close();
   }
 
   @Override
   public TransferPair getTransferPair() {
-    return new MapTransferPair(field.getPath());
+    return new RepeatedMapTransferPair(this, getField().getPath());
   }
 
   @Override
   public TransferPair makeTransferPair(ValueVector to) {
-    return new MapTransferPair( (RepeatedMapVector) to);
+    return new RepeatedMapTransferPair(this, (RepeatedMapVector)to);
   }
 
   MapSingleCopier makeSingularCopier(MapVector to) {
-    return new MapSingleCopier(to);
+    return new MapSingleCopier(this, to);
   }
 
-  class MapSingleCopier {
+  protected static class MapSingleCopier {
     private final TransferPair[] pairs;
-    final RepeatedMapVector from = RepeatedMapVector.this;
-
-    public MapSingleCopier(MapVector to) {
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        int preSize = to.vectors.size();
-        ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
-        if (to.vectors.size() != preSize) {
-          v.allocateNew();
+    public final RepeatedMapVector from;
+
+    public MapSingleCopier(RepeatedMapVector from, MapVector to) {
+      this.from = from;
+      this.pairs = new TransferPair[from.size()];
+
+      int i = 0;
+      ValueVector vector;
+      for (String child:from.getChildFieldNames()) {
+        int preSize = to.size();
+        vector = from.getChild(child);
+        if (vector == null) {
+          continue;
+        }
+        ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+        if (to.size() != preSize) {
+          newVector.allocateNew();
         }
-        pairs[i++] = e.getValue().makeTransferPair(v);
+        pairs[i++] = vector.makeTransferPair(newVector);
       }
     }
 
@@ -211,19 +158,12 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   }
 
   public TransferPair getTransferPairToSingleMap() {
-    return new SingleMapTransferPair(field.getPath());
+    return new SingleMapTransferPair(this, getField());
   }
 
   @Override
   public TransferPair getTransferPair(FieldReference ref) {
-    return new MapTransferPair(ref);
-  }
-
-  @Override
-  public void allocateNew() throws OutOfMemoryRuntimeException {
-    if (!allocateNewSafe()) {
-      throw new OutOfMemoryRuntimeException();
-    }
+    return new RepeatedMapTransferPair(this, ref);
   }
 
   @Override
@@ -232,44 +172,40 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
       return false;
     }
     offsets.zeroVector();
-    for (ValueVector v : vectors.values()) {
-      if (!v.allocateNewSafe()) {
-        return false;
-      }
-    }
-    return true;
+    return super.allocateNewSafe();
   }
 
-  private class SingleMapTransferPair implements TransferPair{
-    private RepeatedMapVector from = RepeatedMapVector.this;
-    private TransferPair[] pairs;
-    private MapVector to;
-
-    public SingleMapTransferPair(SchemaPath path) {
-
-      MaterializedField mf = MaterializedField.create(path, Types.required(field.getType().getMinorType()));
-      MapVector v = new MapVector(mf, allocator, callBack);
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        TransferPair otherSide = e.getValue().getTransferPair();
-        v.put(e.getKey(), otherSide.getTo());
-        pairs[i++] = otherSide;
-      }
-      this.to = v;
+  protected static class SingleMapTransferPair implements TransferPair {
+    private final TransferPair[] pairs;
+    private final RepeatedMapVector from;
+    private final MapVector to;
+    private static final MajorType MAP_TYPE = Types.required(MinorType.MAP);
+
+    public SingleMapTransferPair(RepeatedMapVector from, MaterializedField field) {
+      this(from, new MapVector(MaterializedField.create(field.getPath(), MAP_TYPE), from.allocator, from.callBack), false);
     }
 
-    public SingleMapTransferPair(MapVector to) {
+    public SingleMapTransferPair(RepeatedMapVector from, MapVector to) {
+      this(from, to, true);
+    }
+
+    public SingleMapTransferPair(RepeatedMapVector from, MapVector to, boolean allocate) {
+      this.from = from;
       this.to = to;
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        int preSize = to.vectors.size();
-        ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
-        if (to.vectors.size() != preSize) {
-          v.allocateNew();
+      this.pairs = new TransferPair[from.size()];
+      int i = 0;
+      ValueVector vector;
+      for (String child:from.getChildFieldNames()) {
+        int preSize = to.size();
+        vector = from.getChild(child);
+        if (vector == null) {
+          continue;
         }
-        pairs[i++] = e.getValue().makeTransferPair(v);
+        ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+        if (allocate && to.size() != preSize) {
+          newVector.allocateNew();
+        }
+        pairs[i++] = vector.makeTransferPair(newVector);
       }
     }
 
@@ -280,7 +216,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
         p.transfer();
       }
       to.getMutator().setValueCount(from.getAccessor().getValueCount());
-      clear();
+      from.clear();
     }
 
     @Override
@@ -308,46 +244,49 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
   }
 
-  private class MapTransferPair implements TransferPair{
+  private static class RepeatedMapTransferPair implements TransferPair{
 
     private final TransferPair[] pairs;
     private final RepeatedMapVector to;
-    private final RepeatedMapVector from = RepeatedMapVector.this;
-
-    public MapTransferPair(SchemaPath path) {
-      RepeatedMapVector v = new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, callBack);
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        TransferPair otherSide = e.getValue().getTransferPair();
-        v.put(e.getKey(), otherSide.getTo());
-        pairs[i++] = otherSide;
-      }
-      this.to = v;
+    private final RepeatedMapVector from;
+
+    public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path) {
+      this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false);
+    }
+
+    public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) {
+      this(from, to, true);
     }
 
-    public MapTransferPair(RepeatedMapVector to) {
+    public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to, boolean allocate) {
+      this.from = from;
       this.to = to;
-      pairs = new TransferPair[vectors.size()];
-      int i =0;
-      for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-        int preSize = to.vectors.size();
-        ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
-        if (preSize != to.vectors.size()) {
-          v.allocateNew();
+      this.pairs = new TransferPair[from.size()];
+
+      int i = 0;
+      ValueVector vector;
+      for (String child:from.getChildFieldNames()) {
+        int preSize = to.size();
+        vector = from.getChild(child);
+        if (vector == null) {
+          continue;
         }
-        pairs[i++] = e.getValue().makeTransferPair(v);
+        ValueVector newVector = to.addOrGet(child, vector.getField().getType(), vector.getClass());
+        if (to.size() != preSize) {
+          newVector.allocateNew();
+        }
+        pairs[i++] = vector.makeTransferPair(newVector);
       }
     }
 
 
     @Override
     public void transfer() {
-      offsets.transferTo(to.offsets);
+      from.offsets.transferTo(to.offsets);
       for (TransferPair p : pairs) {
         p.transfer();
       }
-      clear();
+      from.clear();
     }
 
     @Override
@@ -358,7 +297,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     @Override
     public boolean copyValueSafe(int srcIndex, int destIndex) {
       RepeatedMapHolder holder = new RepeatedMapHolder();
-      accessor.get(srcIndex, holder);
+      from.getAccessor().get(srcIndex, holder);
       if(destIndex >= to.getValueCapacity()){
         return false;
       }
@@ -380,7 +319,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
     @Override
     public void splitAndTransfer(final int groupStart, final int groups) {
-      final UInt4Vector.Accessor a = offsets.getAccessor();
+      final UInt4Vector.Accessor a = from.offsets.getAccessor();
       final UInt4Vector.Mutator m = to.offsets.getMutator();
 
       final int startPos = a.get(groupStart);
@@ -408,11 +347,11 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   }
 
 
-  transient private MapTransferPair ephPair;
+  transient private RepeatedMapTransferPair ephPair;
 
   public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from) {
     if (ephPair == null || ephPair.from != from) {
-      ephPair = (MapTransferPair) from.makeTransferPair(this);
+      ephPair = (RepeatedMapTransferPair) from.makeTransferPair(this);
     }
     return ephPair.copyValueSafe(fromIndex, thisIndex);
   }
@@ -429,21 +368,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
   @Override
   public DrillBuf[] getBuffers(boolean clear) {
-    int bufSize = getBufferSize();
-    List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers(clear));
-
-    for (ValueVector v : vectors.values()) {
-      for (DrillBuf b : v.getBuffers(clear)) {
-        bufs.add(b);
-      }
-    }
-    int actualBufSize = 0 ;
-    for(DrillBuf b : bufs){
-      actualBufSize += b.writerIndex();
-    }
-
-    Preconditions.checkArgument(actualBufSize == bufSize);
-    return bufs.toArray(new DrillBuf[bufs.size()]);
+    return ArrayUtils.addAll(offsets.getBuffers(clear), super.getBuffers(clear));
   }
 
 
@@ -455,11 +380,11 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
 
     for (SerializedField fmd : fields) {
       MaterializedField fieldDef = MaterializedField.create(fmd);
-      ValueVector v = vectors.get(fieldDef.getLastName());
+      ValueVector v = getChild(fieldDef.getLastName());
       if (v == null) {
         // if we arrive here, we didn't have a matching vector.
         v = TypeHelper.getNewVector(fieldDef, allocator);
-        put(fieldDef.getLastName(), v);
+        putChild(fieldDef.getLastName(), v);
       }
       if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) {
         v.clear();
@@ -480,22 +405,12 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
         .setGroupCount(accessor.getGroupCount())
         // while we don't need to actually read this on load, we need it to make sure we don't skip deserialization of this vector
         .setValueCount(accessor.getGroupCount());
-    for (ValueVector v : vectors.values()) {
+    for (ValueVector v : getChildren()) {
       b.addChild(v.getMetadata());
     }
     return b.build();
   }
 
-  protected void put(String name, ValueVector vv) {
-    int ordinal = vectors.size();
-    if (vectors.put(name, vv) != null) {
-      throw new IllegalStateException();
-    }
-    vectorIds.put(name, new VectorWithOrdinal(vv, ordinal));
-    vectorsById.put(ordinal, vv);
-    field.addChild(vv.getField());
-  }
-
   @Override
   public Mutator getMutator() {
     return mutator;
@@ -507,14 +422,14 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     public Object getObject(int index) {
       List<Object> l = new JsonStringArrayList();
       int end = offsets.getAccessor().get(index+1);
+      String fieldName;
       for (int i =  offsets.getAccessor().get(index); i < end; i++) {
         Map<String, Object> vv = Maps.newLinkedHashMap();
-        for (Map.Entry<String, ValueVector> e : vectors.entrySet()) {
-          ValueVector v = e.getValue();
-          String k = e.getKey();
-          Object value =  v.getAccessor().getObject(i);
+        for (MaterializedField field:getField().getChildren()) {
+          fieldName = field.getLastName();
+          Object value = getChild(fieldName).getAccessor().getObject(i);
           if (value != null) {
-            vv.put(k,value);
+            vv.put(fieldName, value);
           }
         }
         l.add(vv);
@@ -610,7 +525,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
       populateEmpties(topLevelValueCount);
       offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount+1);
       int childValueCount = offsets.getAccessor().get(topLevelValueCount);
-      for (ValueVector v : vectors.values()) {
+      for (ValueVector v : getChildren()) {
         v.getMutator().setValueCount(childValueCount);
       }
     }
@@ -647,7 +562,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
     getMutator().reset();
 
     offsets.clear();
-    for(ValueVector v : vectors.values()) {
+    for(ValueVector v : getChildren()) {
       v.clear();;
     }
   }
@@ -656,10 +571,4 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   public int load(int parentValueCount, int childValueCount, DrillBuf buf) {
     throw new UnsupportedOperationException();
   }
-
-  @Override
-  public VectorWithOrdinal getVectorWithOrdinal(String name) {
-    return vectorIds.get(name);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/VectorWithOrdinal.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/VectorWithOrdinal.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/VectorWithOrdinal.java
new file mode 100644
index 0000000..a0c8dd8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/VectorWithOrdinal.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+public class VectorWithOrdinal {
+  public final ValueVector vector;
+  public final int ordinal;
+
+  public VectorWithOrdinal(ValueVector v, int ordinal) {
+    this.vector = v;
+    this.ordinal = ordinal;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 18b5e9e..a110dcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -190,7 +190,7 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
     @Override
     public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
       ValueVector v = vc.addOrGet(name, type, clazz);
-      this.put(name, v);
+      putChild(name, v);
       return this.typeify(v, clazz);
 
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
index c60730c..ae2f779 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.holders.RepeatedListHolder;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedListVector;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -111,7 +110,7 @@ public class RepeatedListReaderImpl extends AbstractFieldReader{
   @Override
   public FieldReader reader() {
     if (reader == null) {
-      reader = container.get(name, ValueVector.class).getAccessor().getReader();
+      reader = container.getChild(name).getAccessor().getReader();
       if (currentOffset == NO_VALUES) {
         reader = NullReader.INSTANCE;
       } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
index 15f8a2b..3171d8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -50,7 +50,7 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
   public FieldReader reader(String name) {
     FieldReader reader = fields.get(name);
     if (reader == null) {
-      ValueVector child = vector.get(name, ValueVector.class);
+      ValueVector child = vector.getChild(name);
       if (child == null) {
         reader = NullReader.INSTANCE;
       } else {

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java
index c2284ec..40fa6d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java
@@ -23,7 +23,6 @@ package org.apache.drill.exec.vector.complex.impl;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -65,7 +64,7 @@ public class SingleListReaderImpl extends AbstractFieldReader{
   @Override
   public FieldReader reader() {
     if (reader == null) {
-      reader = container.get(name, ValueVector.class).getAccessor().getReader();
+      reader = container.getChild(name).getAccessor().getReader();
       setPosition(idx());
     }
     return reader;

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
index 3ec66ff..76f9e2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
@@ -51,7 +51,7 @@ public class SingleMapReaderImpl extends AbstractFieldReader{
   public FieldReader reader(String name){
     FieldReader reader = fields.get(name);
     if(reader == null){
-      ValueVector child = vector.get(name, ValueVector.class);
+      ValueVector child = vector.getChild(name);
       if(child == null){
         reader = NullReader.INSTANCE;
       }else{

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 36184a7..c3c9354 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -94,7 +94,7 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
     public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
       try {
         ValueVector v = mutator.addField(MaterializedField.create(name, type), clazz);
-        this.put(name, v);
+        putChild(name, v);
         return this.typeify(v, clazz);
       } catch (SchemaChangeException e) {
         throw new IllegalStateException(e);

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index 8405d0e..baf5e82 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.junit.Test;
 
 public class TestParquetComplex extends BaseTestQuery {
@@ -157,7 +158,7 @@ public class TestParquetComplex extends BaseTestQuery {
     String[] columns = {"keyword0", "keyword2"};
     testBuilder()
             .sqlQuery(query)
-            .ordered()
+            .unOrdered()
             .jsonBaselineFile("store/parquet/complex/baseline4.json")
             .baselineColumns(columns)
             .build()
@@ -170,7 +171,7 @@ public class TestParquetComplex extends BaseTestQuery {
     String[] columns = {"keyword0", "keyword2"};
     testBuilder()
             .sqlQuery(query)
-            .ordered()
+            .unOrdered()
             .jsonBaselineFile("store/parquet/complex/baseline4.json")
             .baselineColumns(columns)
             .build()

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
index 7e4cf4b..437bbb5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
@@ -38,9 +38,6 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
   static class TypeConverter {
 
     public Object convert(Object obj) {
-      if (obj == null) {
-        return null;
-      }
       if (obj instanceof JsonStringArrayList || obj instanceof JsonStringHashMap) {
         return obj.toString();
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/a2190fa0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
index 3f125fa..feaef4d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java
@@ -248,7 +248,7 @@ public class TestRepeated {
 
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
     JsonWriter jsonWriter = new JsonWriter(stream, true);
-    FieldReader reader = v.get("col", MapVector.class).getAccessor().getReader();
+    FieldReader reader = v.getChild("col", MapVector.class).getAccessor().getReader();
     reader.setPosition(0);
     jsonWriter.write(reader);
     reader.setPosition(1);


[2/3] drill git commit: DRILL-1926 Fix for back pressure logic

Posted by pa...@apache.org.
DRILL-1926 Fix for back pressure logic


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/2a410775
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/2a410775
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/2a410775

Branch: refs/heads/master
Commit: 2a4107753870aa408447fe6fb0becabcbb0691ef
Parents: a2190fa
Author: Yuliya Feldman <yf...@maprtech.com>
Authored: Wed Dec 17 14:58:43 2014 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 13 14:29:20 2015 -0800

----------------------------------------------------------------------
 .../exec/work/batch/ResponseSenderQueue.java    |  17 ++-
 .../work/batch/UnlimitedRawBatchBuffer.java     |  21 ++-
 .../work/batch/TestUnlimitedBatchBuffer.java    | 134 +++++++++++++++++++
 3 files changed, 164 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/2a410775/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
index 5a9316e..a7535c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
@@ -34,12 +34,25 @@ public class ResponseSenderQueue {
   }
 
   public void flushResponses(){
-    while(!q.isEmpty()){
+    flushResponses(Integer.MAX_VALUE);
+  }
+
+  /**
+   * Flush only up to a count responses
+   * @param count
+   * @return
+   */
+  public int flushResponses(int count){
+    logger.debug("queue.size: {}, count: {}", q.size(), count);
+    int i = 0;
+    while(!q.isEmpty() && i < count){
       ResponseSender s = q.poll();
       if(s != null){
         s.send(DataRpcConfig.OK);
       }
+      i++;
     }
-
+    return i;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/2a410775/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 623a719..35aec93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -41,6 +41,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private volatile BufferState state = BufferState.INIT;
   private final int softlimit;
   private final int startlimit;
+  private final int bufferSizePerSocket;
   private final AtomicBoolean overlimit = new AtomicBoolean(false);
   private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
   private final ResponseSenderQueue readController = new ResponseSenderQueue();
@@ -49,10 +50,11 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
   private FragmentContext context;
 
   public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) {
-    int bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+    bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
 
     this.softlimit = bufferSizePerSocket * fragmentCount;
     this.startlimit = Math.max(softlimit/2, 1);
+    logger.debug("softLimit: {}, startLimit: {}", softlimit, startlimit);
     this.buffer = Queues.newLinkedBlockingDeque();
     this.fragmentCount = fragmentCount;
     this.streamCounter = fragmentCount;
@@ -82,7 +84,8 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       return;
     }
     buffer.add(batch);
-    if (buffer.size() == softlimit) {
+    if (buffer.size() >= softlimit) {
+      logger.debug("buffer.size: {}", buffer.size());
       overlimit.set(true);
       readController.enqueueResponse(batch.getSender());
     } else {
@@ -167,11 +170,17 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     }
 
 
-    // if we are in the overlimit condition and aren't finished, check if we've passed the start limit.  If so, turn off the overlimit condition and set auto read to true (start reading from socket again).
+    // try to flush the difference between softlimit and queue size, so every flush we are reducing backlog
+    // when queue size is lower then softlimit - the bigger the difference the more we can flush
     if (!isFinished() && overlimit.get()) {
-      if (buffer.size() == startlimit) {
-        overlimit.set(false);
-        readController.flushResponses();
+      int flushCount = softlimit - buffer.size();
+      if ( flushCount > 0 ) {
+        int flushed = readController.flushResponses(flushCount);
+        logger.debug("flush {} entries, flushed {} entries ", flushCount, flushed);
+        if ( flushed == 0 ) {
+          // queue is empty - nothing to do for now
+          overlimit.set(false);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/2a410775/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
new file mode 100644
index 0000000..15ee3f3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test case to test whether backpressure is applied when
+ * size of the queue of RawBatchBuffers is exceeding specified softLimit.
+ * It is testing that acknowledgments are queued and sent according to the
+ * correct schedule
+ * If algorithm to release acks will be changed in the future
+ * this test will need to be changed
+ * It is not testing whether Senders receive acknowledgments and act accordingly
+ */
+public class TestUnlimitedBatchBuffer extends ExecTest {
+
+  private static int FRAGMENT_COUNT = 5;
+  private DrillConfig dc = DrillConfig.create();
+
+  private static class MySender implements ResponseSender {
+
+    private int sendCount = 0;
+
+    @Override
+    public void send(Response r) {
+      sendCount++;
+    }
+
+
+    public int getSendCount() {
+      return sendCount;
+    }
+
+    public void resetSender() {
+      sendCount = 0;
+    }
+  }
+  @Test
+  public void testBackPressure() throws Exception {
+
+    final MySender mySender = new MySender();
+    FragmentContext context = Mockito.mock(FragmentContext.class);
+
+    Mockito.when(context.getConfig()).thenReturn(dc);
+
+    UnlimitedRawBatchBuffer rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
+
+    RawFragmentBatch batch = Mockito.mock(RawFragmentBatch.class);
+
+    Mockito.when(batch.getSender()).thenReturn(mySender);
+    Mockito.doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock ignore) throws Throwable {
+        mySender.send(DataRpcConfig.OK);
+        return null;
+      }
+    }).when(batch).sendOk();
+
+    FragmentRecordBatch header = FragmentRecordBatch.newBuilder().setIsOutOfMemory(false).setIsLastBatch(false).build();
+    Mockito.when(batch.getHeader()).thenReturn(header);
+
+    /// start the real test
+    int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+    int softLimit = incomingBufferSize * FRAGMENT_COUNT;
+
+    // No back pressure should be kicked in
+    for ( int i = 0; i < softLimit-1; i++) {
+      rawBuffer.enqueue(batch);
+    }
+
+    // number of responses sent == number of enqueued elements
+    assertEquals(softLimit - 1, mySender.getSendCount());
+    rawBuffer.getNext();
+
+    // set senderCount to 0
+    mySender.resetSender();
+
+    // test back pressure
+    // number of elements in the queue = softLimit -2
+    // enqueue softlimit elements more
+    for ( int i = 0; i < softLimit; i++) {
+      rawBuffer.enqueue(batch);
+    }
+    // we are exceeding softlimit, so senderCount should not increase
+    assertEquals(1, mySender.getSendCount());
+
+    // other responses should be saved in the responsequeue
+    for (int i = 0; i < softLimit-2; i++ ) {
+      rawBuffer.getNext();
+    }
+
+    // still should not send responses, as queue.size should higher then softLimit
+    assertEquals(1, mySender.getSendCount());
+
+    // size of the queue == softLimit now
+    for (int i = softLimit; i > 0 ; i-- ) {
+      int senderCount = mySender.getSendCount();
+      rawBuffer.getNext();
+      int expectedCountNumber = softLimit - i + senderCount+1;
+      assertEquals((expectedCountNumber < softLimit ? expectedCountNumber : softLimit), mySender.getSendCount());
+    }
+  }
+
+}


[3/3] drill git commit: DRILL-1859 Issue with killing/stopping operator processing - limit is one of the users of this logic

Posted by pa...@apache.org.
DRILL-1859 Issue with killing/stopping operator processing - limit is one of the users of this logic


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69db15eb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69db15eb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69db15eb

Branch: refs/heads/master
Commit: 69db15ebbdc3a8f4a038e6f47a0675c32d14cdf4
Parents: 2a41077
Author: Yuliya Feldman <yf...@maprtech.com>
Authored: Mon Jan 12 14:11:45 2015 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Tue Jan 13 14:29:28 2015 -0800

----------------------------------------------------------------------
 .../exec/work/batch/ResponseSenderQueue.java    |  5 +++
 .../work/batch/UnlimitedRawBatchBuffer.java     | 36 ++++++++++------
 .../work/batch/TestUnlimitedBatchBuffer.java    | 44 ++++++++++++++++----
 3 files changed, 65 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69db15eb/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
index a7535c3..141c434 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ResponseSenderQueue.java
@@ -22,6 +22,7 @@ import java.util.Queue;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
 public class ResponseSenderQueue {
@@ -55,4 +56,8 @@ public class ResponseSenderQueue {
     return i;
   }
 
+  @VisibleForTesting
+  boolean isEmpty() {
+    return q.isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/69db15eb/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 35aec93..895918c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 
 public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@@ -63,12 +64,12 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
 
   @Override
   public void enqueue(RawFragmentBatch batch) throws IOException {
-    if (state == BufferState.KILLED) {
-      batch.release();
-    }
     if (isFinished()) {
       if (state == BufferState.KILLED) {
+        // do not even enqueue just release and send ack back
         batch.release();
+        batch.sendOk();
+        return;
       } else {
         throw new IOException("Attempted to enqueue batch after finished");
       }
@@ -107,29 +108,29 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
       if (!context.isFailed() && !context.isCancelled()) {
         context.fail(new IllegalStateException("Batches still in queue during cleanup"));
         logger.error("{} Batches in queue.", buffer.size());
-        RawFragmentBatch batch;
-        while ((batch = buffer.poll()) != null) {
-          logger.error("Batch left in queue: {}", batch);
-        }
-      }
-      RawFragmentBatch batch;
-      while ((batch = buffer.poll()) != null) {
-        if (batch.getBody() != null) {
-          batch.getBody().release();
-        }
       }
+      clearBufferWithBody();
     }
   }
 
   @Override
   public void kill(FragmentContext context) {
     state = BufferState.KILLED;
+    clearBufferWithBody();
+  }
+
+  /**
+   * Helper method to clear buffer with request bodies release
+   * also flushes ack queue - in case there are still responses pending
+   */
+  private void clearBufferWithBody() {
     while (!buffer.isEmpty()) {
       RawFragmentBatch batch = buffer.poll();
       if (batch.getBody() != null) {
         batch.getBody().release();
       }
     }
+    readController.flushResponses();
   }
 
   @Override
@@ -205,4 +206,13 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
     return (state == BufferState.KILLED || state == BufferState.FINISHED);
   }
 
+  @VisibleForTesting
+  ResponseSenderQueue getReadController() {
+    return readController;
+  }
+
+  @VisibleForTesting
+  boolean isBufferEmpty() {
+    return buffer.isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/69db15eb/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
index 15ee3f3..a710d21 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.work.batch;
 
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -47,6 +49,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
   private static int FRAGMENT_COUNT = 5;
   private DrillConfig dc = DrillConfig.create();
+  private MySender mySender;
+  private UnlimitedRawBatchBuffer rawBuffer;
+  private RawFragmentBatch batch;
+  private FragmentContext context;
+  private int softLimit;
 
   private static class MySender implements ResponseSender {
 
@@ -66,17 +73,17 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
       sendCount = 0;
     }
   }
-  @Test
-  public void testBackPressure() throws Exception {
 
-    final MySender mySender = new MySender();
-    FragmentContext context = Mockito.mock(FragmentContext.class);
+  @Before
+  public void setUp() {
+    mySender = new MySender();
+    context = Mockito.mock(FragmentContext.class);
 
     Mockito.when(context.getConfig()).thenReturn(dc);
 
-    UnlimitedRawBatchBuffer rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
+    rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
 
-    RawFragmentBatch batch = Mockito.mock(RawFragmentBatch.class);
+    batch = Mockito.mock(RawFragmentBatch.class);
 
     Mockito.when(batch.getSender()).thenReturn(mySender);
     Mockito.doAnswer(new Answer<Void>() {
@@ -91,8 +98,11 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
 
     /// start the real test
     int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
-    int softLimit = incomingBufferSize * FRAGMENT_COUNT;
+    softLimit = incomingBufferSize * FRAGMENT_COUNT;
+  }
 
+  @Test
+  public void testBackPressure() throws Exception {
     // No back pressure should be kicked in
     for ( int i = 0; i < softLimit-1; i++) {
       rawBuffer.enqueue(batch);
@@ -131,4 +141,24 @@ public class TestUnlimitedBatchBuffer extends ExecTest {
     }
   }
 
+  @Test
+  public void testAcksWithKill() throws Exception {
+    // Back pressure should be kicked in
+    for ( int i = 0; i < 2*softLimit; i++) {
+      rawBuffer.enqueue(batch);
+    }
+    assertEquals(softLimit - 1, mySender.getSendCount());
+    assertTrue(!rawBuffer.getReadController().isEmpty());
+
+    rawBuffer.kill(context);
+
+    // UnlimitedBatchBuffer queue should be cleared
+    assertTrue(rawBuffer.isBufferEmpty());
+
+    // acks queue should be cleared as well
+    assertTrue(rawBuffer.getReadController().isEmpty());
+
+    // all acks should be sent
+    assertEquals(2*softLimit, mySender.getSendCount());
+  }
 }