You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by Timothy Chen <tn...@gmail.com> on 2014/05/30 01:55:42 UTC

Re: [02/15] git commit: DRILL-856: Reduce code-generation size for MergingReceiver

So what's the strategy for reducing code gen?

It looks like there are multiple places that uses the same structure and will be helpful to share so for new operators it can have some guidelines.

Tim

Sent from my iPhone

> On May 29, 2014, at 4:45 PM, jacques@apache.org wrote:
> 
> DRILL-856: Reduce code-generation size for MergingReceiver
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b1d91c81
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b1d91c81
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b1d91c81
> 
> Branch: refs/heads/master
> Commit: b1d91c8187d197991306b4d054db135121ceca85
> Parents: 58cf129
> Author: Steven Phillips <sp...@maprtech.com>
> Authored: Thu May 29 03:18:28 2014 -0700
> Committer: Steven Phillips <sp...@maprtech.com>
> Committed: Thu May 29 03:18:28 2014 -0700
> 
> ----------------------------------------------------------------------
> .../MergingReceiverGeneratorBase.java           |  13 +-
> .../mergereceiver/MergingReceiverTemplate.java  |  33 +-
> .../impl/mergereceiver/MergingRecordBatch.java  | 314 ++++---------------
> 3 files changed, 75 insertions(+), 285 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
> index 46a156f..237f2f8 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
> @@ -23,25 +23,26 @@ import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.ops.FragmentContext;
> import org.apache.drill.exec.record.RecordBatch;
> import org.apache.drill.exec.record.RecordBatchLoader;
> +import org.apache.drill.exec.record.VectorAccessible;
> 
> import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
> 
> public interface MergingReceiverGeneratorBase {
> 
>   public abstract void doSetup(FragmentContext context,
> -                               RecordBatchLoader[] incomingBatchLoaders,
> -                               RecordBatch outgoing) throws SchemaChangeException;
> +                               VectorAccessible incoming,
> +                               VectorAccessible outgoing) throws SchemaChangeException;
> 
> -  public abstract int doCompare(MergingRecordBatch.Node left,
> -                                MergingRecordBatch.Node right);
> +  public abstract int doEval(int leftIndex,
> +                                int rightIndex);
> 
> -  public abstract boolean doCopy(int inBatch, int inIndex, int outIndex);
> +  public abstract boolean doCopy(int inIndex, int outIndex);
> 
>   public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION =
>       new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class);
> 
>   public final MappingSet compareMapping =
> -    new MappingSet("left.valueIndex", "right.valueIndex",
> +    new MappingSet("leftIndex", "rightIndex",
>       GM("doSetup", "doCompare", null, null),
>       GM("doSetup", "doCompare", null, null));
> 
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
> index 197e960..002e054 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
> @@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.ops.FragmentContext;
> import org.apache.drill.exec.record.RecordBatch;
> import org.apache.drill.exec.record.RecordBatchLoader;
> +import org.apache.drill.exec.record.VectorAccessible;
> 
> import javax.inject.Named;
> 
> @@ -29,34 +30,12 @@ public abstract class MergingReceiverTemplate implements MergingReceiverGenerato
> 
>   public MergingReceiverTemplate() throws SchemaChangeException { }
> 
> -  /**
> -   * Enter the generated setup routine
> -   * @param context current fragment context
> -   * @param incomingBatchLoaders one RecordBatchLoader per sender
> -   * @param outgoing outgoing RecordBatch iterator
> -   * @throws SchemaChangeException
> -   */
>   public abstract void doSetup(@Named("context") FragmentContext context,
> -                               @Named("incomingBatchLoaders") RecordBatchLoader[] incomingBatchLoaders,
> -                               @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
> +                               @Named("incoming") VectorAccessible incoming,
> +                               @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
> 
> -  /**
> -   * Enter the generated comparator
> -   * @param leftNode  reference to the left-hand value and vector
> -   * @param rightNode reference to the right-hand value and vector
> -   * @return
> -   */
> -  public abstract int doCompare(@Named("leftNode") MergingRecordBatch.Node leftNode,
> -                                @Named("rightNode") MergingRecordBatch.Node rightNode);
> -
> -  /**
> -   * Enter the generated copy function
> -   * @param inBatch incoming batch to copy from
> -   * @param inIndex incoming record position to copy from
> -   * @param outIndex outgoing record position to copy to
> -   */
> -  public abstract boolean doCopy(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
> -
> -//  public abstract void doEval(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
> +  public abstract int doEval(@Named("leftIndex") int leftIndex,
> +                                @Named("rightIndex") int rightIndex);
> 
> +  public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
> }
> 
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
> ----------------------------------------------------------------------
> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
> index 9101202..be5bf76 100644
> --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
> +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
> @@ -25,15 +25,19 @@ import java.util.Iterator;
> import java.util.List;
> import java.util.PriorityQueue;
> 
> +import com.sun.codemodel.*;
> import org.apache.drill.common.expression.ErrorCollector;
> import org.apache.drill.common.expression.ErrorCollectorImpl;
> import org.apache.drill.common.expression.LogicalExpression;
> import org.apache.drill.common.expression.SchemaPath;
> import org.apache.drill.common.logical.data.Order.Ordering;
> import org.apache.drill.common.types.TypeProtos;
> +import org.apache.drill.exec.compile.sig.GeneratorMapping;
> +import org.apache.drill.exec.compile.sig.MappingSet;
> import org.apache.drill.exec.exception.ClassTransformationException;
> import org.apache.drill.exec.exception.SchemaChangeException;
> import org.apache.drill.exec.expr.ClassGenerator;
> +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
> import org.apache.drill.exec.expr.CodeGenerator;
> import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
> import org.apache.drill.exec.expr.TypeHelper;
> @@ -43,20 +47,12 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
> import org.apache.drill.exec.ops.FragmentContext;
> import org.apache.drill.exec.ops.MetricDef;
> import org.apache.drill.exec.physical.config.MergingReceiverPOP;
> +import org.apache.drill.exec.physical.impl.xsort.PriorityQueueCopier;
> import org.apache.drill.exec.proto.UserBitShared;
> -import org.apache.drill.exec.record.AbstractRecordBatch;
> -import org.apache.drill.exec.record.BatchSchema;
> -import org.apache.drill.exec.record.RawFragmentBatch;
> -import org.apache.drill.exec.record.RawFragmentBatchProvider;
> -import org.apache.drill.exec.record.RecordBatch;
> -import org.apache.drill.exec.record.RecordBatchLoader;
> -import org.apache.drill.exec.record.SchemaBuilder;
> -import org.apache.drill.exec.record.TypedFieldId;
> -import org.apache.drill.exec.record.VectorContainer;
> -import org.apache.drill.exec.record.VectorWrapper;
> -import org.apache.drill.exec.record.WritableBatch;
> +import org.apache.drill.exec.record.*;
> import org.apache.drill.exec.record.selection.SelectionVector2;
> import org.apache.drill.exec.record.selection.SelectionVector4;
> +import org.apache.drill.exec.vector.CopyUtil;
> import org.apache.drill.exec.vector.ValueVector;
> import org.apache.drill.exec.vector.allocator.VectorAllocator;
> import org.eigenbase.rel.RelFieldCollation.Direction;
> @@ -65,13 +61,6 @@ import parquet.Preconditions;
> 
> import com.google.common.collect.ImmutableList;
> import com.google.common.collect.Lists;
> -import com.sun.codemodel.JArray;
> -import com.sun.codemodel.JClass;
> -import com.sun.codemodel.JExpr;
> -import com.sun.codemodel.JExpression;
> -import com.sun.codemodel.JMod;
> -import com.sun.codemodel.JType;
> -import com.sun.codemodel.JVar;
> 
> 
> /**
> @@ -100,7 +89,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
>   private int[] batchOffsets;
>   private PriorityQueue <Node> pqueue;
>   private List<VectorAllocator> allocators;
> -  private MergingReceiverPOP config;
> 
>   public static enum Metric implements MetricDef{
>     NEXT_WAIT_NANOS;
> @@ -118,7 +106,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
>     super(config, context);
>     this.fragProviders = fragProviders;
>     this.context = context;
> -    this.config = config;
>     this.allocators = Lists.newArrayList();
>     this.outgoingContainer = new VectorContainer();
>   }
> @@ -252,7 +239,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
>       // allocate the priority queue with the generated comparator
>       this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() {
>         public int compare(Node node1, Node node2) {
> -          return merger.doCompare(node1, node2);
> +          int leftIndex = (node1.batchId << 16) + node1.valueIndex;
> +          int rightIndex = (node2.batchId << 16) + node2.valueIndex;
> +          return merger.doEval(leftIndex, rightIndex);
>         }
>       });
> 
> @@ -444,248 +433,68 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
>    */
>   private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
> 
> -    // set up the expression evaluator and code generation
> -    final List<Ordering> orderings = config.getOrderings();
> -    final ErrorCollector collector = new ErrorCollectorImpl();
> -    final ClassGenerator<MergingReceiverGeneratorBase> cg =
> -        CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
> -    JExpression inIndex = JExpr.direct("inIndex");
> -    JExpression outIndex = JExpr.direct("outIndex");
> -
> -    JType valueVector2DArray = cg.getModel().ref(ValueVector.class).array().array();
> -    JType valueVectorArray = cg.getModel().ref(ValueVector.class).array();
> -    JType incomingBatchesType = cg.getModel().ref(RecordBatchLoader.class).array();
> -    JType incomingBatchType = cg.getModel().ref(RecordBatchLoader.class);
> -    JType recordBatchType = cg.getModel().ref(RecordBatch.class);
> -
> -    // declare a two-dimensional array of value vectors; batch is first dimension, ValueVectorId is the second
> -    JVar incomingVectors = cg.clazz.field(JMod.NONE,
> -      valueVector2DArray,
> -      "incomingVectors");
> -
> -    // declare a two-dimensional array of vectors used to store a reference to all ValueVectors
> -    // used in a comparison operation.  first dimension is the batch id.  each batch has one or more
> -    // comparison vectors, maintaining the order defined by the OrderDef.
> -    JVar comparisonVectors = cg.clazz.field(JMod.NONE,
> -      valueVector2DArray,
> -      "comparisonVectors");
> -
> -    // declare an array of incoming batches
> -    JVar incomingBatchesVar = cg.clazz.field(JMod.NONE,
> -      incomingBatchesType,
> -      "incomingBatches");
> -
> -    // declare an array of outgoing vectors
> -    JVar outgoingVectors = cg.clazz.field(JMod.NONE,
> -      valueVectorArray,
> -      "outgoingVectors");
> -
> -    // declare a reference to this MergingRecordBatch
> -    JVar outgoingBatch = cg.clazz.field(JMod.NONE,
> -      recordBatchType,
> -      "outgoingBatch");
> -
> -    // create aliases for materializer
> -    JVar incomingVar = cg.clazz.field(JMod.NONE, incomingBatchType, "incoming");
> -    cg.getSetupBlock().assign(incomingBatchesVar, JExpr.direct("incomingBatchLoaders"));
> -    cg.getSetupBlock().assign(outgoingBatch, JExpr.direct("outgoing"));
> -
> -    cg.setMappingSet(MergingReceiverGeneratorBase.compareMapping);
> -
> -    // evaluate expression on each incoming batch and create/initialize 2d array of incoming vectors.  For example:
> -    //     incomingVectors = new ValueVector[][] {
> -    //                         new ValueVector[] {vv1, vv2},
> -    //                         new ValueVector[] {vv3, vv4}
> -    //                       });
> -    int fieldsPerBatch = 0; // number of fields per batch
> -    int batchIdx = 0;
> -    JArray incomingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
> -    List <List<ValueVectorReadExpression>> cmpExpressions = Lists.newArrayList();
> -    for (RecordBatchLoader batch : batchLoaders) {
> -      JArray incomingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class));
> -      int fieldIdx = 0;
> -      for (VectorWrapper<?> vv : batch) {
> -        // declare incoming value vector and assign it to the array
> -        JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + batchIdx + "]",
> -          new TypedFieldId(vv.getField().getType(),
> -            false,
> -            fieldIdx));
> -
> -        // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
> -        incomingVectorInitBatch.add(inVV);
> -        ++fieldIdx;
> -      }
> -
> -      // add VV array to initialization list (e.g. new ValueVector[] { ... })
> -      incomingVectorInit.add(incomingVectorInitBatch);
> -
> -      // materialize each expression for this incoming batch
> -      for (int i = 0; i < orderings.size(); ++i) {
> -        cmpExpressions.add(new ArrayList<ValueVectorReadExpression>());
> -        cg.getSetupBlock().assign(incomingVar, JExpr.direct("incomingBatches[" + batchIdx + "]"));
> -        LogicalExpression exprForCurrentBatch = ExpressionTreeMaterializer.materialize(orderings.get(i).getExpr(),
> -                                                                                       batch,
> -                                                                                       collector,
> -                                                                                       context.getFunctionRegistry());
> -        if (collector.hasErrors()) {
> -          throw new SchemaChangeException(
> -              String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.",
> -                            collector.toErrorString()));
> -        }
> +    try {
> +      CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
> +      ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
> 
> -        // add materialized field expression to comparison list
> -        if (exprForCurrentBatch instanceof ValueVectorReadExpression) {
> -          cmpExpressions.get(batchIdx).add((ValueVectorReadExpression) exprForCurrentBatch);
> -        }
> -        else {
> -          throw new SchemaChangeException("Invalid expression supplied to MergingReceiver operator");
> +      ExpandableHyperContainer batch = null;
> +      boolean first = true;
> +      for (RecordBatchLoader loader : batchLoaders) {
> +        if (first) {
> +          batch = new ExpandableHyperContainer(loader);
> +          first = false;
> +        } else {
> +          batch.addBatch(loader);
>         }
>       }
> 
> -      ++batchIdx;
> -      fieldsPerBatch = fieldIdx;
> -    }
> +      generateComparisons(g, batch);
> 
> -    // write out the incoming vector initialization block
> -    cg.getSetupBlock().assign(incomingVectors, incomingVectorInit);
> -
> -    // Generate the comparison function:
> -    // The generated code compares the fields defined in each logical expression.  The batch index
> -    // is supplied by the function caller (at runtime).  The comparison statements for each
> -    // expression are generated for each schema change.  Inequality checks (< and >) for each batch
> -    // are executed first to accommodate multiple expressions.  Equality is checked only for the last
> -    // expression, and only if all previous expressions are equal.  Expression order is applied
> -    // to the result of the FunctionCall.
> -
> -    JArray comparisonVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
> -    for (int b = 0; b < cmpExpressions.size(); ++b) {
> -      JArray comparisonVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class));
> -
> -      for (ValueVectorReadExpression vvRead : cmpExpressions.get(b)) {
> -        TypeProtos.DataMode mode = vvRead.getMajorType().getMode();
> -        TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
> -        Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode);
> -
> -        JExpression arr = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0]));
> -        comparisonVectorInitBatch.add(
> -            ((JExpression) incomingBatchesVar.component(JExpr.lit(b)))
> -               .invoke("getValueAccessorById")
> -                 .arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass())
> -                 .arg(arr)
> -                   .invoke("getValueVector"));
> +      g.setMappingSet(COPIER_MAPPING_SET);
> +      CopyUtil.generateCopies(g, batch, true);
> +      g.setMappingSet(MAIN_MAPPING);
> +      MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
> 
> -      }
> -      comparisonVectorInit.add(comparisonVectorInitBatch);
> +      merger.doSetup(context, batch, outgoingContainer);
> +      return merger;
> +    } catch (ClassTransformationException | IOException e) {
> +      throw new SchemaChangeException(e);
>     }
> +  }
> 
> -    cg.getSetupBlock().assign(comparisonVectors, comparisonVectorInit);
> -
> -    int comparisonVectorIndex = 0;
> -    for (ValueVectorReadExpression vvRead : cmpExpressions.get(0)) {
> -      // generate the comparison statements based on the first batch (assumes comparison fields are homogeneous)
> -      TypeProtos.DataMode mode = vvRead.getMajorType().getMode();
> -      TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
> -      JType vectorType = cg.getModel()._ref(TypeHelper.getValueVectorClass(minor, mode));
> -      JType valueType = TypeHelper.getHolderType(cg.getModel(), minor, mode);
> -
> -      // set up a holding container expression for left-hand side of function call
> -      JVar leftVar = cg.getEvalBlock().decl(valueType, "leftValue" + comparisonVectorIndex, JExpr._new(valueType));
> -      cg.getEvalBlock().add(((JExpression) JExpr.cast(vectorType,
> -                                ((JExpression) comparisonVectors
> -                                  .component(JExpr.direct("leftNode.batchId")))
> -                                  .component(JExpr.lit(comparisonVectorIndex))))
> -        .invoke("getAccessor")
> -        .invoke("get")
> -        .arg(JExpr.direct("leftNode.valueIndex"))
> -        .arg(leftVar));
> -
> -      ClassGenerator.HoldingContainer left = new ClassGenerator.HoldingContainer(vvRead.getMajorType(),
> -                                                                               leftVar,
> -                                                                               leftVar.ref("value"),
> -                                                                               leftVar.ref("isSet"));
> -
> -      // set up a holding container expression for right-hand side of function call
> -      JVar rightVar = cg.getEvalBlock().decl(valueType, "rightValue" + comparisonVectorIndex, JExpr._new(valueType));
> -      cg.getEvalBlock().add(((JExpression) JExpr.cast(vectorType,
> -                                ((JExpression) comparisonVectors
> -                                    .component(JExpr.direct("rightNode.batchId")))
> -                                    .component(JExpr.lit(comparisonVectorIndex))))
> -        .invoke("getAccessor")
> -        .invoke("get")
> -        .arg(JExpr.direct("rightNode.valueIndex"))
> -        .arg(rightVar));
> -
> -      ClassGenerator.HoldingContainer right = new ClassGenerator.HoldingContainer(vvRead.getMajorType(),
> -                                                                                rightVar,
> -                                                                                rightVar.ref("value"),
> -                                                                                rightVar.ref("isSet"));
> -
> -      // generate the comparison function
> +  public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
> +  public final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
> +  public final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
> +  GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
> +  public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
> +
> +  private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
> +    g.setMappingSet(MAIN_MAPPING);
> +
> +    for(Ordering od : popConfig.getOrderings()){
> +      // first, we rewrite the evaluation stack for each side of the comparison.
> +      ErrorCollector collector = new ErrorCollectorImpl();
> +      final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
> +      if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
> +      g.setMappingSet(LEFT_MAPPING);
> +      HoldingContainer left = g.addExpr(expr, false);
> +      g.setMappingSet(RIGHT_MAPPING);
> +      HoldingContainer right = g.addExpr(expr, false);
> +      g.setMappingSet(MAIN_MAPPING);
> +
> +      // next we wrap the two comparison sides and add the expression block for the comparison.
>       LogicalExpression fh = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry());
> -      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
> -
> -      // generate less than/greater than checks (fixing results for ASCending vs. DESCending)
> -      cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1)))
> -                       ._then()
> -                       ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASCENDING ? 1 : -1));
> -
> -      cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1)))
> -                       ._then()
> -                       ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASCENDING ? -1 : 1));
> -
> -      ++comparisonVectorIndex;
> -    }
> +      HoldingContainer out = g.addExpr(fh, false);
> +      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
> 
> -    // if all expressions are equal, return 0
> -    cg.getEvalBlock()._return(JExpr.lit(0));
> -
> -    // allocate a new array for outgoing vectors
> -    cg.getSetupBlock().assign(outgoingVectors, JExpr.newArray(cg.getModel().ref(ValueVector.class), fieldsPerBatch));
> -
> -    // generate copy function and setup outgoing batches
> -    cg.setMappingSet(MergingReceiverGeneratorBase.copyMapping);
> -    int fieldIdx = 0;
> -    for (VectorWrapper<?> vvOut : outgoingContainer) {
> -      // declare outgoing value vectors
> -      JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch",
> -                                                            new TypedFieldId(vvOut.getField().getType(),
> -                                                                             vvOut.isHyper(), fieldIdx));
> -
> -      // assign to the appropriate slot in the outgoingVector array (in order of iteration)
> -      cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), outgoingVV);
> -
> -      // get the vector's type info
> -      Class<?> vvType = TypeHelper.getValueVectorClass(vvOut.getField().getType().getMinorType(),
> -                                                       vvOut.getField().getType().getMode());
> -      JClass vvClass = cg.getModel().ref(vvType);
> -
> -      // generate the copyFrom() invocation with explicit cast to the appropriate type; for example:
> -      // ((IntVector) outgoingVectors[i]).copyFrom(inIndex,
> -      //                                           outgoingBatch.getRecordCount(),
> -      //                                           (IntVector) vv1);
> -      cg.getEvalBlock()._if(
> -        ((JExpression) JExpr.cast(vvClass, outgoingVectors.component(JExpr.lit(fieldIdx))))
> -          .invoke("copyFromSafe")
> -          .arg(inIndex)
> -          .arg(outIndex)
> -          .arg(JExpr.cast(vvClass,
> -                          ((JExpression) incomingVectors.component(JExpr.direct("inBatch")))
> -                            .component(JExpr.lit(fieldIdx)))).not())._then()._return(JExpr.FALSE);
> -
> -      ++fieldIdx;
> +      if(od.getDirection() == Direction.ASCENDING){
> +        jc._then()._return(out.getValue());
> +      }else{
> +        jc._then()._return(out.getValue().minus());
> +      }
>     }
> -    cg.rotateBlock();
> -    cg.getEvalBlock()._return(JExpr.TRUE);
> 
> -    // compile generated code and call the generated setup method
> -    MergingReceiverGeneratorBase newMerger;
> -    try {
> -      newMerger = context.getImplementationClass(cg);
> -      newMerger.doSetup(context, batchLoaders, this);
> -    } catch (ClassTransformationException | IOException e) {
> -      throw new SchemaChangeException("Failure while attempting to load generated class", e);
> -    }
> -    return newMerger;
> +    g.getEvalBlock()._return(JExpr.lit(0));
>   }
> 
>   /**
> @@ -695,7 +504,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
>    * @param node Reference to the next record to copy from the incoming batches
>    */
>   private boolean copyRecordToOutgoingBatch(Node node) {
> -    if (!merger.doCopy(node.batchId, node.valueIndex, outgoingPosition)) {
> +    int inIndex = (node.batchId << 16) + node.valueIndex;
> +    if (!merger.doCopy(inIndex, outgoingPosition)) {
>       return false;
>     } else {
>       outgoingPosition++;
>