You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/10/28 19:46:29 UTC

[drill] 02/02: DRILL-6763: Codegen optimization of SQL functions with constant values(#1481)

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 18e09a1b1c801f2691a05ae7db543bf71874cfea
Author: shuifeng lu <lu...@gmail.com>
AuthorDate: Wed Sep 26 11:22:20 2018 +0800

    DRILL-6763: Codegen optimization of SQL functions with constant values(#1481)
    
    closes #1481
---
 .../org/apache/drill/exec/expr/ClassGenerator.java |  54 +++++-
 .../apache/drill/exec/expr/EvaluationVisitor.java  | 186 +++++++++------------
 .../expr/fn/interpreter/InterpreterEvaluator.java  |   6 +-
 .../apache/drill/exec/ops/BaseFragmentContext.java |   9 +-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |  11 +-
 .../exec/physical/impl/aggregate/HashAggBatch.java |   1 +
 .../physical/impl/aggregate/HashAggTemplate.java   |  15 +-
 .../physical/impl/aggregate/HashAggregator.java    |   7 +-
 .../physical/impl/common/ChainedHashTable.java     |   2 +-
 .../impl/common/CodeGenMemberInjector.java         |  79 +++++++++
 .../drill/exec/physical/impl/common/HashTable.java |  10 +-
 .../physical/impl/common/HashTableTemplate.java    |  19 ++-
 .../exec/physical/impl/join/HashJoinBatch.java     |   3 +-
 .../partitionsender/PartitionSenderRootExec.java   |  68 ++++----
 .../physical/impl/partitionsender/Partitioner.java |   2 +
 .../impl/partitionsender/PartitionerTemplate.java  |  14 +-
 .../exec/compile/ExampleTemplateWithInner.java     |  12 +-
 .../exec/compile/TestLargeFileCompilation.java     |  39 +++++
 .../exec/physical/impl/TopN/TopNBatchTest.java     |  17 +-
 .../drill/exec/vector/ValueHolderHelper.java       |  36 ++--
 20 files changed, 405 insertions(+), 185 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
index 134f90e..5b33acf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
@@ -22,10 +22,13 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
@@ -39,7 +42,9 @@ import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.compile.sig.SignatureHolder;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.WorkspaceReference;
+import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -65,6 +70,7 @@ public class ClassGenerator<T>{
 
   public static final GeneratorMapping DEFAULT_SCALAR_MAP = GM("doSetup", "doEval", null, null);
   public static final GeneratorMapping DEFAULT_CONSTANT_MAP = GM("doSetup", "doSetup", null, null);
+  public static final String INNER_CLASS_FIELD_NAME = "innerClassField";
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassGenerator.class);
 
@@ -76,6 +82,7 @@ public class ClassGenerator<T>{
   private final Map<String, ClassGenerator<T>> innerClasses = Maps.newHashMap();
   private final List<TypedFieldId> workspaceTypes = Lists.newArrayList();
   private final Map<WorkspaceReference, JVar> workspaceVectors = Maps.newHashMap();
+  private final Map<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> constantVars;
   private final CodeGenerator<T> codeGenerator;
 
   public final JDefinedClass clazz;
@@ -87,6 +94,8 @@ public class ClassGenerator<T>{
   private LinkedList<SizedJBlock>[] blocks;
   private LinkedList<SizedJBlock>[] oldBlocks;
 
+  private JVar innerClassField;
+
   /**
    * Assumed that field has 3 indexes within the constant pull: index of the CONSTANT_Fieldref_info +
    * CONSTANT_Fieldref_info.name_and_type_index + CONSTANT_NameAndType_info.name_index.
@@ -135,6 +144,7 @@ public class ClassGenerator<T>{
     this.evaluationVisitor = eval;
     this.model = model;
     this.optionManager = optionManager;
+    constantVars = new HashMap<>();
 
     blocks = (LinkedList<SizedJBlock>[]) new LinkedList[sig.size()];
     for (int i =0; i < sig.size(); i++) {
@@ -370,6 +380,7 @@ public class ClassGenerator<T>{
         innerClassGenerator.maxIndex += index;
         // blocks from the inner class should be used
         setupInnerClassBlocks();
+        innerClassField = clazz.field(JMod.NONE, model.ref(innerClassGenerator.clazz.name()), INNER_CLASS_FIELD_NAME);
         return true;
       }
       return innerClassGenerator.createNestedClass();
@@ -425,10 +436,8 @@ public class ClassGenerator<T>{
    * Creates methods from the signature {@code sig} with body from the appropriate {@code blocks}.
    */
   void flushCode() {
-    JVar innerClassField = null;
     if (innerClassGenerator != null) {
       blocks = oldBlocks;
-      innerClassField = clazz.field(JMod.NONE, model.ref(innerClassGenerator.clazz.name()), "innerClassField");
       innerClassGenerator.flushCode();
     }
     int i = 0;
@@ -531,11 +540,48 @@ public class ClassGenerator<T>{
 
   public JVar declareClassField(String prefix, JType t, JExpression init) {
     if (innerClassGenerator != null && hasMaxIndexValue()) {
-      return innerClassGenerator.clazz.field(JMod.NONE, t, prefix + index++, init);
+      return innerClassGenerator.declareClassField(prefix, t, init);
     }
     return clazz.field(JMod.NONE, t, prefix + index++, init);
   }
 
+  public Pair<Integer, JVar> declareClassConstField(String prefix, JType t,
+                                                    Function<DrillBuf, ? extends ValueHolder> function) {
+    return declareClassConstField(prefix, t, null, function);
+  }
+
+  /**
+   * Declares a constant field for the class.
+   * The {@code function} argument produces the value holder that carries the constant value;
+   * that value holder must be assigned to the class field when the class instance is created.
+   * The class field innerClassField will be created if innerClassGenerator exists.
+   *
+   * @param prefix the name prefix of the class field
+   * @param t the type of the class field
+   * @param init the initialization expression
+   * @param function the function that produces the value holder for the constant
+   * @return a pair of the nested class depth and the declared class field
+   */
+  public Pair<Integer, JVar> declareClassConstField(String prefix, JType t, JExpression init,
+                                                    Function<DrillBuf, ? extends ValueHolder> function) {
+    JVar var;
+    int depth = 1;
+    if (innerClassGenerator != null) {
+      Pair<Integer, JVar> nested = innerClassGenerator.declareClassConstField(prefix, t, init, function);
+      depth = nested.getKey() + 1;
+      var = nested.getValue();
+    } else {
+      var = clazz.field(JMod.NONE, t, prefix + index++, init);
+    }
+    Pair<Integer, JVar> depthVar = Pair.of(depth, var);
+    constantVars.put(depthVar, function);
+    return depthVar;
+  }
+
+  public Map<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> getConstantVars() {
+    return constantVars;
+  }
+
   public HoldingContainer declare(MajorType t) {
     return declare(t, true);
   }
@@ -646,7 +692,7 @@ public class ClassGenerator<T>{
         Class<?> p = params[i];
         childNew.arg(shim.param(model._ref(p), "arg" + i));
       }
-      shim.body()._return(childNew);
+      shim.body()._return(JExpr._this().invoke("injectMembers").arg(childNew));
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 8685130..e13a9ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.expression.AnyValueExpression;
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.CastExpression;
@@ -56,19 +58,20 @@ import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
 import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
 import org.apache.drill.common.expression.ValueExpressions.VarDecimalExpression;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.fn.AbstractFuncHolder;
+import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.vector.ValueHolderHelper;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import com.sun.codemodel.JBlock;
@@ -76,6 +79,7 @@ import com.sun.codemodel.JClass;
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JFieldRef;
 import com.sun.codemodel.JInvocation;
 import com.sun.codemodel.JLabel;
 import com.sun.codemodel.JType;
@@ -265,72 +269,94 @@ public class EvaluationVisitor {
       throw new UnsupportedOperationException("All schema paths should have been replaced with ValueVectorExpressions.");
     }
 
+    private HoldingContainer getHoldingContainer(ClassGenerator<?> generator,
+                                                 MajorType majorType,
+                                                 Function<DrillBuf, ? extends ValueHolder> function) {
+      JType holderType = generator.getHolderType(majorType);
+      Pair<Integer, JVar> depthVar = generator.declareClassConstField("const", holderType, function);
+      JFieldRef outputSet = null;
+      JVar var = depthVar.getValue();
+      if (majorType.getMode() == TypeProtos.DataMode.OPTIONAL) {
+        outputSet = var.ref("isSet");
+      }
+      return new HoldingContainer(majorType, var, var.ref("value"), outputSet);
+    }
+
     @Override
     public HoldingContainer visitLongConstant(LongExpression e, ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getLong()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getBigIntHolder(e.getLong()));
     }
 
     @Override
     public HoldingContainer visitIntConstant(IntExpression e, ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getInt()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getIntHolder(e.getInt()));
     }
 
     @Override
     public HoldingContainer visitDateConstant(DateExpression e, ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getDate()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDateHolder(e.getDate()));
     }
 
     @Override
     public HoldingContainer visitTimeConstant(TimeExpression e, ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getTime()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getTimeHolder(e.getTime()));
     }
 
     @Override
     public HoldingContainer visitIntervalYearConstant(IntervalYearExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getIntervalYear()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getIntervalYearHolder(e.getIntervalYear()));
     }
 
     @Override
     public HoldingContainer visitTimeStampConstant(TimeStampExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getTimeStamp()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getTimeStampHolder(e.getTimeStamp()));
     }
 
     @Override
     public HoldingContainer visitFloatConstant(FloatExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getFloat()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getFloat4Holder(e.getFloat()));
     }
 
     @Override
     public HoldingContainer visitDoubleConstant(DoubleExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getDouble()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getFloat8Holder(e.getDouble()));
     }
 
     @Override
     public HoldingContainer visitBooleanConstant(BooleanExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getBoolean() ? 1 : 0));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getBitHolder(e.getBoolean() ? 1 : 0));
     }
 
     @Override
@@ -589,110 +615,64 @@ public class EvaluationVisitor {
     @Override
     public HoldingContainer visitQuotedStringConstant(QuotedString e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("string", holderType);
-      JExpression stringLiteral = JExpr.lit(e.value);
-      JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getVarCharHolder").arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getVarCharHolder(buffer, e.getString()));
     }
 
     @Override
     public HoldingContainer visitIntervalDayConstant(IntervalDayExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = Types.required(MinorType.INTERVALDAY);
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("intervalday", holderType);
-      JExpression dayLiteral = JExpr.lit(e.getIntervalDay());
-      JExpression millisLiteral = JExpr.lit(e.getIntervalMillis());
-      setup.assign(
-          var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getIntervalDayHolder").arg(dayLiteral)
-              .arg(millisLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getIntervalDayHolder(e.getIntervalDay(), e.getIntervalMillis()));
     }
 
     @Override
     public HoldingContainer visitDecimal9Constant(Decimal9Expression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec9", holderType);
-      JExpression valueLiteral = JExpr.lit(e.getIntFromDecimal());
-      JExpression scaleLiteral = JExpr.lit(e.getScale());
-      JExpression precisionLiteral = JExpr.lit(e.getPrecision());
-      setup.assign(
-          var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal9Holder").arg(valueLiteral)
-              .arg(scaleLiteral).arg(precisionLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal9Holder(e.getIntFromDecimal(), e.getScale(), e.getPrecision()));
     }
 
     @Override
     public HoldingContainer visitDecimal18Constant(Decimal18Expression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec18", holderType);
-      JExpression valueLiteral = JExpr.lit(e.getLongFromDecimal());
-      JExpression scaleLiteral = JExpr.lit(e.getScale());
-      JExpression precisionLiteral = JExpr.lit(e.getPrecision());
-      setup.assign(
-          var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal18Holder").arg(valueLiteral)
-              .arg(scaleLiteral).arg(precisionLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal18Holder(e.getLongFromDecimal(), e.getScale(), e.getPrecision()));
     }
 
     @Override
     public HoldingContainer visitDecimal28Constant(Decimal28Expression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec28", holderType);
-      JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
-      JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal28Holder")
-              .arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal28Holder(buffer, e.getBigDecimal()));
     }
 
     @Override
     public HoldingContainer visitDecimal38Constant(Decimal38Expression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec38", holderType);
-      JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
-      JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal38Holder")
-              .arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal38Holder(buffer, e.getBigDecimal()));
     }
 
     @Override
     public HoldingContainer visitVarDecimalConstant(VarDecimalExpression e, ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("varDec", holderType);
-      JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
-      JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getVarDecimalHolder")
-              .arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getVarDecimalHolder(buffer, e.getBigDecimal()));
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
index 72d8614..a0373d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
@@ -227,7 +227,7 @@ public class InterpreterEvaluator {
         @Nullable
         @Override
         public ValueHolder apply(DrillBuf buffer) {
-          return ValueHolderHelper.getDecimal28Holder(buffer, decExpr.getBigDecimal().toString());
+          return ValueHolderHelper.getDecimal28Holder(buffer, decExpr.getBigDecimal());
         }
       });
     }
@@ -238,7 +238,7 @@ public class InterpreterEvaluator {
         @Nullable
         @Override
         public ValueHolder apply(DrillBuf buffer) {
-          return ValueHolderHelper.getDecimal38Holder(buffer, decExpr.getBigDecimal().toString());
+          return ValueHolderHelper.getDecimal38Holder(buffer, decExpr.getBigDecimal());
         }
       });
     }
@@ -246,7 +246,7 @@ public class InterpreterEvaluator {
     @Override
     public ValueHolder visitVarDecimalConstant(final ValueExpressions.VarDecimalExpression decExpr, Integer value) throws RuntimeException {
       return getConstantValueHolder(decExpr.getBigDecimal().toString(), decExpr.getMajorType().getMinorType(),
-          buffer -> ValueHolderHelper.getVarDecimalHolder(Objects.requireNonNull(buffer), decExpr.getBigDecimal().toString()));
+          buffer -> ValueHolderHelper.getVarDecimalHolder(Objects.requireNonNull(buffer), decExpr.getBigDecimal()));
     }
 
     @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
index f81d4c9..8005f04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 
 import io.netty.buffer.DrillBuf;
 
@@ -53,7 +54,9 @@ public abstract class BaseFragmentContext implements FragmentContext {
   @Override
   public <T> T getImplementationClass(final CodeGenerator<T> cg)
       throws ClassTransformationException, IOException {
-    return getCompiler().createInstance(cg);
+    T instance = getCompiler().createInstance(cg);
+    CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this);
+    return instance;
   }
 
   @Override
@@ -63,7 +66,9 @@ public abstract class BaseFragmentContext implements FragmentContext {
 
   @Override
   public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
-    return getCompiler().createInstances(cg, instanceCount);
+    List<T> instances = getCompiler().createInstances(cg, instanceCount);
+    instances.forEach(instance -> CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this));
+    return instances;
   }
 
   protected abstract BufferManager getBufferManager();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 22dfdf0..aaca8a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -379,8 +379,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
   private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
     throws SchemaChangeException, ClassTransformationException, IOException {
     return createNewPriorityQueue(
-      mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(),
-      config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode());
+      mainMapping, leftMapping, rightMapping, config.getOrderings(), batch, unionTypeEnabled,
+      codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode(), context);
   }
 
   public static MappingSet createMainMappingSet() {
@@ -397,10 +397,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
 
   public static PriorityQueue createNewPriorityQueue(
     MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping,
-    OptionSet optionSet, FunctionLookupContext functionLookupContext, CodeCompiler codeCompiler,
     List<Ordering> orderings, VectorAccessible batch, boolean unionTypeEnabled, boolean codegenDump,
-    int limit, BufferAllocator allocator, SelectionVectorMode mode)
+    int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context)
           throws ClassTransformationException, IOException, SchemaChangeException {
+    OptionSet optionSet = context.getOptions();
+    FunctionLookupContext functionLookupContext = context.getFunctionRegistry();
     CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet);
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
@@ -438,7 +439,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     g.rotateBlock();
     g.getEvalBlock()._return(JExpr.lit(0));
 
-    PriorityQueue q = codeCompiler.createInstance(cg);
+    PriorityQueue q = context.getImplementationClass(cg);
     q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
     return q;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 485d363..9f51204 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -433,6 +433,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
     agg.setup(popConfig, htConfig, context, oContext, incoming, this,
         aggrExprs,
         cgInner.getWorkspaceTypes(),
+        cgInner,
         groupByOutFieldIds,
         this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 32db9ea..d10a84a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
 
 import org.apache.drill.exec.memory.BaseAllocator;
@@ -50,6 +51,7 @@ import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
@@ -137,7 +139,8 @@ public abstract class HashAggTemplate implements HashAggregator {
   private HashAggBatch outgoing;
   private VectorContainer outContainer;
 
-  private FragmentContext context;
+  protected FragmentContext context;
+  protected ClassGenerator<?> cg;
   private OperatorContext oContext;
   private BufferAllocator allocator;
 
@@ -360,7 +363,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   @Override
   public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext,
                     RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
-                    TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+                    ClassGenerator<?> cg, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -375,6 +378,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.oContext = oContext;
     this.incoming = incoming;
     this.outgoing = outgoing;
+    this.cg = cg;
     this.outContainer = outContainer;
     this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
     this.phase = hashAggrConfig.getAggPhase();
@@ -1097,7 +1101,12 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   // These methods are overridden in the generated class when created as plain Java code.
   protected BatchHolder newBatchHolder(int batchRowCount) {
-    return new BatchHolder(batchRowCount);
+    return this.injectMembers(new BatchHolder(batchRowCount));
+  }
+
+  protected BatchHolder injectMembers(BatchHolder batchHolder) {
+    CodeGenMemberInjector.injectMembers(cg, batchHolder, context);
+    return batchHolder;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4c54650..5ee77ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.HashAggregate;
@@ -46,8 +47,10 @@ public interface HashAggregator {
   // OK - batch returned, NONE - end of data, RESTART - call again, EMIT - like OK but EMIT
   enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART, AGG_EMIT }
 
-  void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
-             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
+  void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
+             OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, ClassGenerator<?> cg,
+             TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
 
   IterOutcome getOutcome();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index a14bf8c..dcdac95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -228,7 +228,7 @@ public class ChainedHashTable {
     setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
 
     HashTable ht = context.getImplementationClass(top);
-    ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig);
+    ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig, context, cgInner);
 
     return ht;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
new file mode 100644
index 0000000..195f002
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.sun.codemodel.JVar;
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CodeGenMemberInjector {
+
+  /**
+   * Generated code for a class may have several class members; they
+   * are initialized by invoking this method when the instance is created.
+   *
+   * @param cg       the class generator
+   * @param instance the class instance created by the compiler
+   * @param context  the fragment context
+   */
+  public static void injectMembers(ClassGenerator<?> cg, Object instance, FragmentContext context) {
+    Map<Integer, Object> cachedInstances = new HashMap<>();
+    for (Map.Entry<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> setter : cg.getConstantVars().entrySet()) {
+      try {
+        JVar var = setter.getKey().getValue();
+        Integer depth = setter.getKey().getKey();
+        Object varInstance = getFieldInstance(instance, depth, cachedInstances);
+        Field field = varInstance.getClass().getDeclaredField(var.name());
+        field.setAccessible(true);
+        field.set(varInstance, setter.getValue().apply(context.getManagedBuffer()));
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static Object getFieldInstance(Object instance, Integer depth, Map<Integer, Object> cache) throws ReflectiveOperationException {
+    if (depth <= 1) {
+      return instance;
+    }
+    Object methodInstance = cache.get(depth);
+    if (methodInstance != null) {
+      return methodInstance;
+    }
+    methodInstance = getFieldInstance(instance, depth);
+    cache.put(depth, methodInstance);
+    return methodInstance;
+  }
+
+  private static Object getFieldInstance(Object instance, Integer depth) throws ReflectiveOperationException {
+    if (depth <= 1) {
+      return instance;
+    }
+    Field field = instance.getClass().getDeclaredField(ClassGenerator.INNER_CLASS_FIELD_NAME);
+    field.setAccessible(true);
+    return getFieldInstance(field.get(instance), depth - 1);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 5732458..1d9e267 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -20,7 +20,9 @@ package org.apache.drill.exec.physical.impl.common;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
@@ -48,17 +50,19 @@ public interface HashTable {
   int BATCH_MASK = 0x0000FFFF;
 
   /**
-   * {@link HashTable#setup(HashTableConfig, BufferAllocator, VectorContainer, RecordBatch, RecordBatch, VectorContainer)} must be called before anything can be done to the
-   * {@link HashTable}.
+   * {@link HashTable#setup} must be called before anything can be done to the {@link HashTable}.
+   *
    * @param htConfig
    * @param allocator
    * @param incomingBuild
    * @param incomingProbe
    * @param outgoing
    * @param htContainerOrig
+   * @param context
+   * @param cg
    */
   void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
-             VectorContainer htContainerOrig);
+             VectorContainer htContainerOrig, FragmentContext context, ClassGenerator<?> cg);
 
   /**
    * Updates the incoming (build and probe side) value vectors references in the {@link HashTableTemplate.BatchHolder}s.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index ae9a621..25ada28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -23,6 +23,8 @@ import java.util.Set;
 
 import javax.inject.Named;
 
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -110,6 +112,10 @@ public abstract class HashTableTemplate implements HashTable {
 
   private MaterializedField dummyIntField;
 
+  protected FragmentContext context;
+
+  protected ClassGenerator<?> cg;
+
   private int numResizing = 0;
 
   private int resizingTime = 0;
@@ -448,7 +454,9 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   @Override
-  public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
+  public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild,
+                    RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig,
+                    FragmentContext context, ClassGenerator<?> cg) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
@@ -472,6 +480,8 @@ public abstract class HashTableTemplate implements HashTable {
     this.incomingProbe = incomingProbe;
     this.outgoing = outgoing;
     this.htContainerOrig = htContainerOrig;
+    this.context = context;
+    this.cg = cg;
     this.allocationTracker = new HashTableAllocationTracker(htConfig);
 
     // round up the initial capacity to nearest highest power of 2
@@ -764,7 +774,12 @@ public abstract class HashTableTemplate implements HashTable {
   }
 
   protected BatchHolder newBatchHolder(int index, int newBatchHolderSize) { // special method to allow debugging of gen code
-    return new BatchHolder(index, newBatchHolderSize);
+    return this.injectMembers(new BatchHolder(index, newBatchHolderSize));
+  }
+
+  protected BatchHolder injectMembers(BatchHolder batchHolder) {
+    CodeGenMemberInjector.injectMembers(cg, batchHolder, context);
+    return batchHolder;
   }
 
   // Resize the hash table if needed by creating a new one with double the number of buckets.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a969ffd..2f17ff2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1415,8 +1415,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
 
     //  No real code generation !!
 
-    final HashJoinProbe hj = context.getImplementationClass(cg);
-    return hj;
+    return context.getImplementationClass(cg);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 3918d27..c185ac7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -229,40 +229,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
 
   @VisibleForTesting
   protected void createPartitioner() throws SchemaChangeException {
-    final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
-    final int longTail = outGoingBatchCount % actualPartitions;
-
-    final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
-    int startIndex = 0;
-    int endIndex = 0;
-
-    boolean success = false;
-    try {
-      for (int i = 0; i < actualPartitions; i++) {
-        startIndex = endIndex;
-        endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
-        if (i < longTail) {
-          endIndex++;
-        }
-        final OperatorStats partitionStats = new OperatorStats(stats, true);
-        subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
-            startIndex, endIndex);
-      }
-
-      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
-      for (int index = 0; index < terminations.size(); index++) {
-        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
-      }
-      terminations.clear();
-
-      success = true;
-    } finally {
-      if (!success) {
-        for (Partitioner p : subPartitioners) {
-          p.clear();
-        }
-      }
-    }
+    createClassInstances(actualPartitions);
   }
 
   private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
@@ -297,6 +264,39 @@ public class PartitionSenderRootExec extends BaseRootExec {
     try {
       // compile and setup generated code
       List<Partitioner> subPartitioners = context.getImplementationClass(cg, actualPartitions);
+
+      final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
+      final int longTail = outGoingBatchCount % actualPartitions;
+      int startIndex = 0;
+      int endIndex = 0;
+
+      boolean success = false;
+      try {
+        for (int i = 0; i < actualPartitions; i++) {
+          startIndex = endIndex;
+          endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+          if (i < longTail) {
+            endIndex++;
+          }
+          final OperatorStats partitionStats = new OperatorStats(stats, true);
+          subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+            cgInner, startIndex, endIndex);
+        }
+
+        partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+        for (int index = 0; index < terminations.size(); index++) {
+          partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+        }
+        terminations.clear();
+
+        success = true;
+      } finally {
+        if (!success) {
+          for (Partitioner p : subPartitioners) {
+            p.clear();
+          }
+        }
+      }
       return subPartitioners;
 
     } catch (ClassTransformationException | IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index a2fc069..76c60e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
@@ -39,6 +40,7 @@ public interface Partitioner {
              HashPartitionSender popConfig,
              OperatorStats stats,
              OperatorContext oContext,
+             ClassGenerator<?> cg,
              int start, int count) throws SchemaChangeException;
 
   void partitionBatch(RecordBatch incoming) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index ea31a79..687ff81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -26,6 +26,7 @@ import javax.inject.Named;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
@@ -62,6 +64,8 @@ public abstract class PartitionerTemplate implements Partitioner {
   private SelectionVector4 sv4;
   private RecordBatch incoming;
   private OperatorStats stats;
+  protected ClassGenerator<?> cg;
+  protected FragmentContext context;
   private int start;
   private int end;
   private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
@@ -87,10 +91,13 @@ public abstract class PartitionerTemplate implements Partitioner {
                           HashPartitionSender popConfig,
                           OperatorStats stats,
                           OperatorContext oContext,
+                          ClassGenerator<?> cg,
                           int start, int end) throws SchemaChangeException {
 
     this.incoming = incoming;
     this.stats = stats;
+    this.context = context;
+    this.cg = cg;
     this.start = start;
     this.end = end;
     doSetup(context, incoming, null);
@@ -144,7 +151,12 @@ public abstract class PartitionerTemplate implements Partitioner {
   protected OutgoingRecordBatch newOutgoingRecordBatch(
                                OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
                                FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
-    return new OutgoingRecordBatch(stats, operator, tunnel, context, allocator, oppositeMinorFragmentId);
+    return this.injectMembers(new OutgoingRecordBatch(stats, operator, tunnel, context, allocator, oppositeMinorFragmentId));
+  }
+
+  protected OutgoingRecordBatch injectMembers(OutgoingRecordBatch outgoingRecordBatch) {
+    CodeGenMemberInjector.injectMembers(cg, outgoingRecordBatch, context);
+    return outgoingRecordBatch;
   }
 
   @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
index 3153cd0..f48315d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
@@ -72,7 +72,11 @@ public abstract class ExampleTemplateWithInner implements ExampleInner{
     }
 
     protected DoubleInner newDoubleInner() {
-      return new DoubleInner();
+      return this.injectMembers(new DoubleInner());
+    }
+
+    protected DoubleInner injectMembers(DoubleInner doubleInner) {
+      return doubleInner;
     }
 
     public class DoubleInner {
@@ -101,6 +105,10 @@ public abstract class ExampleTemplateWithInner implements ExampleInner{
    * subclass (or replacement) of the template inner class
    */
   protected TheInnerClass newTheInnerClass( ) {
-    return new TheInnerClass();
+    return this.injectMembers(new TheInnerClass());
+  }
+
+  protected TheInnerClass injectMembers(TheInnerClass theInnerClass) {
+    return theInnerClass;
   }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 084107d..ecff4e1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.compile;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestTools;
@@ -39,6 +41,8 @@ public class TestLargeFileCompilation extends BaseTestQuery {
 
   private static final String LARGE_QUERY_FILTER;
 
+  private static final String HUGE_STRING_CONST_QUERY;
+
   private static final String LARGE_QUERY_WRITER;
 
   private static final String LARGE_QUERY_SELECT_LIST;
@@ -110,6 +114,21 @@ public class TestLargeFileCompilation extends BaseTestQuery {
   }
 
   static {
+    final char[] alphabet = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+    int len = 1 << 18;
+    char[] longText = new char[len];
+    for (int j = 0; j < len; ++j) {
+      longText[j] = alphabet[ThreadLocalRandom.current().nextInt(0, alphabet.length)];
+    }
+    StringBuilder sb = new StringBuilder("select *\n")
+      .append("from cp.`employee.json`\n")
+      .append("where last_name ='")
+      .append(longText)
+      .append("'");
+    HUGE_STRING_CONST_QUERY = sb.toString();
+  }
+
+  static {
     LARGE_QUERY_WRITER = createTableWithColsCount(NUM_PROJECT_COLUMNS);
     LARGE_TABLE_WRITER = createTableWithColsCount(NUM_JOIN_TABLE_COLUMNS);
     QUERY_WITH_JOIN = "select * from %1$s t1, %1$s t2 where t1.col1 = t2.col1";
@@ -228,4 +247,24 @@ public class TestLargeFileCompilation extends BaseTestQuery {
       testNoResult("drop table if exists %s", tableName);
     }
   }
+
+  @Test
+  public void testJDKHugeStringConstantCompilation() throws Exception {
+    try {
+      setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
+      testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+    } finally {
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+    }
+  }
+
+  @Test
+  public void testJaninoHugeStringConstantCompilation() throws Exception {
+    try {
+      setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
+      testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+    } finally {
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
index 7537cfb..e3731a8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
@@ -21,6 +21,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.drill.exec.ops.FragmentContextImpl;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClusterFixtureBuilder;
@@ -31,7 +34,6 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -50,6 +52,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.when;
+
 @Category(OperatorTest.class)
 public class TopNBatchTest extends PopUnitTestBase {
   @Rule
@@ -63,6 +67,8 @@ public class TopNBatchTest extends PopUnitTestBase {
   public void priorityQueueOrderingTest() throws Exception {
     Properties properties = new Properties();
     DrillConfig drillConfig = DrillConfig.create(properties);
+    DrillbitContext drillbitContext = mockDrillbitContext();
+    when(drillbitContext.getFunctionImplementationRegistry()).thenReturn(new FunctionImplementationRegistry(drillConfig));
 
     FieldReference expr = FieldReference.getWithQuotedRef("colA");
     Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_DESC, expr, Order.Ordering.NULLS_FIRST);
@@ -73,6 +79,9 @@ public class TopNBatchTest extends PopUnitTestBase {
 
     List<MaterializedField> cols = Lists.newArrayList(colA, colB);
     BatchSchema batchSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, cols);
+    FragmentContextImpl context = new FragmentContextImpl(drillbitContext,
+      BitControl.PlanFragment.getDefaultInstance(), null,
+      drillbitContext.getFunctionImplementationRegistry());
     RowSet expectedRowSet;
 
     try (RootAllocator allocator = new RootAllocator(100_000_000)) {
@@ -100,12 +109,10 @@ public class TopNBatchTest extends PopUnitTestBase {
 
         queue = TopNBatch.createNewPriorityQueue(
           TopNBatch.createMainMappingSet(), TopNBatch.createLeftMappingSet(),
-          TopNBatch.createRightMappingSet(), optionManager,
-          new FunctionImplementationRegistry(drillConfig),
-          new CodeCompiler(drillConfig, optionManager),
+          TopNBatch.createRightMappingSet(),
           orderings, hyperContainer, false,
           true, 10, allocator,
-          batchSchema.getSelectionVectorMode());
+          batchSchema.getSelectionVectorMode(), context);
       }
 
       List<RecordBatchData> testBatches = Lists.newArrayList();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
index 52afbe0..7087687 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
@@ -169,10 +169,13 @@ public class ValueHolderHelper {
   }
 
   public static Decimal28SparseHolder getDecimal28Holder(DrillBuf buf, String decimal) {
+    BigDecimal bigDecimal = new BigDecimal(decimal);
 
-    Decimal28SparseHolder dch = new Decimal28SparseHolder();
+    return getDecimal28Holder(buf, bigDecimal);
+  }
 
-    BigDecimal bigDecimal = new BigDecimal(decimal);
+  public static Decimal28SparseHolder getDecimal28Holder(DrillBuf buf, BigDecimal bigDecimal) {
+    Decimal28SparseHolder dch = new Decimal28SparseHolder();
 
     dch.scale = bigDecimal.scale();
     dch.precision = bigDecimal.precision();
@@ -180,33 +183,40 @@ public class ValueHolderHelper {
     dch.start = 0;
     dch.buffer = buf.reallocIfNeeded(5 * DecimalUtility.INTEGER_SIZE);
     DecimalUtility
-        .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal28SparseHolder.nDecimalDigits);
+      .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal28SparseHolder.nDecimalDigits);
 
     return dch;
   }
 
   public static Decimal38SparseHolder getDecimal38Holder(DrillBuf buf, String decimal) {
+      BigDecimal bigDecimal = new BigDecimal(decimal);
 
-      Decimal38SparseHolder dch = new Decimal38SparseHolder();
+      return getDecimal38Holder(buf, bigDecimal);
+  }
 
-      BigDecimal bigDecimal = new BigDecimal(decimal);
+  public static Decimal38SparseHolder getDecimal38Holder(DrillBuf buf, BigDecimal bigDecimal) {
+    Decimal38SparseHolder dch = new Decimal38SparseHolder();
 
-      dch.scale = bigDecimal.scale();
-      dch.precision = bigDecimal.precision();
-      Decimal38SparseHolder.setSign(bigDecimal.signum() == -1, dch.start, dch.buffer);
-      dch.start = 0;
+    dch.scale = bigDecimal.scale();
+    dch.precision = bigDecimal.precision();
+    Decimal38SparseHolder.setSign(bigDecimal.signum() == -1, dch.start, dch.buffer);
+    dch.start = 0;
     dch.buffer = buf.reallocIfNeeded(Decimal38SparseHolder.maxPrecision * DecimalUtility.INTEGER_SIZE);
     DecimalUtility
-        .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal38SparseHolder.nDecimalDigits);
+      .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal38SparseHolder.nDecimalDigits);
 
-      return dch;
+    return dch;
   }
 
   public static VarDecimalHolder getVarDecimalHolder(DrillBuf buf, String decimal) {
-    VarDecimalHolder dch = new VarDecimalHolder();
-
     BigDecimal bigDecimal = new BigDecimal(decimal);
 
+    return getVarDecimalHolder(buf, bigDecimal);
+  }
+
+  public static VarDecimalHolder getVarDecimalHolder(DrillBuf buf, BigDecimal bigDecimal) {
+    VarDecimalHolder dch = new VarDecimalHolder();
+
     byte[] bytes = bigDecimal.unscaledValue().toByteArray();
     int length = bytes.length;