Posted to commits@drill.apache.org by bo...@apache.org on 2018/01/30 21:54:12 UTC

drill git commit: DRILL-6071: Limit batch size for flatten operator

Repository: drill
Updated Branches:
  refs/heads/master a9ea4ec1c -> 039530a41


DRILL-6071: Limit batch size for flatten operator

closes #1091


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/039530a4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/039530a4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/039530a4

Branch: refs/heads/master
Commit: 039530a4195ba8fa4532b9ca92980206fa66c181
Parents: a9ea4ec
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Wed Jan 10 05:06:58 2018 -0800
Committer: Boaz Ben-Zvi <bo...@apache.org>
Committed: Tue Jan 30 12:00:47 2018 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../impl/flatten/FlattenRecordBatch.java        |  55 ++
 .../physical/impl/flatten/FlattenTemplate.java  | 142 +--
 .../exec/physical/impl/flatten/Flattener.java   |   1 +
 .../physical/impl/spill/RecordBatchSizer.java   |  50 +-
 .../server/options/SystemOptionManager.java     |   3 +-
 .../src/main/resources/drill-module.conf        |   1 +
 .../physical/unit/BasicPhysicalOpUnitTest.java  |  29 +
 .../physical/unit/MiniPlanUnitTestBase.java     |   2 +-
 .../physical/unit/PhysicalOpUnitTestBase.java   |  45 +-
 .../exec/physical/unit/TestOutputBatchSize.java | 882 +++++++++++++++++++
 .../org/apache/drill/test/DrillTestWrapper.java |  39 +-
 12 files changed, 1095 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 25f6135..c949e51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -76,6 +76,9 @@ public final class ExecConstants {
   public static final String SPILL_FILESYSTEM = "drill.exec.spill.fs";
   public static final String SPILL_DIRS = "drill.exec.spill.directories";
 
+  public static final String OUTPUT_BATCH_SIZE = "drill.exec.memory.operator.output_batch_size";
+  public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 1024, 512 * 1024 * 1024);
+
   // External Sort Boot configuration
 
   public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
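
A quick sanity check of the bounds the new validator enforces: the option must
lie between 1 KB and 512 MB, and the 33554432-byte default added to
drill-module.conf below is exactly 32 MB. A standalone sketch of that
arithmetic (the class and variable names are illustrative, not part of the
patch):

    public class BatchSizeBoundsCheck {
      public static void main(String[] args) {
        long min = 1024;                 // validator floor: 1 KB
        long max = 512L * 1024 * 1024;   // validator ceiling: 512 MB
        long dflt = 32L * 1024 * 1024;   // 33554432, the drill-module.conf default
        System.out.println(dflt >= min && dflt <= max);  // prints: true
      }
    }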

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 8be16ad..9483f29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
@@ -68,6 +70,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
+  private long outputBatchSize;
 
   private final Flattener.Monitor monitor = new Flattener.Monitor() {
     @Override
@@ -94,8 +97,57 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
     }
   }
 
+  private class FlattenMemoryManager {
+    private final int outputRowCount;
+    private static final int OFFSET_VECTOR_WIDTH = 4;
+    private static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;
+    private static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+    private static final int MIN_NUM_ROWS = 1;
+
+    private FlattenMemoryManager(RecordBatch incoming, long outputBatchSize, SchemaPath flattenColumn) {
+      // Get sizing information for the batch.
+      RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+
+      final TypedFieldId typedFieldId = incoming.getValueVectorId(flattenColumn);
+      final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+
+      // Get column size of flatten column.
+      RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(),
+          typedFieldId.getFieldIds()).getValueVector(), field.getName());
+
+      // Average rowWidth of flatten column
+      final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount());
+
+      // Average rowWidth excluding the flatten column.
+      final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn;
+
+      // Average rowWidth of a single element in the flatten list:
+      // subtract the offset vector size from the column data size.
+      final int avgRowWidthSingleFlattenEntry =
+          RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount);
+
+      // Average rowWidth of outgoing batch.
+      final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;
+
+      // Number of rows in outgoing batch
+      outputRowCount = Math.max(MIN_NUM_ROWS, Math.min(MAX_NUM_ROWS,
+          RecordBatchSizer.safeDivide((outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR), avgOutgoingRowWidth)));
+
+      logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}, " +
+              "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, outputRowCount);
+    }
+
+    public int getOutputRowCount() {
+      return outputRowCount;
+    }
+  }
+
+
   public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(pop, context, incoming);
+
+    // Get the output batch size from config.
+    outputBatchSize = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
   }
 
   @Override
@@ -148,6 +200,9 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
 
   @Override
   protected IterOutcome doWork() {
+    FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(incoming, outputBatchSize, popConfig.getColumn());
+    flattener.setOutputCount(flattenMemoryManager.getOutputRowCount());
+
     int incomingRecordCount = incoming.getRecordCount();
 
     if (!doAlloc()) {
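
To make the FlattenMemoryManager arithmetic above concrete, here is a
standalone sketch with made-up inputs: 4096 incoming rows, a 120-byte net row
width, and a flatten column averaging four elements per row. None of these
numbers come from the patch; they only exercise the same formula:

    public class FlattenSizingExample {
      public static void main(String[] args) {
        long outputBatchSize = 32L * 1024 * 1024;  // 32 MB default
        int netRowWidth = 120;                     // whole incoming row
        int columnNetSize = 327680;                // flatten column, all vectors
        int valueCount = 4096;                     // rows in the batch
        int elementCount = 16384;                  // total array elements

        int avgRowWidthFlattenColumn = columnNetSize / valueCount;        // 80
        int avgRowWidthWithOutFlattenColumn =
            netRowWidth - avgRowWidthFlattenColumn;                       // 40
        // Strip the 4-byte-per-row offset vector before dividing by elements.
        int avgRowWidthSingleFlattenEntry =
            (columnNetSize - 4 * valueCount) / elementCount;              // 19
        int avgOutgoingRowWidth =
            avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry;  // 59

        // Halve the budget for worst-case fragmentation, then clamp to
        // [1, 65536]; 65536 is the assumed value of ValueVector.MAX_ROW_COUNT.
        int outputRowCount = Math.max(1, Math.min(65536,
            (int) (outputBatchSize / 2 / avgOutgoingRowWidth)));
        System.out.println(outputRowCount);  // prints: 65536 (the cap binds)
      }
    }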

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index ed20429..cd58bfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.TransferPair;
 
 import com.google.common.collect.ImmutableList;
 
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +39,7 @@ import org.slf4j.LoggerFactory;
 public abstract class FlattenTemplate implements Flattener {
   private static final Logger logger = LoggerFactory.getLogger(FlattenTemplate.class);
 
-  private static final int OUTPUT_BATCH_SIZE = 4*1024;
-  private static final int OUTPUT_MEMORY_LIMIT = 512 * 1024 * 1024;
+  private static final int OUTPUT_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
 
   private ImmutableList<TransferPair> transfers;
   private BufferAllocator outputAllocator;
@@ -47,14 +47,12 @@ public abstract class FlattenTemplate implements Flattener {
   private RepeatedValueVector fieldToFlatten;
   private RepeatedValueVector.RepeatedAccessor accessor;
   private int valueIndex;
-  private boolean bigRecords = false;
-  private int bigRecordsBufferSize;
 
   /**
-   * The output batch limit starts at OUTPUT_BATCH_SIZE, but may be decreased
+   * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
    * if records are found to be large.
    */
-  private int outputLimit = OUTPUT_BATCH_SIZE;
+  private int outputLimit = OUTPUT_ROW_COUNT;
 
   // this allows for groups to be written between batches if we run out of space, for cases where we have finished
   // a batch on the boundary it will be set to 0
@@ -73,6 +71,11 @@ public abstract class FlattenTemplate implements Flattener {
   }
 
   @Override
+  public void setOutputCount(int outputCount) {
+    outputLimit = outputCount;
+  }
+
+  @Override
   public final int flattenRecords(final int recordCount, final int firstOutputIndex,
       final Flattener.Monitor monitor) {
     switch (svMode) {
@@ -101,75 +104,10 @@ public abstract class FlattenTemplate implements Flattener {
             for ( ; innerValueIndexLocal < innerValueCount; innerValueIndexLocal++) {
               // If we've hit the batch size limit, stop and flush what we've got so far.
               if (recordsThisCall == outputLimit) {
-                if (bigRecords) {
-                  /*
-                   * We got to the limit we used before, but did we go over
-                   * the bigRecordsBufferSize in the second half of the batch? If
-                   * so, we'll need to adjust the batch limits.
-                   */
-                  adjustBatchLimits(1, monitor, recordsThisCall);
-                }
-
                 // Flush this batch.
                 break outer;
               }
 
-              /*
-               * At the moment, the output record includes the input record, so for very
-               * large records that we're flattening, we're carrying forward the original
-               * record as well as the flattened element. We've seen a case where flattening a 4MB
-               * record with a 20,000 element array causing memory usage to explode. To avoid
-               * that until we can push down the selected fields to operators like this, we
-               * also limit the amount of memory in use at one time.
-               *
-               * We have to have written at least one record to be able to get a buffer that will
-               * have a real allocator, so we have to do this lazily. We won't check the limit
-               * for the first two records, but that keeps this simple.
-               */
-              if (bigRecords) {
-                /*
-                 * If we're halfway through the outputLimit, check on our memory
-                 * usage so far.
-                 */
-                if (recordsThisCall == outputLimit / 2) {
-                  /*
-                   * If we've used more than half the space we've used for big records
-                   * in the past, we've seen even bigger records than before, so stop and
-                   * see if we need to flush here before we go over bigRecordsBufferSize
-                   * memory usage, and reduce the outputLimit further before we continue
-                   * with the next batch.
-                   */
-                  if (adjustBatchLimits(2, monitor, recordsThisCall)) {
-                    break outer;
-                  }
-                }
-              } else {
-                if (outputAllocator.getAllocatedMemory() > OUTPUT_MEMORY_LIMIT) {
-                  /*
-                   * We're dealing with big records. Reduce the outputLimit to
-                   * the current record count, and take note of how much space the
-                   * vectors report using for that. We'll use those numbers as limits
-                   * going forward in order to avoid allocating more memory.
-                   */
-                  bigRecords = true;
-                  outputLimit = Math.min(recordsThisCall, outputLimit);
-                  if (outputLimit < 1) {
-                    throw new IllegalStateException("flatten outputLimit (" + outputLimit
-                        + ") won't make progress");
-                  }
-
-                  /*
-                   * This will differ from what the allocator reports because of
-                   * overhead. But the allocator check is much cheaper to do, so we
-                   * only compute this at selected times.
-                   */
-                  bigRecordsBufferSize = monitor.getBufferSizeFor(recordsThisCall);
-
-                  // Stop and flush.
-                  break outer;
-                }
-              }
-
               try {
                 doEval(valueIndexLocal, outputIndex);
               } catch (OversizedAllocationException ex) {
@@ -211,68 +149,6 @@ public abstract class FlattenTemplate implements Flattener {
     }
   }
 
-  /**
-   * Determine if the current batch record limit needs to be adjusted (when handling
-   * bigRecord mode). If so, adjust the limit, and return true, otherwise return false.
-   *
-   * <p>If the limit is adjusted, it will always be adjusted down, because we need to operate
-   * based on the largest sized record we've ever seen.</p>
-   *
-   * <p>If the limit is adjusted, then the current batch should be flushed, because
-   * continuing would lead to going over the large memory limit that has already been
-   * established.</p>
-   *
-   * @param multiplier Multiply currently used memory (according to the monitor) before
-   *   checking against past memory limits. This allows for checking the currently used
-   *   memory after processing a fraction of the expected batch limit, but using that as
-   *   a predictor of the full batch's size. For example, if this is checked after half
-   *   the batch size limit's records are processed, then using a multiplier of two will
-   *   do the check under the assumption that processing the full batch limit will use
-   *   twice as much memory.
-   * @param monitor the Flattener.Monitor instance to use for the current memory usage check
-   * @param recordsThisCall the number of records processed so far during this call to
-   *   flattenRecords().
-   * @return true if the batch size limit was adjusted, false otherwise
-   */
-  private boolean adjustBatchLimits(final int multiplier, final Flattener.Monitor monitor,
-      final int recordsThisCall) {
-    assert bigRecords : "adjusting batch limits when no big records";
-    final int bufferSize = multiplier * monitor.getBufferSizeFor(recordsThisCall);
-
-    /*
-     * If the amount of space we've used so far is below the amount that triggered
-     * the bigRecords mode, then no adjustment is needed.
-     */
-    if (bufferSize <= bigRecordsBufferSize) {
-      return false;
-    }
-
-    /*
-     * We've used more space than we've used for big records in the past, we've seen
-     * even bigger records, so we need to adjust our limits, and flush what we've got so far.
-     *
-     * We should reduce the outputLimit proportionately to get the predicted
-     * amount of memory used back down to bigRecordsBufferSize.
-     *
-     * The number of records to limit is therefore
-     * outputLimit *
-     *   (1 - (bufferSize - bigRecordsBufferSize) / bigRecordsBufferSize)
-     *
-     * Doing some algebra on the multiplier:
-     * (bigRecordsBufferSize - (bufferSize - bigRecordsBufferSize)) / bigRecordsBufferSize
-     * (bigRecordsBufferSize - bufferSize + bigRecordsBufferSize) / bigRecordsBufferSize
-     * (2 * bigRecordsBufferSize - bufferSize) / bigRecordsBufferSize
-     *
-     * If bufferSize has gotten so big that this would be negative, we'll
-     * just go down to one record per batch. We need to check for that on
-     * outputLimit anyway, in order to make sure that we make progress.
-     */
-    final int newLimit = (int)
-        (outputLimit * (2.0 * ((double) bigRecordsBufferSize) - bufferSize) / bigRecordsBufferSize);
-    outputLimit = Math.max(1, newLimit);
-    return true;
-  }
-
   @Override
   public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers)  throws SchemaChangeException{
 

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 392757e..b1d93c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -45,6 +45,7 @@ public interface Flattener {
   int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);
 
   void setFlattenField(RepeatedValueVector repeatedColumn);
+  void setOutputCount(int outputCount);
 
   RepeatedValueVector getFlattenField();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 5808f45..0fe67d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -93,9 +93,19 @@ public class RecordBatchSizer {
 
     public final int elementCount;
 
+    /**
+     * Size of the top level value vector. For map and repeated list,
+     * this is just size of offset vector.
+     */
     public int dataSize;
 
     /**
+     * Total size of the column includes the sum total of memory for all
+     * value vectors representing the column.
+     */
+    public int netSize;
+
+    /**
      * The estimated, average number of elements per parent value.
      * Always 1 for a non-repeated type. For a repeated type,
      * this is the average entries per array (per repeated element).
@@ -131,9 +141,15 @@ public class RecordBatchSizer {
         break;
       default:
         dataSize = v.getPayloadByteCount(valueCount);
-        stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
+        try {
+          stdSize = TypeHelper.getSize(metadata.getType()) * elementCount;
+        } catch (Exception e) {
+          // For unsupported types, just set stdSize to 0.
+          stdSize = 0;
+        }
       }
       estSize = safeDivide(dataSize, valueCount);
+      netSize = v.getPayloadByteCount(valueCount);
     }
 
     @SuppressWarnings("resource")
@@ -154,8 +170,14 @@ public class RecordBatchSizer {
       return childCount;
     }
 
+    @SuppressWarnings("resource")
     private void buildList(ValueVector v) {
-      @SuppressWarnings("resource")
+      // A complex ListVector cannot be cast to RepeatedListVector,
+      // so check the mode first.
+      if (v.getField().getDataMode() != DataMode.REPEATED) {
+        dataSize = v.getPayloadByteCount(valueCount);
+        return;
+      }
       UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
       dataSize = offsetVector.getPayloadByteCount(valueCount);
     }
@@ -232,6 +254,10 @@ public class RecordBatchSizer {
     }
   }
 
+  public static ColumnSize getColumn(ValueVector v, String prefix) {
+    return new ColumnSize(v, prefix);
+  }
+
   public static final int MAX_VECTOR_SIZE = ValueVector.MAX_BUFFER_SIZE; // 16 MiB
 
   private List<ColumnSize> columnSizes = new ArrayList<>();
@@ -380,14 +406,18 @@ public class RecordBatchSizer {
     // vectors do consume space, so visit columns recursively.
 
     switch (v.getField().getType().getMinorType()) {
-    case MAP:
-      expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
-      break;
-    case LIST:
-      expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
-      break;
-    default:
-      v.collectLedgers(ledgers);
+      case MAP:
+        expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".");
+        break;
+      case LIST:
+        // A complex ListVector cannot be cast to RepeatedListVector,
+        // so do not expand the list unless it is in REPEATED mode.
+        if (v.getField().getDataMode() == DataMode.REPEATED) {
+          expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".");
+        }
+        break;
+      default:
+        v.collectLedgers(ledgers);
     }
 
     netRowWidth += colSize.estSize;
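
The dataSize/netSize distinction above matters most for repeated columns, where
the top-level vector is only a 4-byte-per-row offset vector and the element
data lives in child vectors. A standalone sketch with made-up numbers (the
round-up, zero-guarding behavior of safeDivide is an assumption inferred from
how it is used, not shown in this diff):

    public class ColumnSizeExample {
      // Assumed semantics of RecordBatchSizer.safeDivide: round up, and return
      // 0 for a zero denominator (e.g. an empty batch) instead of throwing.
      static int safeDivide(long num, long denom) {
        return denom == 0 ? 0 : (int) Math.ceil((double) num / denom);
      }

      public static void main(String[] args) {
        int valueCount = 4096;      // rows in the batch
        int elementBytes = 311296;  // payload of the underlying element vector

        // dataSize of a repeated list is just its offset vector ...
        int dataSize = 4 * valueCount;                        // 16384
        // ... while netSize sums every vector that makes up the column.
        int netSize = dataSize + elementBytes;                // 327680
        System.out.println(safeDivide(netSize, valueCount));  // prints: 80
      }
    }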

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 369f3bc..4dba96d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -211,7 +211,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PERSISTENT_TABLE_UMASK_VALIDATOR),
       new OptionDefinition(ExecConstants.CPU_LOAD_AVERAGE),
       new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR),
-      new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR)
+      new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR),
+      new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false))
     };
 
     final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap();

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 28b7975..5659c82 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -420,6 +420,7 @@ drill.exec.options: {
     drill.exec.storage.implicit.fqn.column.label: "fqn",
     drill.exec.storage.implicit.suffix.column.label: "suffix",
     drill.exec.testing.controls: "{}",
+    drill.exec.memory.operator.output_batch_size : 33554432, # 32 MB
     exec.bulk_load_table_list.bulk_size: 1000,
     exec.compile.scalar_replacement: false,
     exec.enable_bulk_load_table_list: false,

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index ec85f21..fb8160a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -23,7 +23,9 @@ import java.util.List;
 
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ComplexToJson;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.config.Filter;
@@ -34,6 +36,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.planner.physical.AggPrelBase;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -189,6 +192,32 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
   }
 
   @Test
+  public void testFlatten() {
+    final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b"));
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    for (int j = 0; j < 1; j++) {
+      batchString.append("[");
+      for (int i = 0; i < 1; i++) {
+        batchString.append("{\"a\": 5, \"b\" : [5, 6, 7]}");
+      }
+      batchString.append("]");
+      inputJsonBatches.add(batchString.toString());
+    }
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(flatten)
+            .inputDataStreamJson(inputJsonBatches)
+            .baselineColumns("a", "b")
+            .baselineValues(5l, 5l)
+            .baselineValues(5l, 6l)
+            .baselineValues(5l, 7l);
+
+    opTestBuilder.go();
+  }
+
+  @Test
   public void testExternalSort() {
     ExternalSort sortConf = new ExternalSort(null,
         Lists.newArrayList(ordering("b", RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST)), false);

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index e6e72e7..7eafb86 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -152,7 +152,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
       }
       Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>();
 
-      int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, actualSuperVectors);
+      int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, null, null, actualSuperVectors);
       if (expectBatchNum != null) {
         if (expectBatchNum != actualBatchNum) {
           throw new AssertionError(String.format("Expected %s batches from operator tree. But operators return %s batch!", expectBatchNum, actualBatchNum));

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
index b01dd3e..d1ad990 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java
@@ -74,6 +74,7 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -84,6 +85,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.TreeMap;
 
 public class PhysicalOpUnitTestBase extends ExecTest {
   protected MockExecutorFragmentContext fragContext;
@@ -209,16 +211,19 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     private List<List<String>> inputStreamsJSON;
     private long initReservation = AbstractBase.INIT_ALLOCATION;
     private long maxAllocation = AbstractBase.MAX_ALLOCATION;
+    private boolean checkBatchMemory;
+    private boolean expectNoRows;
+    private Long expectedBatchSize;
+    private Integer expectedNumBatches;
 
-    @SuppressWarnings({ "unchecked", "resource" })
+    @SuppressWarnings({"unchecked", "resource"})
     public void go() {
       BatchCreator<PhysicalOperator> opCreator;
       RecordBatch testOperator;
       try {
         mockOpContext(popConfig, initReservation, maxAllocation);
 
-        opCreator = (BatchCreator<PhysicalOperator>)
-            opCreatorReg.getOperatorCreator(popConfig.getClass());
+        opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass());
         List<RecordBatch> incomingStreams = Lists.newArrayList();
         if (inputStreamsJSON != null) {
           for (List<String> batchesJson : inputStreamsJSON) {
@@ -229,8 +234,19 @@ public class PhysicalOpUnitTestBase extends ExecTest {
 
         testOperator = opCreator.getBatch(fragContext, popConfig, incomingStreams);
 
-        Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator));
-        Map<String, List<Object>> expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+        Map<String, List<Object>> actualSuperVectors = DrillTestWrapper.addToCombinedVectorResults(new BatchIterator(testOperator), expectedBatchSize, expectedNumBatches);
+
+        Map<String, List<Object>> expectedSuperVectors;
+
+        if (expectNoRows) {
+          expectedSuperVectors = new TreeMap<>();
+          for (String column : baselineColumns) {
+            expectedSuperVectors.put(column, new ArrayList<>());
+          }
+        } else {
+          expectedSuperVectors = DrillTestWrapper.translateRecordListToHeapVectors(baselineRecords);
+        }
+
         DrillTestWrapper.compareMergedVectors(expectedSuperVectors, actualSuperVectors);
 
       } catch (ExecutionSetupException e) {
@@ -281,7 +297,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
       return this;
     }
 
-    public OperatorTestBuilder baselineValues(Object ... baselineValues) {
+    public OperatorTestBuilder baselineValues(Object... baselineValues) {
       if (baselineRecords == null) {
         baselineRecords = new ArrayList<>();
       }
@@ -296,6 +312,21 @@ public class PhysicalOpUnitTestBase extends ExecTest {
       this.baselineRecords.add(ret);
       return this;
     }
+
+    public OperatorTestBuilder expectZeroRows() {
+      this.expectNoRows = true;
+      return this;
+    }
+
+    public OperatorTestBuilder expectedNumBatches(Integer expectedNumBatches) {
+      this.expectedNumBatches = expectedNumBatches;
+      return this;
+    }
+
+    public OperatorTestBuilder expectedBatchSize(Long batchSize) {
+      this.expectedBatchSize = batchSize;
+      return this;
+    }
   }
 
   /**
@@ -448,7 +479,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     return getJsonReadersFromBatchString(jsonBatches, fragContext, Collections.singletonList(SchemaPath.getSimplePath("*")));
   }
 
-  private List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
+  public List<RecordReader> getReaderListForJsonBatches(List<String> jsonBatches, FragmentContext fragContext) {
     Iterator<RecordReader> readers = getRecordReadersForJsonBatches(jsonBatches, fragContext);
     List<RecordReader> readerList = new ArrayList<>();
     while(readers.hasNext()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
new file mode 100644
index 0000000..4a1dc8f
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.unit;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.FlattenPOP;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.Text;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
+  private static final long initReservation = AbstractBase.INIT_ALLOCATION;
+  private static final long maxAllocation = AbstractBase.MAX_ALLOCATION;
+  // Keeping row count below 4096 so we do not produce more than one batch.
+  // ScanBatch with the JSON reader produces batches of 4K rows.
+  private int numRows = 4000;
+  private static final String wideString =
+    "b00dUrA0oa2i4ZEHg6zvPXPXlVQYB2BXe8T5gIEtvUDzcN6yUkIqyS07gaAy8k4ac6Bn1cxblsXFnkp8g8hiQkUMJPyl6" +
+    "l0jTdsIzQ4PkVCURGGyF0aduGqCXUaKp91gqkRMvLhHhmrHdEb22QN20dXEHSygR7vrb2zZhhfWeJbXRsesuYDqdGig801IAS6VWRIdQtJ6gaRhCdNz";
+
+  /**
+   * Figures out the total size of the batches for a given JSON input batch.
+   */
+  private long getExpectedSize(List<String> expectedJsonBatches) throws ExecutionSetupException {
+    // Create a dummy scanBatch to figure out the size.
+    RecordBatch scanBatch = new ScanBatch(new MockPhysicalOperator(), fragContext, getReaderListForJsonBatches(expectedJsonBatches, fragContext));
+    Iterable<VectorAccessible> batches = new BatchIterator(scanBatch);
+
+    long totalSize = 0;
+    for (VectorAccessible batch : batches) {
+      RecordBatchSizer sizer = new RecordBatchSizer(batch);
+      totalSize += sizer.netSize();
+    }
+    return totalSize;
+  }
+
+  @Test
+  public void testFlattenFixedWidth() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [6,7,8,9]
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [6, 7, 8, 9]},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [6, 7, 8, 9]}");
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size out of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : 6
+    // "a" : 5, "b" : wideString, "c" : 7
+    // "a" : 5, "b" : wideString, "c" : 8
+    // "a" : 5, "b" : wideString, "c" : 9
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 6},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 7},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 8},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 9},");
+    }
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 6},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 7},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 8},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : 9}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // Set the output batch size to 1/2 of the total size expected.
+    // We will get approximately 4 batches because flatten accounts for a worst-case fragmentation factor of 2.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4)  // verify number of batches
+      .expectedBatchSize(totalSize / 2); // verify batch size.
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, 6l);
+      opTestBuilder.baselineValues(5l, wideString, 7l);
+      opTestBuilder.baselineValues(5l, wideString, 8l);
+      opTestBuilder.baselineValues(5l, wideString, 9l);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenVariableWidth() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker", "peacock"]
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\",\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\",\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\", \"peacock\"]}");
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size out of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : parrot
+    // "a" : 5, "b" : wideString, "c" : hummingbird
+    // "a" : 5, "b" : wideString, "c" : owl
+    // "a" : 5, "b" : wideString, "c" : woodpecker
+    // "a" : 5, "b" : wideString, "c" : peacock
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"parrot\"},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"hummingbird\"},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"owl\"},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"woodpecker\"},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"peacock\"},");
+    }
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"parrot\"},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"hummingbird\"},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"owl\"},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"woodpecker\"},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"peacock\"}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // Set the output batch size to 1/2 of the total size expected.
+    // We will get approximately 4 batches because flatten accounts for a worst-case fragmentation factor of 2.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize / 2); // verify batch size.
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, "parrot");
+      opTestBuilder.baselineValues(5l, wideString, "hummingbird");
+      opTestBuilder.baselineValues(5l, wideString, "owl");
+      opTestBuilder.baselineValues(5l, wideString, "woodpecker");
+      opTestBuilder.baselineValues(5l, wideString, "peacock");
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenFixedWidthList() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [[1,2,3,4], [5,6,7,8]]
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + "[1,2,3,4]," + "[5,6,7,8]" + "]");
+      batchString.append("},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" + "[1,2,3,4]," + "[5,6,7,8]" + "]");
+    batchString.append("}]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size out of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : [1,2,3,4]
+    // "a" : 5, "b" : wideString, "c" : [5,6,7,8]
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[1,2,3,4]\"},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[5,6,7,8]\"},");
+    }
+
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[1,2,3,4]\"},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : \"[5,6,7,8]\"}");
+
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // Set the output batch size to 1/2 of the total size expected.
+    // We will get approximately 4 batches because flatten accounts for a worst-case fragmentation factor of 2.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize);  // verify batch size.
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, new ArrayList<Long>(Arrays.asList(1L, 2L, 3L, 4L)));
+      opTestBuilder.baselineValues(5l, wideString, new ArrayList<Long>(Arrays.asList(5L, 6L, 7L, 8L)));
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenVariableWidthList() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [["parrot", "hummingbird", "owl", "woodpecker"], ["hawk", "nightingale", "swallow", "peacock"]]
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," +
+        "\"c\" : [" + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," + "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]");
+      batchString.append("},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," +
+      "\"c\" : [" + "[\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]," + "[\"hawk\",\"nightingale\",\"swallow\",\"peacock\"]" + "]");
+    batchString.append("}]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size out of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : ["parrot", "hummingbird", "owl", "woodpecker"]
+    // "a" : 5, "b" : wideString, "c" : ["hawk", "nightingale", "swallow", "peacock"]
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]},");
+    }
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"parrot\", \"hummingbird\", \"owl\", \"woodpecker\"]},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [\"hawk\", \"nightingale\", \"swallow\", \"peacock\"]}");
+
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // Set the output batch size to 1/2 of the total size expected.
+    // We will get approximately 4 batches because flatten accounts for a worst-case fragmentation factor of 2.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize);  // verify batch size.
+
+    final JsonStringArrayList<Text> birds1 = new JsonStringArrayList<Text>() {{
+      add(new Text("parrot"));
+      add(new Text("hummingbird"));
+      add(new Text("owl"));
+      add(new Text("woodpecker"));
+    }};
+
+    final JsonStringArrayList<Text> birds2 = new JsonStringArrayList<Text>() {{
+      add(new Text("hawk"));
+      add(new Text("nightingale"));
+      add(new Text("swallow"));
+      add(new Text("peacock"));
+    }};
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, birds1);
+      opTestBuilder.baselineValues(5l, wideString, birds2);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenMap() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}]
+
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," +
+        "\"c\" : [" + " { \"trans_id\":\"t1\", \"amount\":100, " +
+        "\"trans_time\":7777777, \"type\":\"sports\"}," +
+        " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}");
+      batchString.append("]},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," +
+      "\"c\" : [" + " { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777," +
+      " \"type\":\"sports\"}," +
+      " { \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}");
+    batchString.append("]}]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size out of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}
+    // "a" : 5, "b" : wideString, "c" : {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        "{\"trans_id\":\"t1\", \"amount\":100, " +
+        "\"trans_time\":7777777, \"type\":\"sports\"}},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}},");
+    }
+
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      "{\"trans_id\":\"t1\", \"amount\":100, " +
+      "\"trans_time\":7777777, \"type\":\"sports\"}},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      "{\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}}");
+
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // Set the output batch size to 1/2 of the total size expected.
+    // We will get approximately 4 batches because flatten accounts for a worst-case fragmentation factor of 2.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
+    resultExpected1.put("trans_id", new Text("t1"));
+    resultExpected1.put("amount", new Long(100));
+    resultExpected1.put("trans_time", new Long(7777777));
+    resultExpected1.put("type", new Text("sports"));
+
+    JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>();
+    resultExpected2.put("trans_id", new Text("t2"));
+    resultExpected2.put("amount", new Long(1000));
+    resultExpected2.put("trans_time", new Long(8888888));
+    resultExpected2.put("type", new Text("groceries"));
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, resultExpected1);
+      opTestBuilder.baselineValues(5l, wideString, resultExpected2);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenListOfMaps() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString,
+    // "c" : [ [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}],
+    //         [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}],
+    //         [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t2", amount:1000, trans_time:8888888, type:groceries}] ]
+
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" +
+        "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+        "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " +
+        "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+        "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " +
+        "[ { \"trans_id\":\"t1\", \"amount\":100, " +
+        "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+        "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]");
+      batchString.append("]},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" +
+      "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+      "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " +
+      "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+      "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ], " +
+      "[ { \"trans_id\":\"t1\", \"amount\":100, " +
+      "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+      "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]");
+    batchString.append("]}]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}]
+    // "a" : 5, "b" : wideString, "c" : [{"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, {"trans_id":"t1", amount:100, trans_time:8888888, type:groceries}]
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+        "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        "[ { \"trans_id\":\"t1\", \"amount\":100, " +
+        "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+        "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        "[ { \"trans_id\":\"t1\", \"amount\":100, " +
+        "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+        "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},");
+    }
+
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      "[ { \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+      "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"} ]},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      "[ { \"trans_id\":\"t1\", \"amount\":100, " +
+      "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+      "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]},");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      "[ { \"trans_id\":\"t1\", \"amount\":100, " +
+      "\"trans_time\":7777777, \"type\":\"sports\"}," + " { " +
+      "\"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]}");
+
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    final JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
+    resultExpected1.put("trans_id", new Text("t1"));
+    resultExpected1.put("amount", new Long(100));
+    resultExpected1.put("trans_time", new Long(7777777));
+    resultExpected1.put("type", new Text("sports"));
+
+    final JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>();
+    resultExpected2.put("trans_id", new Text("t2"));
+    resultExpected2.put("amount", new Long(1000));
+    resultExpected2.put("trans_time", new Long(8888888));
+    resultExpected2.put("type", new Text("groceries"));
+
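+    // Flatten unrolls only the outermost list, so each output row's "c" is still a
+    // two-element list of maps.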
+    final JsonStringArrayList<JsonStringHashMap<String, Object>> results = new JsonStringArrayList<JsonStringHashMap<String, Object>>() {{
+      add(resultExpected1);
+      add(resultExpected2);
+    }};
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, results);
+      opTestBuilder.baselineValues(5L, wideString, results);
+      opTestBuilder.baselineValues(5L, wideString, results);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenNestedMap() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : 5, "b" : wideString,
+    // "c" : [ {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries},
+    //         {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries} ]
+
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    StringBuilder innerMap = new StringBuilder();
+    innerMap.append("{ \"trans_id\":\"inner_trans_t1\", \"amount\":100, \"trans_time\":7777777, \"type\":\"sports\"}");
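+    // The same innerMap JSON is embedded in every "c" element to exercise copying of
+    // nested maps through flatten.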
+
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" +
+        " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+        "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap +
+        ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}]");
+      batchString.append("},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : [" +
+      " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+      "\"type\":\"sports\"}," + " { \"innerMap\": " + innerMap + ",  \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, \"type\":\"groceries\"}");
+    batchString.append("]}]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+
+    // output rows will be like this.
+    // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries}
+    // "a" : 5, "b" : wideString, "c" : {innerMap: {"trans_id":"t1", amount:100, trans_time:7777777, type:sports}, "trans_id":"t1", amount:100, trans_time:8888888, type:groceries}
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+        "\"type\":\"sports\"} }, ");
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+        " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, " +
+        "\"type\":\"groceries\"} }, ");
+    }
+
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t1\", \"amount\":100, \"trans_time\":7777777, " +
+      "\"type\":\"sports\"} }, ");
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" : " +
+      " { \"innerMap\": " + innerMap + ", \"trans_id\":\"t2\", \"amount\":1000, \"trans_time\":8888888, " +
+      "\"type\":\"groceries\"} }");
+
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    JsonStringHashMap<String, Object> innerMapResult = new JsonStringHashMap<>();
+    innerMapResult.put("trans_id", new Text("inner_trans_t1"));
+    innerMapResult.put("amount", new Long(100));
+    innerMapResult.put("trans_time", new Long(7777777));
+    innerMapResult.put("type", new Text("sports"));
+
+    JsonStringHashMap<String, Object> resultExpected1 = new JsonStringHashMap<>();
+    resultExpected1.put("trans_id", new Text("t1"));
+    resultExpected1.put("amount", new Long(100));
+    resultExpected1.put("trans_time", new Long(7777777));
+    resultExpected1.put("type", new Text("sports"));
+    resultExpected1.put("innerMap", innerMapResult);
+
+    JsonStringHashMap<String, Object> resultExpected2 = new JsonStringHashMap<>();
+    resultExpected2.put("trans_id", new Text("t2"));
+    resultExpected2.put("amount", new Long(1000));
+    resultExpected2.put("trans_time", new Long(8888888));
+    resultExpected2.put("type", new Text("groceries"));
+    resultExpected2.put("innerMap", innerMapResult);
+
+    for (int i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, resultExpected1);
+      opTestBuilder.baselineValues(5L, wideString, resultExpected2);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    StringBuilder flattenElement = new StringBuilder();
+
+    // Create a list of 1001 elements (0 through 1000)
+    flattenElement.append("[");
+    for (int i = 0; i < 1000; i++) {
+      flattenElement.append(i);
+      flattenElement.append(",");
+    }
+    flattenElement.append(1000);
+    flattenElement.append("]");
+
+    batchString.append("[");
+
+    numRows = 1000;
+
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\":" + flattenElement + "},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\":" + flattenElement + "}");
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // Figure out the approximate total output size of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < 1001; j++) {
+        expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" :");
+        expectedBatchString.append(j);
+        expectedBatchString.append("},");
+      }
+    }
+    for (int j = 0; j < 1000; j++) {
+      expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" :");
+      expectedBatchString.append(j);
+      expectedBatchString.append("},");
+    }
+
+    expectedBatchString.append("{\"a\": 5, " + "\"b\" : " + "\"" + wideString + "\"," + "\"c\" :");
+    expectedBatchString.append(1000);
+    expectedBatchString.append("}");
+
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // Size-wise that would give about 4 batches, but here the 65535-records-per-batch cap dominates.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    // Here we expect 16 batches because each batch is capped at the upper limit of 65535 records
+    // (1001 rows x 1001 array elements = 1,002,001 output records / 65535 ~ 15.3).
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(16) // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    for (long i = 0; i < numRows + 1; i++) {
+      for (long j = 0; j < 1001; j++) {
+        opTestBuilder.baselineValues(5L, wideString, j);
+      }
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenLowerLimit() throws Exception {
+    // test the lower limit of at least one batch
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    StringBuilder flattenElement = new StringBuilder();
+
+    // Create a list of 11 elements (0 through 10)
+    flattenElement.append("[");
+    for (int i = 0; i < 10; i++) {
+      flattenElement.append(i);
+      flattenElement.append(",");
+    }
+    flattenElement.append(10);
+    flattenElement.append("]");
+
+    // create a list of 11 wide strings
+    final StringBuilder wideStrings = new StringBuilder();
+    wideStrings.append("[");
+    for (int i = 0; i < 10; i++) {
+      wideStrings.append("\"" + wideString + "\",");
+    }
+    wideStrings.append("\"" + wideString + "\"");
+    wideStrings.append("]");
+
+    batchString.append("[");
+    batchString.append("{\"a\": " + wideStrings + "," + "\"c\":" + flattenElement);
+    batchString.append("}]");
+    inputJsonBatches.add(batchString.toString());
+
+    // No sizing estimate is needed here. Instead, set a very low batch size relative to
+    // the large record size, to test that we get at least one record per batch even when
+    // a single record exceeds the configured output batch size.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 1024);
+
+    // Here we expect about 10 batches: the input flattens into 11 rows, each larger than
+    // the configured batch size, so every batch is bounded by the lower limit of one record.
+    // Do not check the output batch size; it will exceed the configured value of 1024
+    // because at least one record must be emitted per batch.
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "c")
+      .expectedNumBatches(10); // verify number of batches
+
+    final JsonStringArrayList<Text> results = new JsonStringArrayList<Text>() {{
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+      add(new Text(wideString));
+    }};
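+    // "a" is not the flatten column, so every output row keeps the full 11-element list
+    // of wide strings.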
+
+    for (long j = 0; j < 11; j++) {
+      opTestBuilder.baselineValues(results, j);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenEmptyList() throws Exception {
+    final PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("b"));
+
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+
+    StringBuilder flattenElement = new StringBuilder();
+
+    flattenElement.append("[");
+    flattenElement.append("]");
+
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{\"a\": 5, " + "\"b\" : " + flattenElement + "},");
+    }
+    batchString.append("{\"a\": 5, " + "\"b\" : " + flattenElement + "}");
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
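+    // Flattening an empty array emits no rows for a record, so the operator should
+    // produce zero rows overall.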
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b")
+      .expectZeroRows();
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testFlattenLargeRecords() throws Exception {
+    PhysicalOperator flatten = new FlattenPOP(null, SchemaPath.getSimplePath("c"));
+    mockOpContext(flatten, initReservation, maxAllocation);
+
+    // create input rows like this.
+    // "a" : <id1>, "b" : wideString, "c" : [ 10 wideStrings ]
+    List<String> inputJsonBatches = Lists.newArrayList();
+    StringBuilder batchString = new StringBuilder();
+    int arrayLength = 10;
+    StringBuilder test = new StringBuilder();
+    test.append("[ \"");
+    for (int i = 0; i < arrayLength; i++) {
+      test.append(wideString);
+      test.append("\",\"");
+    }
+    test.append(wideString);
+    test.append("\"]");
+
+    batchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      batchString.append("{" + "\"a\" :" + i + ",\"b\": \"" + wideString + "\"," +
+        "\"c\": " + test + "},");
+    }
+    batchString.append("{" + "\"a\" :" + numRows + ",\"b\": \"" + wideString + "\"," +
+      "\"c\": " + test + "}");
+    batchString.append("]");
+    inputJsonBatches.add(batchString.toString());
+
+    // output rows will be like this.
+    // "a" : <id1>, "b" : wideString, "c" : wideString
+
+    // Figure out the approximate total output size of flatten for the input above.
+    // We will use this sizing information to set the output batch size so we can
+    // produce the desired number of batches for verification.
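+    // The values here only need to approximate the per-row output footprint for sizing;
+    // the exact "a" values are irrelevant.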
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int k = 0; k < numRows * 11; k++) {
+      expectedBatchString.append("{" + "\"a\" :" + k + ",\"b\": \"" + wideString + "\",");
+      expectedBatchString.append("\"c\": \"" + wideString + "\"},");
+    }
+    expectedBatchString.append("{" + "\"a\" :" + numRows + ",\"b\": \"" + wideString + "\",");
+    expectedBatchString.append("\"c\": \"" + wideString + "\"}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches because of fragmentation factor of 2 accounted for in flatten.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize / 2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+      .physicalOperator(flatten)
+      .inputDataStreamJson(inputJsonBatches)
+      .baselineColumns("a", "b", "c")
+      .expectedNumBatches(4) // verify number of batches
+      .expectedBatchSize(totalSize / 2);  // verify batch size.
+
+    for (long k = 0; k < numRows + 1; k++) {
+      for (int j = 0; j < arrayLength + 1; j++) {
+        opTestBuilder.baselineValues(k, wideString, wideString);
+      }
+    }
+
+    opTestBuilder.go();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/039530a4/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index cd68bf3..c470b0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -40,6 +40,8 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.HyperVectorValueIterator;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.BatchSchema;
@@ -318,10 +320,11 @@ public class DrillTestWrapper {
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
-  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches)
+  public static Map<String, List<Object>> addToCombinedVectorResults(Iterable<VectorAccessible> batches,
+                                                                     Long expectedBatchSize, Integer expectedNumBatches)
       throws SchemaChangeException, UnsupportedEncodingException {
     Map<String, List<Object>> combinedVectors = new TreeMap<>();
-    addToCombinedVectorResults(batches, null, combinedVectors);
+    addToCombinedVectorResults(batches, null, expectedBatchSize, expectedNumBatches, combinedVectors);
     return combinedVectors;
   }
 
@@ -336,12 +339,15 @@ public class DrillTestWrapper {
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
-  public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema, Map<String, List<Object>> combinedVectors)
+  public static int addToCombinedVectorResults(Iterable<VectorAccessible> batches, BatchSchema expectedSchema,
+                                               Long expectedBatchSize, Integer expectedNumBatches,
+                                               Map<String, List<Object>> combinedVectors)
        throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     int numBatch = 0;
     long totalRecords = 0;
     BatchSchema schema = null;
+
     for (VectorAccessible loader : batches)  {
       numBatch++;
       if (expectedSchema != null) {
@@ -352,6 +358,13 @@ public class DrillTestWrapper {
         }
       }
 
+      if (expectedBatchSize != null) {
+        RecordBatchSizer sizer = new RecordBatchSizer(loader);
+        // Not checking the actual allocated size, since memory accounting is not accurate
+        // when buffers are split and ownership is transferred across operators.
+        Assert.assertTrue(sizer.netSize() <= expectedBatchSize);
+      }
+
       // TODO:  Clean:  DRILL-2933:  That load(...) no longer throws
       // SchemaChangeException, so check/clean throws clause above.
       if (schema == null) {
@@ -367,6 +380,7 @@ public class DrillTestWrapper {
       }
       logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
       totalRecords += loader.getRecordCount();
+
       for (VectorWrapper<?> w : loader) {
         String field = SchemaPath.getSimplePath(w.getField().getName()).toExpr();
         ValueVector[] vectors;
@@ -420,6 +434,16 @@ public class DrillTestWrapper {
         }
       }
     }
+
+    if (expectedNumBatches != null) {
+      // Because value vectors allocate memory in power-of-two increments, predicting the
+      // exact number of batches would require complex math. Instead, check that the number
+      // of batches is at least the expected minimum and no more than twice that.
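+      // e.g. expectedNumBatches = 4 accepts any actual batch count from 4 through 8.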
+      Assert.assertTrue(numBatch >= expectedNumBatches);
+      Assert.assertTrue(numBatch <= (2 * expectedNumBatches));
+    }
+
     return numBatch;
   }
 
@@ -539,7 +563,7 @@ public class DrillTestWrapper {
       addTypeInfoIfMissing(actual.get(0), testBuilder);
 
       BatchIterator batchIter = new BatchIterator(actual, loader);
-      actualSuperVectors = addToCombinedVectorResults(batchIter);
+      actualSuperVectors = addToCombinedVectorResults(batchIter, null, null);
       batchIter.close();
 
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
@@ -552,7 +576,7 @@ public class DrillTestWrapper {
           test(baselineOptionSettingQueries);
           expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
           BatchIterator exBatchIter = new BatchIterator(expected, loader);
-          expectedSuperVectors = addToCombinedVectorResults(exBatchIter);
+          expectedSuperVectors = addToCombinedVectorResults(exBatchIter, null, null);
           exBatchIter.close();
         }
       } else {
@@ -587,6 +611,11 @@ public class DrillTestWrapper {
 
   public static Map<String, List<Object>> translateRecordListToHeapVectors(List<Map<String, Object>> records) {
     Map<String, List<Object>> ret = new TreeMap<>();
+
+    if (records == null) {
+      return ret;
+    }
+
     for (String s : records.get(0).keySet()) {
       ret.put(s, new ArrayList<>());
     }