You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/05/05 13:27:29 UTC

[5/5] drill git commit: DRILL-5423: Refactor ScanBatch to allow unit testing record readers

DRILL-5423: Refactor ScanBatch to allow unit testing record readers

Refactors ScanBatch to allow unit testing of record reader
implementations, especially the “writer” classes.

See JIRA for details.

closes #811


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/41ffed50
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/41ffed50
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/41ffed50

Branch: refs/heads/master
Commit: 41ffed50fbb9319b4a796555396b88de010cb10b
Parents: 0939485
Author: Paul Rogers <pr...@maprtech.com>
Authored: Sat Apr 8 20:52:04 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Fri May 5 15:46:01 2017 +0300

----------------------------------------------------------------------
 .../exec/ops/AbstractOperatorExecContext.java   |  90 ++++++++++++++++
 .../apache/drill/exec/ops/OperatorContext.java  |  51 ++--------
 .../drill/exec/ops/OperatorContextImpl.java     | 102 ++++++-------------
 .../drill/exec/ops/OperatorExecContext.java     |  46 +++++++++
 .../drill/exec/ops/OperatorUtilities.java       |  48 +++++++++
 .../drill/exec/physical/impl/BaseRootExec.java  |   5 +-
 .../drill/exec/physical/impl/ScanBatch.java     |  62 ++++++++---
 .../drill/exec/memory/TestAllocators.java       |   7 +-
 .../drill/exec/record/TestRecordIterator.java   |   5 +-
 9 files changed, 280 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
new file mode 100644
index 0000000..a517fdf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Implementation of {@link OperatorExecContext} that provides services
+ * needed by most run-time operators. Excludes services that need the
+ * entire Drillbit. Allows easy testing of operator code that uses this
+ * interface.
+ */
+
+public class AbstractOperatorExecContext implements OperatorExecContext {
+
+  protected final BufferAllocator allocator;
+  protected final ExecutionControls executionControls;
+  protected final PhysicalOperator popConfig;
+  protected final BufferManager manager;
+  protected final OperatorStatReceiver statsWriter;
+
+  public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig,
+                                     ExecutionControls executionControls,
+                                     OperatorStatReceiver stats) {
+    this.allocator = allocator;
+    this.popConfig = popConfig;
+    manager = new BufferManagerImpl(allocator);
+    statsWriter = stats;
+    this.executionControls = executionControls;
+  }
+
+  @Override
+  public DrillBuf replace(DrillBuf old, int newSize) {
+    return manager.replace(old, newSize);
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer() {
+    return manager.getManagedBuffer();
+  }
+
+  @Override
+  public DrillBuf getManagedBuffer(int size) {
+    return manager.getManagedBuffer(size);
+  }
+
+  @Override
+  public ExecutionControls getExecutionControls() { return executionControls; }
+
+  @Override
+  public OperatorStatReceiver getStatsWriter() { return statsWriter; }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    if (allocator == null) {
+      throw new UnsupportedOperationException("Operator context does not have an allocator");
+    }
+    return allocator;
+  }
+
+  @Override
+  public void close() {
+    try {
+      manager.close();
+    } finally {
+      if (allocator != null) {
+        allocator.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index d6045fc..b248d5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,44 +18,28 @@
 package org.apache.drill.exec.ops;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import io.netty.buffer.DrillBuf;
+public interface OperatorContext extends OperatorExecContext {
 
-public abstract class OperatorContext {
+  OperatorStats getStats();
 
-  public abstract DrillBuf replace(DrillBuf old, int newSize);
+  ExecutorService getExecutor();
 
-  public abstract DrillBuf getManagedBuffer();
+  ExecutorService getScanExecutor();
 
-  public abstract DrillBuf getManagedBuffer(int size);
+  ExecutorService getScanDecodeExecutor();
 
-  public abstract BufferAllocator getAllocator();
+  DrillFileSystem newFileSystem(Configuration conf) throws IOException;
 
-  public abstract OperatorStats getStats();
-
-  public abstract ExecutorService getExecutor();
-
-  public abstract ExecutorService getScanExecutor();
-
-  public abstract ExecutorService getScanDecodeExecutor();
-
-  public abstract ExecutionControls getExecutionControls();
-
-  public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
-
-  public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
+  DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
 
   /**
    * Run the callable as the given proxy user.
@@ -65,21 +49,6 @@ public abstract class OperatorContext {
    * @param <RESULT> result type
    * @return Future<RESULT> future with the result of calling the callable
    */
-  public abstract <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
+  <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi,
                                                                   Callable<RESULT> callable);
-
-  public static int getChildCount(PhysicalOperator popConfig) {
-    Iterator<PhysicalOperator> iter = popConfig.iterator();
-    int i = 0;
-    while (iter.hasNext()) {
-      iter.next();
-      i++;
-    }
-
-    if (i == 0) {
-      i = 1;
-    }
-    return i;
-  }
-
-}
+ }

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index c19cc1f..37c609e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.ops;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
@@ -26,10 +24,8 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.testing.ExecutionControls;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,15 +35,11 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
-class OperatorContextImpl extends OperatorContext implements AutoCloseable {
+class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
 
-  private final BufferAllocator allocator;
-  private final ExecutionControls executionControls;
   private boolean closed = false;
-  private final PhysicalOperator popConfig;
   private final OperatorStats stats;
-  private final BufferManager manager;
   private DrillFileSystem fs;
   private final ExecutorService executor;
   private final ExecutorService scanExecutor;
@@ -62,75 +54,43 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   private ListeningExecutorService delegatePool;
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
-    this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
-        popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
-    this.popConfig = popConfig;
-    this.manager = new BufferManagerImpl(allocator);
-
-    OpProfileDef def =
-        new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
-    stats = context.getStats().newOperatorStats(def, allocator);
-    executionControls = context.getExecutionControls();
-    executor = context.getDrillbitContext().getExecutor();
-    scanExecutor = context.getDrillbitContext().getScanExecutor();
-    scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
+    this(popConfig, context, null);
   }
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
       throws OutOfMemoryException {
-    this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
-        popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
-    this.popConfig = popConfig;
-    this.manager = new BufferManagerImpl(allocator);
-    this.stats     = stats;
-    executionControls = context.getExecutionControls();
+    super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+          popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()),
+          popConfig, context.getExecutionControls(), stats);
+    if (stats != null) {
+      this.stats = stats;
+    } else {
+      OpProfileDef def =
+          new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(),
+                           OperatorUtilities.getChildCount(popConfig));
+      this.stats = context.getStats().newOperatorStats(def, allocator);
+    }
     executor = context.getDrillbitContext().getExecutor();
     scanExecutor = context.getDrillbitContext().getScanExecutor();
     scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
   }
 
-  @Override
-  public DrillBuf replace(DrillBuf old, int newSize) {
-    return manager.replace(old, newSize);
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer() {
-    return manager.getManagedBuffer();
-  }
-
-  @Override
-  public DrillBuf getManagedBuffer(int size) {
-    return manager.getManagedBuffer(size);
-  }
-
   // Allow an operator to use the thread pool
   @Override
   public ExecutorService getExecutor() {
     return executor;
   }
+
   @Override
   public ExecutorService getScanExecutor() {
     return scanExecutor;
   }
+
   @Override
   public ExecutorService getScanDecodeExecutor() {
     return scanDecodeExecutor;
   }
 
-  @Override
-  public ExecutionControls getExecutionControls() {
-    return executionControls;
-  }
-
-  @Override
-  public BufferAllocator getAllocator() {
-    if (allocator == null) {
-      throw new UnsupportedOperationException("Operator context does not have an allocator");
-    }
-    return allocator;
-  }
-
   public boolean isClosed() {
     return closed;
   }
@@ -143,20 +103,19 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     }
     logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
 
-    manager.close();
-
-    if (allocator != null) {
-      allocator.close();
-    }
-
-    if (fs != null) {
-      try {
-        fs.close();
-      } catch (IOException e) {
-        throw new DrillRuntimeException(e);
+    closed = true;
+    try {
+      super.close();
+    } finally {
+      if (fs != null) {
+        try {
+          fs.close();
+          fs = null;
+        } catch (IOException e) {
+          throw new DrillRuntimeException(e);
+        }
       }
     }
-    closed = true;
   }
 
   @Override
@@ -201,14 +160,13 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return fs;
   }
 
-  @Override
-  /*
-     Creates a DrillFileSystem that does not automatically track operator stats.
+  /**
+   * Creates a DrillFileSystem that does not automatically track operator stats.
    */
+  @Override
   public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
     Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
     fs = new DrillFileSystem(conf, null);
     return fs;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
new file mode 100644
index 0000000..4d64aba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.testing.ExecutionControls;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * Narrowed version of the {@link OperatorContext} used to create an
+ * easy-to-test version of the operator context that excludes services
+ * that require a full Drillbit server.
+ */
+
+public interface OperatorExecContext {
+
+  DrillBuf replace(DrillBuf old, int newSize);
+
+  DrillBuf getManagedBuffer();
+
+  DrillBuf getManagedBuffer(int size);
+
+  BufferAllocator getAllocator();
+
+  ExecutionControls getExecutionControls();
+
+  OperatorStatReceiver getStatsWriter();
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
new file mode 100644
index 0000000..2e6e759
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorUtilities.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+/**
+ * Utility methods, formerly on the OperatorContext class, that work with
+ * operators. The utilities here are available to operators at unit test
+ * time, while methods in OperatorContext are available only in production
+ * code.
+ */
+
+public class OperatorUtilities {
+
+  private OperatorUtilities() { }
+
+  public static int getChildCount(PhysicalOperator popConfig) {
+    Iterator<PhysicalOperator> iter = popConfig.iterator();
+    int count = 0;
+    while (iter.hasNext()) {
+      iter.next();
+      count++;
+    }
+
+    if (count == 0) {
+      count = 1;
+    }
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index f720f8e..d01e294 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -44,7 +45,7 @@ public abstract class BaseRootExec implements RootExec {
   public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = fragmentContext.newOperatorContext(config, stats);
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
-        config.getOperatorType(), OperatorContext.getChildCount(config)),
+        config.getOperatorType(), OperatorUtilities.getChildCount(config)),
         oContext.getAllocator());
     fragmentContext.getStats().addOperatorStats(this.stats);
     this.fragmentContext = fragmentContext;
@@ -54,7 +55,7 @@ public abstract class BaseRootExec implements RootExec {
       final PhysicalOperator config) throws OutOfMemoryException {
     this.oContext = oContext;
     stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
-        config.getOperatorType(), OperatorContext.getChildCount(config)),
+        config.getOperatorType(), OperatorUtilities.getChildCount(config)),
       oContext.getAllocator());
     fragmentContext.getStats().addOperatorStats(this.stats);
     this.fragmentContext = fragmentContext;

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 011e751..5a9af39 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -35,6 +35,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorExecContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 
 /**
@@ -68,19 +70,14 @@ public class ScanBatch implements CloseableRecordBatch {
   /** Main collection of fields' value vectors. */
   private final VectorContainer container = new VectorContainer();
 
-  /** Fields' value vectors indexed by fields' keys. */
-  private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
-          CaseInsensitiveMap.newHashMap();
-
   private int recordCount;
   private final FragmentContext context;
   private final OperatorContext oContext;
   private Iterator<RecordReader> readers;
   private RecordReader currentReader;
   private BatchSchema schema;
-  private final Mutator mutator = new Mutator();
+  private final Mutator mutator;
   private boolean done = false;
-  private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private boolean hasReadNonEmptyFile = false;
   private Map<String, ValueVector> implicitVectors;
   private Iterator<Map<String, String>> implicitColumns;
@@ -98,6 +95,7 @@ public class ScanBatch implements CloseableRecordBatch {
     currentReader = readers.next();
     this.oContext = oContext;
     allocator = oContext.getAllocator();
+    mutator = new Mutator(oContext, allocator, container);
 
     boolean setup = false;
     try {
@@ -158,7 +156,7 @@ public class ScanBatch implements CloseableRecordBatch {
   }
 
   private void clearFieldVectorMap() {
-    for (final ValueVector v : fieldVectorMap.values()) {
+    for (final ValueVector v : mutator.fieldVectorMap().values()) {
       v.clear();
     }
   }
@@ -173,7 +171,7 @@ public class ScanBatch implements CloseableRecordBatch {
       try {
         injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
 
-        currentReader.allocate(fieldVectorMap);
+        currentReader.allocate(mutator.fieldVectorMap());
       } catch (OutOfMemoryException e) {
         logger.debug("Caught Out of Memory Exception", e);
         clearFieldVectorMap();
@@ -204,10 +202,8 @@ public class ScanBatch implements CloseableRecordBatch {
           // If all the files we have read so far are just empty, the schema is not useful
           if (! hasReadNonEmptyFile) {
             container.clear();
-            for (ValueVector v : fieldVectorMap.values()) {
-              v.clear();
-            }
-            fieldVectorMap.clear();
+            clearFieldVectorMap();
+            mutator.clear();
           }
 
           currentReader.close();
@@ -215,7 +211,7 @@ public class ScanBatch implements CloseableRecordBatch {
           implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
           currentReader.setup(oContext, mutator);
           try {
-            currentReader.allocate(fieldVectorMap);
+            currentReader.allocate(mutator.fieldVectorMap());
           } catch (OutOfMemoryException e) {
             logger.debug("Caught OutOfMemoryException");
             clearFieldVectorMap();
@@ -323,11 +319,41 @@ public class ScanBatch implements CloseableRecordBatch {
     return container.getValueAccessorById(clazz, ids);
   }
 
-  private class Mutator implements OutputMutator {
+  /**
+   * Row set mutator implementation provided to record readers created by
+   * this scan batch. Made visible so that tests can create this mutator
+   * without also needing a ScanBatch instance. (This class is really independent
+   * of the ScanBatch, but resides here for historical reasons. This is,
+   * in turn, the only use of the genereated vector readers in the vector
+   * package.)
+   */
+
+  @VisibleForTesting
+  public static class Mutator implements OutputMutator {
     /** Whether schema has changed since last inquiry (via #isNewSchema}).  Is
      *  true before first inquiry. */
     private boolean schemaChanged = true;
 
+    /** Fields' value vectors indexed by fields' keys. */
+    private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
+            CaseInsensitiveMap.newHashMap();
+
+    private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+    private final BufferAllocator allocator;
+
+    private final VectorContainer container;
+
+    private final OperatorExecContext oContext;
+
+    public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) {
+      this.oContext = oContext;
+      this.allocator = allocator;
+      this.container = container;
+    }
+
+    public Map<String, ValueVector> fieldVectorMap() {
+      return fieldVectorMap;
+    }
 
     @SuppressWarnings("resource")
     @Override
@@ -396,6 +422,10 @@ public class ScanBatch implements CloseableRecordBatch {
     public CallBack getCallBack() {
       return callBack;
     }
+
+    public void clear() {
+      fieldVectorMap.clear();
+    }
   }
 
   @Override
@@ -414,7 +444,7 @@ public class ScanBatch implements CloseableRecordBatch {
     for (final ValueVector v : implicitVectors.values()) {
       v.clear();
     }
-    fieldVectorMap.clear();
+    mutator.clear();
     currentReader.close();
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 288e78d..0dc2925 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
@@ -218,7 +219,7 @@ public class TestAllocators extends DrillTest {
 
       // Use some bogus operator type to create a new operator context.
       def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-          OperatorContext.getChildCount(physicalOperator1));
+          OperatorUtilities.getChildCount(physicalOperator1));
       stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
 
       // Add a couple of Operator Contexts
@@ -232,7 +233,7 @@ public class TestAllocators extends DrillTest {
       OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3);
 
       def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
-          OperatorContext.getChildCount(physicalOperator4));
+          OperatorUtilities.getChildCount(physicalOperator4));
       stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
       OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats);
       DrillBuf b22 = oContext22.getAllocator().buffer(2000000);
@@ -246,7 +247,7 @@ public class TestAllocators extends DrillTest {
 
       // New fragment starts an operator that allocates an amount within the limit
       def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
-          OperatorContext.getChildCount(physicalOperator5));
+          OperatorUtilities.getChildCount(physicalOperator5));
       stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
       OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/41ffed50/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
index c2429b7..847caa5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/TestRecordIterator.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.OperatorUtilities;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -73,7 +74,7 @@ public class TestRecordIterator extends PopUnitTestBase {
     RecordBatch singleBatch = exec.getIncoming();
     PhysicalOperator dummyPop = operatorList.iterator().next();
     OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-      OperatorContext.getChildCount(dummyPop));
+      OperatorUtilities.getChildCount(dummyPop));
     OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
     RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0, false);
     int totalRecords = 0;
@@ -130,7 +131,7 @@ public class TestRecordIterator extends PopUnitTestBase {
     RecordBatch singleBatch = exec.getIncoming();
     PhysicalOperator dummyPop = operatorList.iterator().next();
     OpProfileDef def = new OpProfileDef(dummyPop.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-      OperatorContext.getChildCount(dummyPop));
+        OperatorUtilities.getChildCount(dummyPop));
     OperatorStats stats = exec.getContext().getStats().newOperatorStats(def, exec.getContext().getAllocator());
     RecordIterator iter = new RecordIterator(singleBatch, null, exec.getContext().newOperatorContext(dummyPop, stats), 0);
     List<ValueVector> vectors = null;