You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/08/15 13:44:15 UTC

[03/13] drill git commit: DRILL-5601: Rollup of external sort fixes an improvements

DRILL-5601: Rollup of external sort fixes an improvements

- DRILL-5513: Managed External Sort : OOM error during the merge phase
- DRILL-5519: Sort fails to spill and results in an OOM
- DRILL-5522: OOM during the merge and spill process of the managed external sort
- DRILL-5594: Excessive buffer reallocations during merge phase of external sort
- DRILL-5597: Incorrect "bits" vector allocation in nullable vectors allocateNew()
- DRILL-5602: Repeated List Vector fails to initialize the offset vector
- DRILL-5617: Spill file name collisions when spill file is on a shared file system
0 DRILL-5445: bug in repeated map vector deserialization
- Workaround for DRILL-5656: Streaming Agg Batch forces sort to retain in-memory batches past NONE
- Fixes for the "record batch sizer" to handle for UNION, MAP, LIST types

Fixes a longstanding bug in the deserialization of a repeated map
vector read from a spill file. A few minor code cleanups also.

All of the bugs have to do with handling low-memory conditions, and with
correctly estimating the sizes of vectors, even when those vectors come
from the spill file or from an exchange. Hence, the changes for all of
the above issues are interrelated.

Also includes some fixes for tests:

* Certain tests require the ability to enforce the output size of the
memory merge/sort. Restored this option.
* Resolve issue with TestDrillbitResilience
* A particular test injects a fault during in-memory sort, but used a
single-batch input (which does not need the merge phase.)
Rather than introduce a new config property (the earlier solution),
altered the test to use input that returns more than one batch.

Two fixes forBasicPhysicalOpUnitTest — a test that uses JMockit to create a “fake”
fragment context.

1. No drillbit endpoint is available, so the SpillSet change that adds
the node to the spill path failed. Solution was to omit the node path
segment in such tests.

2. The Dynamic UDF registry is null causing a crash. This has nothing
to do with sort. Perhaps some pre-existing error? Anyway, added a check
for this condition.

close #860


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/073ea681
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/073ea681
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/073ea681

Branch: refs/heads/master
Commit: 073ea68197cdc37e2ca9414da96e9df39ec49fcc
Parents: ae123e1
Author: Paul Rogers <pr...@maprtech.com>
Authored: Thu Apr 6 13:57:19 2017 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Mon Aug 14 22:19:24 2017 -0700

----------------------------------------------------------------------
 .../cache/VectorAccessibleSerializable.java     |   3 +-
 .../drill/exec/coord/ClusterCoordinator.java    |  10 +-
 .../expr/fn/FunctionImplementationRegistry.java |   9 +-
 .../fn/registry/RemoteFunctionRegistry.java     |   8 +
 .../physical/impl/aggregate/InternalBatch.java  |   4 +-
 .../impl/aggregate/StreamingAggBatch.java       |  12 +
 .../impl/aggregate/StreamingAggTemplate.java    |  24 +-
 .../impl/sort/SortRecordBatchBuilder.java       |  13 +-
 .../physical/impl/spill/RecordBatchSizer.java   | 348 ++++++++----
 .../exec/physical/impl/spill/SpillSet.java      |  24 +-
 .../IteratorValidatorBatchIterator.java         |   3 +-
 .../exec/physical/impl/xsort/MSortTemplate.java |   6 +-
 .../physical/impl/xsort/managed/BatchGroup.java |   7 +-
 .../impl/xsort/managed/BufferedBatches.java     |   1 +
 .../impl/xsort/managed/ExternalSortBatch.java   |  74 ++-
 .../impl/xsort/managed/MSortTemplate.java       |   4 +-
 .../physical/impl/xsort/managed/MSorter.java    |   3 +-
 .../impl/xsort/managed/MergeSortWrapper.java    |  18 +-
 .../managed/PriorityQueueCopierTemplate.java    |   1 -
 .../managed/PriorityQueueCopierWrapper.java     |  53 +-
 .../physical/impl/xsort/managed/SortConfig.java |  27 +-
 .../physical/impl/xsort/managed/SortImpl.java   | 271 ++++++----
 .../impl/xsort/managed/SortMemoryManager.java   | 523 ++++++++++++++-----
 .../impl/xsort/managed/SorterWrapper.java       |   3 +-
 .../impl/xsort/managed/SpilledRuns.java         |  31 +-
 .../drill/exec/record/AbstractRecordBatch.java  |   6 +-
 .../drill/exec/record/HyperVectorWrapper.java   |   7 +-
 .../apache/drill/exec/record/RecordBatch.java   |   4 +-
 .../drill/exec/record/VectorInitializer.java    | 154 ++++++
 .../apache/drill/exec/record/WritableBatch.java |   5 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   1 +
 .../src/main/resources/drill-module.conf        |  11 +-
 .../java/org/apache/drill/TestUnionAll.java     |   1 -
 .../exec/cache/TestBatchSerialization.java      |   6 -
 .../physical/impl/window/TestWindowFrame.java   |   5 +
 .../impl/xsort/TestSimpleExternalSort.java      |   7 +-
 .../impl/xsort/TestSortSpillWithException.java  |   8 +-
 .../impl/xsort/managed/SortTestUtilities.java   |   2 +-
 .../physical/impl/xsort/managed/TestCopier.java |   7 +-
 .../managed/TestExternalSortInternals.java      | 224 +++++---
 .../impl/xsort/managed/TestSortImpl.java        |  10 +-
 .../exec/server/TestDrillbitResilience.java     |  38 +-
 .../org/apache/drill/test/ClientFixture.java    | 120 +++++
 .../org/apache/drill/test/ProfileParser.java    |   2 +-
 .../org/apache/drill/test/QueryBuilder.java     |  25 +
 .../src/main/java/io/netty/buffer/DrillBuf.java |  17 +-
 .../drill/exec/memory/AllocationManager.java    |  17 +-
 .../apache/drill/exec/memory/BaseAllocator.java |   1 +
 .../src/main/codegen/includes/vv_imports.ftl    |   2 +
 .../codegen/templates/FixedValueVectors.java    |   4 +-
 .../codegen/templates/NullableValueVectors.java |  13 +-
 .../src/main/codegen/templates/UnionVector.java |  14 +-
 .../templates/VariableLengthVectors.java        |  37 +-
 .../drill/exec/vector/AllocationHelper.java     |  43 +-
 .../drill/exec/vector/BaseDataValueVector.java  |  14 +-
 .../org/apache/drill/exec/vector/BitVector.java |   5 +-
 .../apache/drill/exec/vector/ObjectVector.java  |   9 +-
 .../apache/drill/exec/vector/ValueVector.java   |  11 +-
 .../apache/drill/exec/vector/ZeroVector.java    |   9 +-
 .../exec/vector/complex/AbstractMapVector.java  |  14 +-
 .../vector/complex/BaseRepeatedValueVector.java |  13 +-
 .../drill/exec/vector/complex/ListVector.java   |  13 +-
 .../exec/vector/complex/RepeatedListVector.java |  13 +-
 .../exec/vector/complex/RepeatedMapVector.java  |  25 +-
 64 files changed, 1759 insertions(+), 638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index d569ae5..0a78cb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -35,8 +35,6 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.codahale.metrics.MetricRegistry;
@@ -141,6 +139,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
   }
 
   // Like above, only preserve the original container and list of value-vectors
+  @SuppressWarnings("resource")
   public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
     final VectorContainer container = new VectorContainer();
     final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index ea9593e..e758d6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -38,8 +38,12 @@ public abstract class ClusterCoordinator implements AutoCloseable {
       16, 0.75f, 16);
 
   /**
-   * Start the cluster coordinator.  Millis to wait is
-   * @param millisToWait The maximum time to wait before throwing an exception if the cluster coordination service has not successfully started.  Use 0 to wait indefinitely.
+   * Start the cluster coordinator. Millis to wait is
+   *
+   * @param millisToWait
+   *          The maximum time to wait before throwing an exception if the
+   *          cluster coordination service has not successfully started. Use 0
+   *          to wait indefinitely.
    * @throws Exception
    */
   public abstract void start(long millisToWait) throws Exception;
@@ -49,7 +53,7 @@ public abstract class ClusterCoordinator implements AutoCloseable {
   public abstract void unregister(RegistrationHandle handle);
 
   /**
-   * Get a collection of avialable Drillbit endpoints, Thread-safe.
+   * Get a collection of available Drillbit endpoints, Thread-safe.
    * Could be slightly out of date depending on refresh policy.
    *
    * @return A collection of available endpoints.

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 8bc6af0..1c399e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -343,7 +343,9 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
    */
   @SuppressWarnings("resource")
   public boolean syncWithRemoteRegistry(long version) {
-    if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
+    // Do the version check only if a remote registry exists. It does
+    // not exist for some JMockit-based unit tests.
+    if (isRegistrySyncNeeded()) {
       synchronized (this) {
         long localRegistryVersion = localFunctionRegistry.getVersion();
         if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion))  {
@@ -390,6 +392,11 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
     return version != localFunctionRegistry.getVersion();
   }
 
+  private boolean isRegistrySyncNeeded() {
+    return remoteFunctionRegistry.hasRegistry() &&
+           isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion());
+  }
+
   /**
    * Checks if local function registry should be synchronized with remote function registry.
    * If remote function registry version is -1, it means that remote function registry is unreachable

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index 2e5eda2..38d8fcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -128,6 +128,14 @@ public class RemoteFunctionRegistry implements AutoCloseable {
     }
   }
 
+  /**
+   * Report whether a remote registry exists. During some unit tests,
+   * no remote registry exists, so the other methods should not be called.
+   * @return true if a remote registry exists, false if this a local-only
+   * instance and no such registry exists
+   */
+  public boolean hasRegistry() { return registry != null; }
+
   public Registry getRegistry(DataChangeVersion version) {
     return registry.get(registry_path, version);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 9e96727..1d1d3cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,7 +39,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
     this(incoming, null, oContext);
   }
 
-  public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext){
+  public InternalBatch(RecordBatch incoming, VectorWrapper<?>[] ignoreWrappers, OperatorContext oContext){
     switch(incoming.getSchema().getSelectionVectorMode()){
     case FOUR_BYTE:
       this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index af41438..f7fd1d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator.AggOutcome;
+import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -87,6 +88,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
+
+    // Sorry. Horrible hack. To release memory after an in-memory sort,
+    // the External Sort normally frees in-memory sorted batches just
+    // before returning NONE. But, this operator needs the batches to
+    // be present, even after NONE. This call puts the ESB into the proper
+    // "mode". A later call explicitly releases the batches.
+
+    ExternalSortBatch.retainSv4OnNone(incoming);
   }
 
   @Override
@@ -111,6 +120,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       case STOP:
         state = BatchState.STOP;
         return;
+      default:
+        break;
     }
 
     this.incomingSchema = incoming.getSchema();
@@ -176,6 +187,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         container.zeroVectors();
       }
       done = true;
+      ExternalSortBatch.releaseBatches(incoming);
       // fall through
     case RETURN_OUTCOME:
       IterOutcome outcome = aggregator.getOutcome();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 3417611..fb4d508 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -36,6 +36,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   private int previousIndex = -1;
   private int underlyingIndex = 0;
   private int currentIndex;
+  /**
+   * Number of records added to the current aggregation group.
+   */
   private long addedRecordCount = 0;
   private IterOutcome outcome;
   private int outputCount = 0;
@@ -163,6 +166,19 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
           previousIndex = currentIndex;
         }
 
+        /**
+         * Hold onto the previous incoming batch. When the incoming uses an
+         * SV4, the InternalBatch DOES NOT clone or transfer the data. Instead,
+         * it clones only the SV4, and assumes that the same hyper-list of
+         * batches will be offered again after the next call to the incoming
+         * next(). This is, in fact, how the SV4 works, so all is fine. The
+         * trick to be aware of, however, is that this MUST BE TRUE even if
+         * the incoming next() returns NONE: the incoming is obligated to continue
+         * to offer the set of batches. That is, the incoming cannot try to be
+         * tidy and release the batches one end-of-data or the following code
+         * will fail, likely with an IndexOutOfBounds exception.
+         */
+
         InternalBatch previous = new InternalBatch(incoming, context);
 
         try {
@@ -178,14 +194,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               lastOutcome = out;
               if (first && addedRecordCount == 0) {
                 return setOkAndReturn();
-              } else if(addedRecordCount > 0) {
+              } else if (addedRecordCount > 0) {
                 outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
-                // (output container full or not) as we are not going to insert anymore records.
+                // (output container full or not) as we are not going to insert any more records.
                 if (EXTRA_DEBUG) {
                   logger.debug("Received no more batches, returning.");
                 }
                 return setOkAndReturn();
-              }else{
+              } else {
                 if (first && out == IterOutcome.OK) {
                   out = IterOutcome.OK_NEW_SCHEMA;
                 }
@@ -330,7 +346,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
   private void addRecordInc(int index) {
     addRecord(index);
-    this.addedRecordCount++;
+    addedRecordCount++;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index cd52a91..999fb04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.AllocationReservation;
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.BatchSchema;
@@ -244,15 +245,17 @@ public class SortRecordBatchBuilder implements AutoCloseable {
   }
 
   /**
-   * For given record count how much memory does SortRecordBatchBuilder needs for its own purpose. This is used in
-   * ExternalSortBatch to make decisions about whether to spill or not.
+   * For given record count, return the memory that SortRecordBatchBuilder needs
+   * for its own purpose. This is used in ExternalSortBatch to make decisions
+   * about whether to spill or not.
    *
-   * @param recordCount
-   * @return
+   * @param recordCount expected output record count
+   * @return number of bytes needed for an SV4, power-of-two rounded
    */
+
   public static long memoryNeeded(int recordCount) {
     // We need 4 bytes (SV4) for each record. Due to power-of-two allocations, the
     // backing buffer might be twice this size.
-    return recordCount * 2 * 4;
+    return BaseAllocator.nextPowerOfTwo(recordCount * 4);
   }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index a1b8169..f76757f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -19,117 +19,221 @@ package org.apache.drill.exec.physical.impl.spill;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.AllocationManager.BufferLedger;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.UInt4Vector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
+import com.google.common.collect.Sets;
+
 /**
  * Given a record batch or vector container, determines the actual memory
  * consumed by each column, the average row, and the entire record batch.
  */
 
 public class RecordBatchSizer {
-//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSizer.class);
 
   /**
    * Column size information.
    */
   public static class ColumnSize {
+    public final String prefix;
     public final MaterializedField metadata;
 
     /**
-     * Assumed size from Drill metadata.
+     * Assumed size from Drill metadata. Note that this information is
+     * 100% bogus. Do not use it.
      */
 
+    @Deprecated
     public int stdSize;
 
     /**
-     * Actual memory consumed by all the vectors associated with this column.
-     */
-
-    public int totalSize;
-
-    /**
      * Actual average column width as determined from actual memory use. This
      * size is larger than the actual data size since this size includes per-
      * column overhead such as any unused vector space, etc.
      */
 
-    public int estSize;
-    public int capacity;
-    public int density;
-    public int dataSize;
-    public boolean variableWidth;
+    public final int estSize;
 
-    public ColumnSize(ValueVector vv) {
-      metadata = vv.getField();
-      stdSize = TypeHelper.getSize(metadata.getType());
+    /**
+     * Number of times the value here (possibly repeated) appears in
+     * the record batch.
+     */
 
-      // Can't get size estimates if this is an empty batch.
-      int rowCount = vv.getAccessor().getValueCount();
-      if (rowCount == 0) {
-        estSize = stdSize;
-        return;
-      }
+    public final int valueCount;
 
-      // Total size taken by all vectors (and underlying buffers)
-      // associated with this vector.
+    /**
+     * The number of elements in the value vector. Consider two cases.
+     * A required or nullable vector has one element per row, so the
+     * <tt>entryCount</tt> is the same as the <tt>valueCount</tt> (which,
+     * in turn, is the same as the row count.) But, if this vector is an
+     * array, then the <tt>valueCount</tt> is the number of columns, while
+     * <tt>entryCount</tt> is the total number of elements in all the arrays
+     * that make up the columns, so <tt>entryCount</tt> will be different than
+     * the <tt>valueCount</tt> (normally larger, but possibly smaller if most
+     * arrays are empty.
+     * <p>
+     * Finally, the column may be part of another list. In this case, the above
+     * logic still applies, but the <tt>valueCount</tt> is the number of entries
+     * in the outer array, not the row count.
+     */
+
+    public int entryCount;
+    public int dataSize;
 
-      totalSize = vv.getAllocatedByteCount();
+    /**
+     * The estimated, average number of elements. For a repeated type,
+     * this is the average entries per array (per repeated element).
+     */
 
-      // Capacity is the number of values that the vector could
-      // contain. This is useful only for fixed-length vectors.
+    public int estElementCount;
+    public final boolean isVariableWidth;
 
-      capacity = vv.getValueCapacity();
+    public ColumnSize(ValueVector v, String prefix, int valueCount) {
+      this.prefix = prefix;
+      this.valueCount = valueCount;
+      metadata = v.getField();
+      isVariableWidth = v instanceof VariableWidthVector;
 
       // The amount of memory consumed by the payload: the actual
       // data stored in the vectors.
 
-      dataSize = vv.getPayloadByteCount();
+      if (v.getField().getDataMode() == DataMode.REPEATED) {
+        buildRepeated(v);
+      }
+      estElementCount = 1;
+      entryCount = 1;
+      switch (metadata.getType().getMinorType()) {
+      case LIST:
+        buildList(v);
+        break;
+      case MAP:
+      case UNION:
+        // No standard size for Union type
+        dataSize = v.getPayloadByteCount(valueCount);
+        break;
+      default:
+        dataSize = v.getPayloadByteCount(valueCount);
+        stdSize = TypeHelper.getSize(metadata.getType());
+      }
+      estSize = roundUp(dataSize, valueCount);
+    }
+
+    private void buildRepeated(ValueVector v) {
 
-      // Determine "density" the number of rows compared to potential
-      // capacity. Low-density batches occur at block boundaries, ends
-      // of files and so on. Low-density batches throw off our estimates
-      // for Varchar columns because we don't know the actual number of
-      // bytes consumed (that information is hidden behind the Varchar
-      // implementation where we can't get at it.)
+      // Repeated vectors are special: they have an associated offset vector
+      // that changes the value count of the contained vectors.
 
-      density = roundUp(dataSize * 100, totalSize);
-      estSize = roundUp(dataSize, rowCount);
-      variableWidth = vv instanceof VariableWidthVector ;
+      @SuppressWarnings("resource")
+      UInt4Vector offsetVector = ((RepeatedValueVector) v).getOffsetVector();
+      buildArray(offsetVector);
+      if (metadata.getType().getMinorType() == MinorType.MAP) {
+
+        // For map, the only data associated with the map vector
+        // itself is the offset vector, if any.
+
+        dataSize = offsetVector.getPayloadByteCount(valueCount);
+      }
+    }
+
+    private void buildList(ValueVector v) {
+      @SuppressWarnings("resource")
+      UInt4Vector offsetVector = ((RepeatedListVector) v).getOffsetVector();
+      buildArray(offsetVector);
+      dataSize = offsetVector.getPayloadByteCount(valueCount);
+    }
+
+    private void buildArray(UInt4Vector offsetVector) {
+      entryCount = offsetVector.getAccessor().get(valueCount);
+      estElementCount = roundUp(entryCount, valueCount);
     }
 
     @Override
     public String toString() {
       StringBuilder buf = new StringBuilder()
+          .append(prefix)
           .append(metadata.getName())
           .append("(type: ")
+          .append(metadata.getType().getMode().name())
+          .append(" ")
           .append(metadata.getType().getMinorType().name())
-          .append(", std col. size: ")
+          .append(", count: ")
+          .append(valueCount);
+      if (metadata.getDataMode() == DataMode.REPEATED) {
+        buf.append(", total entries: ")
+           .append(entryCount)
+           .append(", per-array: ")
+           .append(estElementCount);
+      }
+      buf .append(", std size: ")
           .append(stdSize)
-          .append(", actual col. size: ")
+          .append(", actual size: ")
           .append(estSize)
-          .append(", total size: ")
-          .append(totalSize)
           .append(", data size: ")
           .append(dataSize)
-          .append(", row capacity: ")
-          .append(capacity)
-          .append(", density: ")
-          .append(density)
           .append(")");
       return buf.toString();
     }
+
+    /**
+     * Add a single vector initializer to a collection for the entire batch.
+     * Uses the observed column size information to predict the size needed
+     * when allocating a new vector for the same data.
+     *
+     * @param initializer the vector initializer to hold the hints
+     * for this column
+     */
+
+    private void buildVectorInitializer(VectorInitializer initializer) {
+      int width = 0;
+      switch(metadata.getType().getMinorType()) {
+      case VAR16CHAR:
+      case VARBINARY:
+      case VARCHAR:
+
+        // Subtract out the offset vector width
+        width = estSize - 4;
+
+        // Subtract out the bits (is-set) vector width
+        if (metadata.getDataMode() == DataMode.OPTIONAL) {
+          width -= 1;
+        }
+        break;
+      default:
+        break;
+      }
+      String name = prefix + metadata.getName();
+      if (metadata.getDataMode() == DataMode.REPEATED) {
+        if (width > 0) {
+          // Estimated width is width of entire column. Divide
+          // by element count to get per-element size.
+          initializer.variableWidthArray(name, width / estElementCount, estElementCount);
+        } else {
+          initializer.fixedWidthArray(name, estElementCount);
+        }
+      }
+      else if (width > 0) {
+        initializer.variableWidth(name, width);
+      }
+    }
   }
 
   private List<ColumnSize> columnSizes = new ArrayList<>();
@@ -139,14 +243,16 @@ public class RecordBatchSizer {
    */
   private int rowCount;
   /**
-   * Standard row width using Drill meta-data.
+   * Standard row width using Drill meta-data. Note: this information is
+   * 100% bogus. Do not use it.
    */
+  @Deprecated
   private int stdRowWidth;
   /**
    * Actual batch size summing all buffers used to store data
    * for the batch.
    */
-  private int totalBatchSize;
+  private int accountedMemorySize;
   /**
    * Actual row width computed by dividing total batch memory by the
    * record count.
@@ -162,6 +268,8 @@ public class RecordBatchSizer {
   private int sv2Size;
   private int avgDensity;
 
+  private Set<BufferLedger> ledgers = Sets.newIdentityHashSet();
+
   private int netBatchSize;
 
   public RecordBatchSizer(RecordBatch batch) {
@@ -177,42 +285,58 @@ public class RecordBatchSizer {
   /**
    *  Count the nullable columns; used for memory estimation
    */
-  public int numNullables;
+  public int nullableCount;
+
   /**
+   * Create empirical metadata for a record batch given a vector accessible
+   * (basically, an iterator over the vectors in the batch.)
    *
-   * @param va
+   * @param va iterator over the batch's vectors
    */
+
   public RecordBatchSizer(VectorAccessible va) {
     this(va, null);
   }
 
+  /**
+   * Create empirical metadata for a record batch given a vector accessible
+   * (basically, an iterator over the vectors in the batch) along with a
+   * selection vector for those records. The selection vector is used to
+   * pad the estimated row width with the extra two bytes needed per record.
+   * The selection vector memory is added ot the total memory consumed by
+   * this batch.
+   *
+   * @param va iterator over the batch's vectors
+   * @param sv2 selection vector associated with this batch
+   */
+
   public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
     rowCount = va.getRecordCount();
     for (VectorWrapper<?> vw : va) {
-      int size = measureColumn(vw.getValueVector());
-      if ( size > maxSize ) { maxSize = size; }
-      if ( vw.getField().isNullable() ) { numNullables++; }
+      measureColumn(vw.getValueVector(), "", rowCount);
+    }
+
+    for (BufferLedger ledger : ledgers) {
+      accountedMemorySize += ledger.getAccountedSize();
     }
 
     if (rowCount > 0) {
-      grossRowWidth = roundUp(totalBatchSize, rowCount);
+      grossRowWidth = roundUp(accountedMemorySize, rowCount);
     }
 
     if (sv2 != null) {
       sv2Size = sv2.getBuffer(false).capacity();
-      grossRowWidth += roundUp(sv2Size, rowCount);
-      netRowWidth += 2;
+      accountedMemorySize += sv2Size;
+      hasSv2 = true;
     }
 
-    int totalDensity = 0;
-    int usableCount = 0;
-    for (ColumnSize colSize : columnSizes) {
-      if ( colSize.density > 0 ) {
-        usableCount++;
-      }
-      totalDensity += colSize.density;
-    }
-    avgDensity = roundUp(totalDensity, usableCount);
+    computeEstimates();
+  }
+
+  private void computeEstimates() {
+    grossRowWidth = roundUp(accountedMemorySize, rowCount);
+    netRowWidth = roundUp(netBatchSize, rowCount);
+    avgDensity = roundUp(netBatchSize * 100, accountedMemorySize);
   }
 
   public void applySv2() {
@@ -220,9 +344,10 @@ public class RecordBatchSizer {
       return;
     }
 
+    hasSv2 = true;
     sv2Size = BaseAllocator.nextPowerOfTwo(2 * rowCount);
-    grossRowWidth += roundUp(sv2Size, rowCount);
-    totalBatchSize += sv2Size;
+    accountedMemorySize += sv2Size;
+    computeEstimates();
   }
 
   /**
@@ -238,36 +363,61 @@ public class RecordBatchSizer {
     if ( arg <= 32 ) { return 32; }
     return 64;
   }
-  private int measureColumn(ValueVector vv) {
-    // Maps consume no size themselves. However, their contained
-    // vectors do consume space, so visit columns recursively.
-    if (vv.getField().getType().getMinorType() == MinorType.MAP) {
-      return expandMap((AbstractMapVector) vv);
-    }
 
-    ColumnSize colSize = new ColumnSize(vv);
-    columnSizes.add(colSize);
+  private void measureColumn(ValueVector v, String prefix, int valueCount) {
 
+    ColumnSize colSize = new ColumnSize(v, prefix, valueCount);
+    columnSizes.add(colSize);
     stdRowWidth += colSize.stdSize;
-    totalBatchSize += colSize.totalSize;
     netBatchSize += colSize.dataSize;
+    maxSize = Math.max(maxSize, colSize.dataSize);
+    if (colSize.metadata.isNullable()) {
+      nullableCount++;
+    }
+
+    // Maps consume no size themselves. However, their contained
+    // vectors do consume space, so visit columns recursively.
+
+    switch (v.getField().getType().getMinorType()) {
+    case MAP:
+      expandMap((AbstractMapVector) v, prefix + v.getField().getName() + ".", colSize.entryCount);
+      break;
+    case LIST:
+      expandList((RepeatedListVector) v, prefix + v.getField().getName() + ".", colSize.entryCount);
+      break;
+    default:
+      v.collectLedgers(ledgers);
+    }
+
     netRowWidth += colSize.estSize;
-    netRowWidthCap50 += ! colSize.variableWidth ? colSize.estSize :
-        8 /* offset vector */ + roundUpToPowerOf2( Math.min(colSize.estSize,50) );
+    netRowWidthCap50 += ! colSize.isVariableWidth ? colSize.estSize :
+        8 /* offset vector */ + roundUpToPowerOf2(Math.min(colSize.estSize,50));
         // above change 8 to 4 after DRILL-5446 is fixed
-    return colSize.estSize;
   }
 
-  private int expandMap(AbstractMapVector mapVector) {
-    int accum = 0;
+  private void expandMap(AbstractMapVector mapVector, String prefix, int valueCount) {
     for (ValueVector vector : mapVector) {
-      accum += measureColumn(vector);
+      measureColumn(vector, prefix, valueCount);
     }
-    return accum;
+
+    // For a repeated map, we need the memory for the offset vector (only).
+    // Map elements are recursively expanded above.
+
+    if (mapVector.getField().getDataMode() == DataMode.REPEATED) {
+      ((RepeatedMapVector) mapVector).getOffsetVector().collectLedgers(ledgers);
+    }
+  }
+
+  private void expandList(RepeatedListVector vector, String prefix, int valueCount) {
+    measureColumn(vector.getDataVector(), prefix, valueCount);
+
+    // Determine memory for the offset vector (only).
+
+    vector.collectLedgers(ledgers);
   }
 
   public static int roundUp(int num, int denom) {
-    if(denom == 0) {
+    if (denom == 0) {
       return 0;
     }
     return (int) Math.ceil((double) num / denom);
@@ -283,8 +433,8 @@ public class RecordBatchSizer {
    * and null marking columns.
    * @return "real" width of the row
    */
-  public int netRowWidthCap50() { return netRowWidthCap50 + numNullables; }
-  public int actualSize() { return totalBatchSize; }
+  public int netRowWidthCap50() { return netRowWidthCap50 + nullableCount; }
+  public int actualSize() { return accountedMemorySize; }
   public boolean hasSv2() { return hasSv2; }
   public int avgDensity() { return avgDensity; }
   public int netSize() { return netBatchSize; }
@@ -304,14 +454,32 @@ public class RecordBatchSizer {
     buf.append( "  Records: " );
     buf.append(rowCount);
     buf.append(", Total size: ");
-    buf.append(totalBatchSize);
-    buf.append(", Gross row width:");
+    buf.append(accountedMemorySize);
+    buf.append(", Data size: ");
+    buf.append(netBatchSize);
+    buf.append(", Gross row width: ");
     buf.append(grossRowWidth);
-    buf.append(", Net row width:");
+    buf.append(", Net row width: ");
     buf.append(netRowWidth);
-    buf.append(", Density:");
+    buf.append(", Density: ");
     buf.append(avgDensity);
     buf.append("}");
     return buf.toString();
   }
+
+  /**
+   * The column size information gathered here represents empirically-derived
+   * schema metadata. Use that metadata to create an instance of a class that
+   * allocates memory for new vectors based on the observed size information.
+   * The caller provides the row count; the size information here provides
+   * column widths, number of elements in each array, etc.
+   */
+
+  public VectorInitializer buildVectorInitializer() {
+    VectorInitializer initializer = new VectorInitializer();
+    for (ColumnSize colSize : columnSizes) {
+      colSize.buildVectorInitializer(initializer);
+    }
+    return initializer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 5a02cee..c96c5dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.directory.api.util.Strings;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
@@ -36,6 +37,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.Sort;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.hadoop.conf.Configuration;
@@ -358,10 +360,17 @@ public class SpillSet {
   private long writeBytes;
 
   public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
-    this(context.getConfig(), context.getHandle(), popConfig);
+    this(context.getConfig(), context.getHandle(), popConfig,
+         // Endpoint appears to be null in some tests.
+         context.getDrillbitContext() == null ? null :
+         context.getDrillbitContext().getEndpoint());
   }
 
-  public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig) {
+  public SpillSet(FragmentContext context, PhysicalOperator popConfig, DrillbitEndpoint ep) {
+    this(context.getConfig(), context.getHandle(), popConfig, ep);
+  }
+
+  public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig, DrillbitEndpoint ep) {
     String operName;
 
     // Set the spill options from the configuration
@@ -413,7 +422,16 @@ public class SpillSet {
       fileManager = new HadoopFileManager(spillFs);
     }
 
-    spillDirName = String.format("%s_%s_%s-%s_minor%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+    // If provided with a prefix to identify the Drillbit, prepend that to the
+    // spill directory.
+
+    String nodeDir = "";
+    if (ep != null  &&  ep.getAddress() != null) {
+      nodeDir = ep.getAddress() + "-" + ep.getUserPort() + "/";
+    }
+    spillDirName = String.format("%s%s_%s_%s-%s-%s",
+        nodeDir,
+        QueryIdHelper.getQueryId(handle.getQueryId()),
         operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 0d7fccc..2be1ed5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -281,7 +281,6 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
       // Validate schema when available.
       if (batchState == OK || batchState == OK_NEW_SCHEMA) {
-        final BatchSchema prevLastSchema = lastSchema;
         final BatchSchema prevLastNewSchema = lastNewSchema;
 
         lastSchema = incoming.getSchema();
@@ -365,4 +364,6 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
                       this.getClass().getCanonicalName()));
   }
 
+  public RecordBatch getIncoming() { return incoming; }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 34aa46a..2f3d2f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -26,6 +26,7 @@ import javax.inject.Named;
 
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -92,8 +93,9 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
    * @return
    */
   public static long memoryNeeded(final int recordCount) {
-    // We need 4 bytes (SV4) for each record.
-    return recordCount * 4;
+    // We need 4 bytes (SV4) for each record, power of 2 rounded.
+
+    return BaseAllocator.nextPowerOfTwo(recordCount * 4);
   }
 
   private int merge(final int leftStart, final int rightStart, final int rightEnd, final int outStart) {

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index a74183c..f16bec6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -212,11 +212,16 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
         reader = VectorSerializer.reader(allocator, inputStream);
       }
       Stopwatch watch = Stopwatch.createStarted();
+      long start = allocator.getAllocatedMemory();
       VectorContainer c =  reader.read();
+      long end = allocator.getAllocatedMemory();
+      logger.trace("Read {} records in {} us; size = {}, memory = {}",
+                   c.getRecordCount(),
+                   watch.elapsed(TimeUnit.MICROSECONDS),
+                   (end - start), end);
       if (schema != null) {
         c = SchemaUtil.coerceContainer(c, schema, allocator);
       }
-      logger.trace("Read {} records in {} us", c.getRecordCount(), watch.elapsed(TimeUnit.MICROSECONDS));
       spilledBatches--;
       currentContainer.zeroVectors();
       Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
index 8f1b5e5..5bf8073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java
@@ -106,6 +106,7 @@ public class BufferedBatches {
     sorterWrapper.sortBatch(convertedBatch, sv2);
     bufferBatch(convertedBatch, sv2, batchSize);
   }
+
   /**
    * Convert an incoming batch into the agree-upon format.
    * @param incoming

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 1dbddee..ea1d605 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.ops.OperExecContext;
 import org.apache.drill.exec.ops.OperExecContextImpl;
 import org.apache.drill.exec.physical.config.ExternalSort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -32,6 +33,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -156,12 +158,18 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
 public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
-  protected static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ExternalSortBatch.class);
+
+  // For backward compatibility, masquerade as the original
+  // external sort. Else, some tests don't pass.
+
+  protected static final ControlsInjector injector =
+      ControlsInjectorFactory.getInjector(org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class);
 
   public static final String INTERRUPTION_AFTER_SORT = "after-sort";
   public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
   public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
   public static final String INTERRUPTION_WHILE_MERGING = "merging";
+  private boolean retainInMemoryBatchesOnNone;
 
   private final RecordBatch incoming;
 
@@ -210,7 +218,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     SortConfig sortConfig = new SortConfig(context.getConfig());
     SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(),
-                                     popConfig);
+                                     popConfig, context.getIdentity());
     OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
@@ -233,6 +241,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return resultsIterator.getSv4();
   }
 
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return resultsIterator.getSv2();
+  }
+
   /**
    * Called by {@link AbstractRecordBatch} as a fast-path to obtain
    * the first record batch and setup the schema of this batch in order
@@ -307,11 +320,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       // Close the iterator here to release any remaining resources such
       // as spill files. This is important when a query has a join: the
       // first branch sort may complete before the second branch starts;
-      // it may be quite a while after returning the last row before the
-      // fragment executor calls this opeator's close method.
-
-      resultsIterator.close();
-      resultsIterator = null;
+      // it may be quite a while after returning the last batch before the
+      // fragment executor calls this operator's close method.
+      //
+      // Note however, that the StreamingAgg operator REQUIRES that the sort
+      // retain the batches behind an SV4 when doing an in-memory sort because
+      // the StreamingAgg retains a reference to that data that it will use
+      // after receiving a NONE result code. See DRILL-5656.
+
+      if (! this.retainInMemoryBatchesOnNone) {
+        resultsIterator.close();
+        resultsIterator = null;
+      }
       return IterOutcome.NONE;
     }
   }
@@ -356,7 +376,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       return IterOutcome.NONE;
     }
 
-    // sort may have prematurely exited due to should continue returning false.
+    // sort may have prematurely exited due to shouldContinue() returning false.
 
     if (! context.shouldContinue()) {
       sortState = SortState.DONE;
@@ -410,7 +430,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
       logger.error("received OUT_OF_MEMORY, trying to spill");
       if (! sortImpl.forceSpill()) {
-        throw UserException.memoryError("Received OUT_OF_MEMORY, but enough batches to spill")
+        throw UserException.memoryError("Received OUT_OF_MEMORY, but not enough batches to spill")
           .build(logger);
       }
       break;
@@ -501,4 +521,40 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       throw ex;
     }
   }
+
+  /**
+   * Workaround for DRILL-5656. We wish to release the batches for an
+   * in-memory sort once data is delivered. Normally, we can release them
+   * just before returning NONE. But, the StreamingAggBatch requires that
+   * the batches still be present on NONE. This method "sniffs" the input
+   * provided, and if the external sort, sets a mode that retains the
+   * batches. Yes, it is a horrible hack. But, necessary until the Drill
+   * iterator protocol can be revised.
+   *
+   * @param incoming the incoming batch for some operator which may
+   * (or may not) be an external sort (or, an external sort wrapped
+   * in a batch iterator validator.)
+   */
+
+  public static void retainSv4OnNone(RecordBatch incoming) {
+    if (incoming instanceof IteratorValidatorBatchIterator) {
+      incoming = ((IteratorValidatorBatchIterator) incoming).getIncoming();
+    }
+    if (incoming instanceof ExternalSortBatch) {
+      ((ExternalSortBatch) incoming).retainInMemoryBatchesOnNone = true;
+    }
+  }
+
+  public static void releaseBatches(RecordBatch incoming) {
+    if (incoming instanceof IteratorValidatorBatchIterator) {
+      incoming = ((IteratorValidatorBatchIterator) incoming).getIncoming();
+    }
+    if (incoming instanceof ExternalSortBatch) {
+      ExternalSortBatch esb = (ExternalSortBatch) incoming;
+      if (esb.resultsIterator != null) {
+        esb.resultsIterator.close();
+        esb.resultsIterator = null;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index da41e5e..5b07c4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -58,7 +58,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
 
   @Override
   public void setup(final FragmentExecContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
-                    final VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException{
+                    final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading data.
     Preconditions.checkNotNull(vector4);
     this.vector4 = vector4.createNewWrapperCurrent();
@@ -89,7 +89,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
 
     @SuppressWarnings("resource")
     final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
-    desiredRecordBatchCount = Math.min(outputBatchSize, Character.MAX_VALUE);
+    desiredRecordBatchCount = Math.min(outputBatchSize, desiredBatchSize);
     desiredRecordBatchCount = Math.min(desiredRecordBatchCount, totalCount);
     aux = new SelectionVector4(drillBuf, totalCount, desiredRecordBatchCount);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 06bbdea..71ae29e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -30,7 +30,8 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
  */
 
 public interface MSorter {
-  public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize) throws SchemaChangeException;
+  public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4,
+                    VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException;
   public void sort();
   public SelectionVector4 getSV4();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
index 3ab9af3..9fd995f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java
@@ -103,7 +103,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
    * @return the sv4 for this operator
    */
 
-  public void merge(List<BatchGroup.InputBatch> batchGroups) {
+  public void merge(List<BatchGroup.InputBatch> batchGroups, int outputBatchSize) {
 
     // Add the buffered batches to a collection that MSorter can use.
     // The builder takes ownership of the batches and will release them if
@@ -124,7 +124,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
       sv4 = builder.getSv4();
       Sort popConfig = context.getOperatorDefn();
       mSorter = createNewMSorter(popConfig.getOrderings(), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
-      mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount());
+      mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize);
     } catch (SchemaChangeException e) {
       throw UserException.unsupportedError(e)
             .message("Unexpected schema change - likely code error.")
@@ -139,7 +139,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
     context.injectUnchecked(ExternalSortBatch.INTERRUPTION_AFTER_SORT);
     sv4 = mSorter.getSV4();
 
-    destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+//    destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
   }
 
   private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) {
@@ -147,7 +147,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
     cg.plainJavaCapable(true);
 
     // Uncomment out this line to debug the generated code.
-//  cg.saveCodeForDebugging(true);
+//    cg.saveCodeForDebugging(true);
     ClassGenerator<MSorter> g = cg.getRoot();
     g.setMappingSet(mainMapping);
 
@@ -232,13 +232,9 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults {
     } catch (RuntimeException e) {
       ex = (ex == null) ? e : ex;
     }
-    try {
-      if (sv4 != null) {
-        sv4.clear();
-      }
-    } catch (RuntimeException e) {
-      ex = (ex == null) ? e : ex;
-    }
+
+    // Sv4 is cleared by the builder, above.
+
     if (ex != null) {
       throw ex;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
index 7a460f5..b48c012 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
@@ -66,7 +66,6 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
 
   @Override
   public int next(int targetRecordCount) {
-    VectorAccessibleUtilities.allocateVectors(outgoing, targetRecordCount);
     for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
       if (queueSize == 0) {
         return 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
index 6b71782..6ec8862 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -34,7 +34,9 @@ import org.apache.drill.exec.ops.OperExecContext;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -100,10 +102,12 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
    * @param batchGroupList
    * @param outputContainer
    * @param targetRecordCount
+   * @param allocHelper
    * @return
    */
-  public BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int targetRecordCount) {
-    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount);
+  public BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+              VectorContainer outputContainer, int targetRecordCount, VectorInitializer allocHelper) {
+    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount, allocHelper);
   }
 
   /**
@@ -195,11 +199,11 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
 
   public static class BatchMerger implements SortResults, AutoCloseable {
 
-    private PriorityQueueCopierWrapper holder;
-    private VectorContainer hyperBatch;
-    private VectorContainer outputContainer;
-    private int targetRecordCount;
-    private int copyCount;
+    private final PriorityQueueCopierWrapper holder;
+    private final VectorContainer hyperBatch;
+    private final VectorContainer outputContainer;
+    private final VectorInitializer allocHelper;
+    private final int targetRecordCount;
     private int batchCount;
     private long estBatchSize;
 
@@ -212,8 +216,8 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
      * @param targetRecordCount number of records for each output batch
      */
     private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
-                        int targetRecordCount) {
-      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount);
+                        int targetRecordCount, VectorInitializer allocHelper) {
+      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount, allocHelper);
     }
 
     /**
@@ -224,12 +228,13 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
      * @param batchGroupList the input batches
      * @param outputContainer merges output batch into the given output container
      * @param targetRecordCount number of records for each output batch
+     * @param allocHelper
      */
     private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
-                        VectorContainer outputContainer, int targetRecordCount) {
+                        VectorContainer outputContainer, int targetRecordCount, VectorInitializer allocHelper) {
       this.holder = holder;
+      this.allocHelper = allocHelper;
       hyperBatch = constructHyperBatch(schema, batchGroupList);
-      copyCount = 0;
       this.targetRecordCount = targetRecordCount;
       this.outputContainer = outputContainer;
       holder.createCopier(hyperBatch, batchGroupList, outputContainer);
@@ -245,29 +250,35 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper {
 
     @Override
     public boolean next() {
-      Stopwatch w = Stopwatch.createStarted();
       long start = holder.getAllocator().getAllocatedMemory();
+
+      // Allocate an outgoing container the "dumb" way (based on static sizes)
+      // for testing, or the "smart" way (based on actual observed data sizes)
+      // for production code.
+
+      if (allocHelper == null) {
+        VectorAccessibleUtilities.allocateVectors(outputContainer, targetRecordCount);
+      } else {
+        allocHelper.allocateBatch(outputContainer, targetRecordCount);
+      }
+      logger.trace("Initial output batch allocation: {} bytes",
+                   holder.getAllocator().getAllocatedMemory() - start);
+      Stopwatch w = Stopwatch.createStarted();
       int count = holder.copier.next(targetRecordCount);
-      copyCount += count;
       if (count > 0) {
         long t = w.elapsed(TimeUnit.MICROSECONDS);
         batchCount++;
-        logger.trace("Took {} us to merge {} records", t, count);
         long size = holder.getAllocator().getAllocatedMemory() - start;
+        logger.trace("Took {} us to merge {} records, consuming {} bytes of memory",
+                     t, count, size);
         estBatchSize = Math.max(estBatchSize, size);
       } else {
         logger.trace("copier returned 0 records");
       }
 
-      // Identify the schema to be used in the output container. (Since
-      // all merged batches have the same schema, the schema we identify
-      // here should be the same as that which we already had.
+      // Initialize output container metadata.
 
       outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
-      // The copier does not set the record count in the output
-      // container, so do that here.
-
       outputContainer.setRecordCount(count);
 
       return count > 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index e47d67e..8ae3998 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 
 public class SortConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
 
   /**
    * Smallest allowed output batch size. The smallest output batch
@@ -62,6 +63,12 @@ public class SortConfig {
 
   private final int bufferedBatchLimit;
 
+  /**
+   * Limit the size of the in-memory merge return batches.
+   * Primarily for testing.
+   */
+
+  private final int mSortBatchSize;
 
   public SortConfig(DrillConfig config) {
 
@@ -101,15 +108,26 @@ public class SortConfig {
     } else {
       bufferedBatchLimit = Math.max(value, 2);
     }
+
+    // Limit on memory merge batch size; primarily for testing
+
+    if (config.hasPath(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE)) {
+      mSortBatchSize = Math.max(1,
+            Math.min(Character.MAX_VALUE,
+                     config.getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE)));
+    } else {
+      mSortBatchSize = Character.MAX_VALUE;
+    }
+
     logConfig();
   }
 
   private void logConfig() {
-    ExternalSortBatch.logger.debug("Config: " +
+    logger.debug("Config: " +
                  "spill file size = {}, spill batch size = {}, " +
-                 "merge limit = {}, merge batch size = {}",
-                  spillFileSize(), spillFileSize(),
-                  mergeLimit(), mergeBatchSize());
+                 "merge batch size = {}, mSort batch size = {}",
+                  spillFileSize, spillBatchSize,
+                  mergeBatchSize, mSortBatchSize);
   }
 
   public long maxMemory() { return maxMemory; }
@@ -118,4 +136,5 @@ public class SortConfig {
   public int spillBatchSize() { return spillBatchSize; }
   public int mergeBatchSize() { return mergeBatchSize; }
   public int getBufferedBatchLimit() { return bufferedBatchLimit; }
+  public int getMSortBatchSize() { return mSortBatchSize; }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/073ea681/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
index 6f0da3d..7420402 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -27,9 +27,12 @@ import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
 import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorInitializer;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
@@ -70,6 +73,103 @@ public class SortImpl {
     SelectionVector4 getSv4();
   }
 
+  public static class EmptyResults implements SortResults {
+
+    private final VectorContainer dest;
+
+    public EmptyResults(VectorContainer dest) {
+      dest.setRecordCount(0);
+      dest.buildSchema(SelectionVectorMode.NONE);
+      this.dest = dest;
+    }
+
+    @Override
+    public boolean next() { return false; }
+
+    @Override
+    public void close() { }
+
+    @Override
+    public int getBatchCount() { return 0; }
+
+    @Override
+    public int getRecordCount() { return 0; }
+
+    @Override
+    public SelectionVector4 getSv4() { return null; }
+
+    @Override
+    public SelectionVector2 getSv2() { return null; }
+
+    @Override
+    public VectorContainer getContainer() { return dest; }
+  }
+
+
+  /**
+   * Return results for a single input batch. No merge is needed;
+   * the original (sorted) input batch is simply passed as the result.
+   * Note that this version requires replacing the operator output
+   * container with the batch container. (Vector ownership transfer
+   * was already done when accepting the input batch.)
+   */
+
+  public static class SingleBatchResults implements SortResults {
+
+    private boolean done;
+    private final VectorContainer outputContainer;
+    private final BatchGroup.InputBatch batch;
+
+    public SingleBatchResults(BatchGroup.InputBatch batch, VectorContainer outputContainer) {
+      this.batch = batch;
+      this.outputContainer = outputContainer;
+    }
+
+    @Override
+    public boolean next() {
+      if (done) {
+        return false;
+      }
+
+      // The following implementation is wrong. Must transfer buffers,
+      // not vectors. The output container already contains vectors
+      // for the output schema.
+
+      for (VectorWrapper<?> vw : batch.getContainer()) {
+        outputContainer.add(vw.getValueVector());
+      }
+      outputContainer.buildSchema(SelectionVectorMode.TWO_BYTE);
+      outputContainer.setRecordCount(batch.getRecordCount());
+      done = true;
+      return true;
+    }
+
+    @Override
+    public void close() {
+      try {
+        batch.close();
+      } catch (IOException e) {
+        // Should never occur for an input batch
+        throw new IllegalStateException(e);
+      }
+    }
+
+    @Override
+    public int getBatchCount() { return 1; }
+
+    @Override
+    public int getRecordCount() { return outputContainer.getRecordCount(); }
+
+    @Override
+    public SelectionVector4 getSv4() { return null; }
+
+    @Override
+    public SelectionVector2 getSv2() { return batch.getSv2(); }
+
+    @Override
+    public VectorContainer getContainer() { return outputContainer; }
+  }
+
   private final SortConfig config;
   private final SortMetrics metrics;
   private final SortMemoryManager memManager;
@@ -88,7 +188,12 @@ public class SortImpl {
 
   private final BufferedBatches bufferedBatches;
 
-  public SortImpl(OperExecContext opContext, SortConfig sortConfig, SpilledRuns spilledRuns, VectorContainer batch) {
+  private RecordBatchSizer sizer;
+
+  private VectorInitializer allocHelper;
+
+  public SortImpl(OperExecContext opContext, SortConfig sortConfig,
+                  SpilledRuns spilledRuns, VectorContainer batch) {
     this.context = opContext;
     outputBatch = batch;
     this.spilledRuns = spilledRuns;
@@ -103,7 +208,10 @@ public class SortImpl {
     // limit will reduce the probability that random chance causes the allocator
     // to kill the query because of a small, spurious over-allocation.
 
-    allocator.setLimit((long)(allocator.getLimit() * 1.10));
+    long maxMem = memManager.getMemoryLimit();
+    long newMax = (long)(maxMem * 1.10);
+    allocator.setLimit(newMax);
+    logger.debug("Config: Resetting allocator to 10% safety margin: {}", newMax);
   }
 
   public void setSchema(BatchSchema schema) {
@@ -138,16 +246,16 @@ public class SortImpl {
     // ownership. Allows us to figure out if we need to spill first,
     // to avoid overflowing memory simply due to ownership transfer.
 
-    RecordBatchSizer sizer = analyzeIncomingBatch(incoming);
+   analyzeIncomingBatch(incoming);
 
     // The heart of the external sort operator: spill to disk when
     // the in-memory generation exceeds the allowed memory limit.
     // Preemptively spill BEFORE accepting the new batch into our memory
-    // pool. The allocator will throw an OOM exception if we accept the
-    // batch when we are near the limit - despite the fact that the batch
-    // is already in memory and no new memory is allocated during the transfer.
+    // pool. Although the allocator will allow us to exceed the memory limit
+    // during the transfer, we immediately follow the transfer with an SV2
+    // allocation that will fail if we are over the allocation limit.
 
-    if ( isSpillNeeded(sizer.actualSize())) {
+    if (isSpillNeeded(sizer.actualSize())) {
       spillFromMemory();
     }
 
@@ -172,7 +280,12 @@ public class SortImpl {
     // (which may exclude some records due to filtering.)
 
     validateBatchSize(sizer.actualSize(), batchSize);
-    memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount());
+    if (memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount())) {
+
+      // If estimates changed, discard the helper based on the old estimates.
+
+      allocHelper = null;
+    }
   }
 
   /**
@@ -181,13 +294,12 @@ public class SortImpl {
    * @return an analysis of the incoming batch
    */
 
-  private RecordBatchSizer analyzeIncomingBatch(VectorAccessible incoming) {
-    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+  private void analyzeIncomingBatch(VectorAccessible incoming) {
+    sizer = new RecordBatchSizer(incoming);
     sizer.applySv2();
     if (metrics.getInputBatchCount() == 0) {
       logger.debug("{}", sizer.toString());
     }
-    return sizer;
   }
 
   /**
@@ -195,20 +307,32 @@ public class SortImpl {
    * Spilling is driven purely by memory availability (and an optional
    * batch limit for testing.)
    *
-   * @return true if spilling is needed, false otherwise
+   * @return true if spilling is needed (and possible), false otherwise
    */
 
   private boolean isSpillNeeded(int incomingSize) {
 
+    if (bufferedBatches.size() >= config.getBufferedBatchLimit()) {
+      return true;
+    }
+
     // Can't spill if less than two batches else the merge
     // can't make progress.
 
+    final boolean spillNeeded = memManager.isSpillNeeded(allocator.getAllocatedMemory(), incomingSize);
     if (bufferedBatches.size() < 2) {
-      return false; }
 
-    if (bufferedBatches.size() >= config.getBufferedBatchLimit()) {
-      return true; }
-    return memManager.isSpillNeeded(allocator.getAllocatedMemory(), incomingSize);
+      // If we can't fit the batch into memory, then place a definite error
+      // message into the log to simplify debugging.
+
+      if (spillNeeded) {
+        logger.error("Insufficient memory to merge two batches. Incoming batch size: {}, available memory: {}",
+                     incomingSize, memManager.freeMemory(allocator.getAllocatedMemory()));
+      }
+      return false;
+    }
+
+    return spillNeeded;
   }
 
   private void validateBatchSize(long actualBatchSize, long memoryDelta) {
@@ -235,46 +359,23 @@ public class SortImpl {
 
     // Do the actual spill.
 
-    logger.trace("Spilling {} of {} batches, memory = {}",
+    logger.trace("Spilling {} of {} batches, allocated memory = {} bytes",
         batchesToSpill.size(), startCount,
         allocator.getAllocatedMemory());
     int spillBatchRowCount = memManager.getSpillBatchRowCount();
-    spilledRuns.mergeAndSpill(batchesToSpill, spillBatchRowCount);
+    spilledRuns.mergeAndSpill(batchesToSpill, spillBatchRowCount, allocHelper());
     metrics.incrSpillCount();
   }
 
-  public SortMetrics getMetrics() { return metrics; }
-
-  public static class EmptyResults implements SortResults {
-
-    private final VectorContainer dest;
-
-    public EmptyResults(VectorContainer dest) {
-      this.dest = dest;
+  private VectorInitializer allocHelper() {
+    if (allocHelper == null) {
+      allocHelper = sizer.buildVectorInitializer();
     }
-
-    @Override
-    public boolean next() { return false; }
-
-    @Override
-    public void close() { }
-
-    @Override
-    public int getBatchCount() { return 0; }
-
-    @Override
-    public int getRecordCount() { return 0; }
-
-    @Override
-    public SelectionVector4 getSv4() { return null; }
-
-    @Override
-    public SelectionVector2 getSv2() { return null; }
-
-    @Override
-    public VectorContainer getContainer() { return dest; }
+    return allocHelper;
   }
 
+  public SortMetrics getMetrics() { return metrics; }
+
   public SortResults startMerge() {
     if (metrics.getInputRowCount() == 0) {
       return new EmptyResults(outputBatch);
@@ -284,72 +385,22 @@ public class SortImpl {
         metrics.getInputBatchCount(), spilledRuns.size(),
         metrics.getInputBytes());
 
-    // Do the merge of the loaded batches. The merge can be done entirely in memory if
-    // the results fit; else we have to do a disk-based merge of
-    // pre-sorted spilled batches.
+    // Do the merge of the loaded batches. The merge can be done entirely in
+    // memory if the results fit; else we have to do a disk-based merge of
+    // pre-sorted spilled batches. Special case the single-batch query;
+    // this accelerates small, quick queries.
+    //
+    // Note: disabling this optimization because it turns out to be
+    // quite hard to transfer a set of vectors from one place to another.
 
-    boolean optimizeOn = true; // Debug only
-    if (optimizeOn && metrics.getInputBatchCount() == 1) {
+    /* if (metrics.getInputBatchCount() == 1) {
       return singleBatchResult();
-    } else if (canUseMemoryMerge()) {
+    } else */ if (canUseMemoryMerge()) {
       return mergeInMemory();
     } else {
       return mergeSpilledRuns();
     }
   }
-
-  /**
-   * Return results for a single input batch. No merge is needed;
-   * the original (sorted) input batch is simply passed as the result.
-   * Note that this version requires replacing the operator output
-   * container with the batch container. (Vector ownership transfer
-   * was already done when accepting the input batch.)
-   */
-
-  public static class SingleBatchResults implements SortResults {
-
-    private boolean done;
-    private final BatchGroup.InputBatch batch;
-
-    public SingleBatchResults(BatchGroup.InputBatch batch) {
-      this.batch = batch;
-    }
-
-    @Override
-    public boolean next() {
-      if (done) {
-        return false;
-      }
-      done = true;
-      return true;
-    }
-
-    @Override
-    public void close() {
-      try {
-        batch.close();
-      } catch (IOException e) {
-        // Should never occur for an input batch
-        throw new IllegalStateException(e);
-      }
-    }
-
-    @Override
-    public int getBatchCount() { return 1; }
-
-    @Override
-    public int getRecordCount() { return batch.getRecordCount(); }
-
-    @Override
-    public SelectionVector4 getSv4() { return null; }
-
-    @Override
-    public SelectionVector2 getSv2() { return batch.getSv2(); }
-
-    @Override
-    public VectorContainer getContainer() {return batch.getContainer(); }
-  }
-
   /**
    * Input consists of a single batch. Just return that batch as
    * the output.
@@ -358,7 +409,7 @@ public class SortImpl {
 
   private SortResults singleBatchResult() {
     List<InputBatch> batches = bufferedBatches.removeAll();
-    return new SingleBatchResults(batches.get(0));
+    return new SingleBatchResults(batches.get(0), outputBatch);
   }
 
   /**
@@ -413,7 +464,7 @@ public class SortImpl {
 
     MergeSortWrapper memoryMerge = new MergeSortWrapper(context, outputBatch);
     try {
-      memoryMerge.merge(bufferedBatches.removeAll());
+      memoryMerge.merge(bufferedBatches.removeAll(), config.getMSortBatchSize());
     } catch (Throwable t) {
       memoryMerge.close();
       throw t;
@@ -461,13 +512,13 @@ public class SortImpl {
     }
 
     int mergeRowCount = memManager.getMergeBatchRowCount();
-    return spilledRuns.finalMerge(bufferedBatches.removeAll(), outputBatch, mergeRowCount);
+    return spilledRuns.finalMerge(bufferedBatches.removeAll(), outputBatch, mergeRowCount, allocHelper);
   }
 
   private void mergeRuns(int targetCount) {
     long mergeMemoryPool = memManager.getMergeMemoryLimit();
     int spillBatchRowCount = memManager.getSpillBatchRowCount();
-    spilledRuns.mergeRuns(targetCount, mergeMemoryPool, spillBatchRowCount);
+    spilledRuns.mergeRuns(targetCount, mergeMemoryPool, spillBatchRowCount, allocHelper);
     metrics.incrMergeCount();
   }