You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/04/27 22:48:00 UTC

[2/4] drill git commit: DRILL-2823: Use implicit casts for comparisons of expression in the join condition

DRILL-2823: Use implicit casts for comparisons of expression in the join condition


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/70b1a1ff
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/70b1a1ff
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/70b1a1ff

Branch: refs/heads/master
Commit: 70b1a1ff0888f41f5cc83009d89a4e8d2e0a7695
Parents: e33ffa2
Author: Mehant Baid <me...@gmail.com>
Authored: Sun Apr 19 14:21:19 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon Apr 27 13:08:17 2015 -0700

----------------------------------------------------------------------
 .../physical/impl/common/ChainedHashTable.java  | 104 ++++---------------
 .../exec/physical/impl/join/JoinUtils.java      |  93 +++++++++++++++++
 .../exec/physical/impl/join/MergeJoinBatch.java |  87 +++++++++-------
 .../impl/join/TestHashJoinAdvanced.java         |  27 +++++
 .../impl/join/TestMergeJoinAdvanced.java        |  65 ++++++++++++
 .../jsoninput/implicit_cast_join_1.json         |   1 +
 6 files changed, 253 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/70b1a1ff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 9df67d8..42bb3ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -163,12 +164,6 @@ public class ChainedHashTable {
         continue;
       }
       keyExprsBuild[i] = expr;
-
-      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
-      // create a type-specific ValueVector for this key
-      ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
-      htKeyFieldIds[i] = htContainerOrig.add(vv);
-
       i++;
     }
 
@@ -185,8 +180,27 @@ public class ChainedHashTable {
         keyExprsProbe[i] = expr;
         i++;
       }
+      JoinUtils.addLeastRestrictiveCasts(keyExprsProbe, incomingProbe, keyExprsBuild, incomingBuild, context);
     }
 
+    i = 0;
+    /*
+     * Once the implicit casts have been added, create the value vectors for the corresponding
+     * type and add it to the hash table's container.
+     * Note: Adding implicit casts may have a minor impact on the memory footprint. For example,
+     * if we have a join condition with bigint on the probe side and int on the build side then
+     * after this change we will be allocating a bigint vector in the hashtable instead of an int
+     * vector.
+     */
+    for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+      LogicalExpression expr = keyExprsBuild[i];
+      final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
+      ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
+      htKeyFieldIds[i] = htContainerOrig.add(vv);
+      i++;
+    }
+
+
     // generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys()
     setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds);
     setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe,
@@ -201,15 +215,6 @@ public class ChainedHashTable {
     }
     setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds);
 
-    /* Before generating the code for hashing the build and probe expressions
-     * examine the expressions to make sure they are of the same type, add casts if necessary.
-     * If they are not of the same type, hashing the same value of different types will yield different hash values.
-     * NOTE: We add the cast only for the hash function, comparator function can handle the case
-     * when expressions are different (for eg we have comparator functions that compare bigint and float8)
-     * However for the hash to work correctly we would need to apply the cast.
-     */
-    addLeastRestrictiveCasts(keyExprsBuild, keyExprsProbe);
-
     setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingBuildMapping, incomingBuild, keyExprsBuild, false);
     setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
 
@@ -293,60 +298,6 @@ public class ChainedHashTable {
     }
   }
 
-  private void addLeastRestrictiveCasts(LogicalExpression[] keyExprsBuild, LogicalExpression[] keyExprsProbe) {
-
-    // If we don't have probe expressions then nothing to do get out
-    if (keyExprsProbe == null) {
-      return;
-    }
-
-    assert keyExprsBuild.length == keyExprsProbe.length;
-
-    for (int i = 0; i < keyExprsBuild.length; i++) {
-      LogicalExpression buildExpr = keyExprsBuild[i];
-      LogicalExpression probeExpr = keyExprsProbe[i];
-      MinorType buildType = buildExpr.getMajorType().getMinorType();
-      MinorType probeType = probeExpr.getMajorType().getMinorType();
-
-      if (buildType != probeType) {
-
-        // currently we only support implicit casts if the input types are numeric or varchar/varbinary
-        if (!allowImplicitCast(buildType, probeType)) {
-          throw new DrillRuntimeException(String.format("Hash join only supports implicit casts between " +
-              "1. Numeric data\n 2. Varchar, Varbinary data " +
-              "Build type: %s, Probe type: %s. Add explicit casts to avoid this error", buildType, probeType));
-        }
-
-        // We need to add a cast to one of the expressions
-        List<MinorType> types = new LinkedList<>();
-        types.add(buildType);
-        types.add(probeType);
-        MinorType result = TypeCastRules.getLeastRestrictiveType(types);
-        ErrorCollector errorCollector = new ErrorCollectorImpl();
-
-        if (result == null) {
-          throw new DrillRuntimeException(String.format("Join conditions cannot be compared failing build " +
-                  "expression:" + " %s failing probe expression: %s", buildExpr.getMajorType().toString(),
-              probeExpr.getMajorType().toString()));
-        } else if (result != buildType) {
-          // Add a cast expression on top of the build expression
-          LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(buildExpr, probeExpr.getMajorType(), context.getFunctionRegistry(), errorCollector);
-          // Store the newly casted expression
-          keyExprsBuild[i] =
-              ExpressionTreeMaterializer.materialize(castExpr, incomingBuild, errorCollector,
-                  context.getFunctionRegistry());
-        } else if (result != probeType) {
-          // Add a cast expression on top of the probe expression
-          LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(probeExpr, buildExpr.getMajorType(), context.getFunctionRegistry(), errorCollector);
-          // store the newly casted expression
-          keyExprsProbe[i] =
-              ExpressionTreeMaterializer.materialize(castExpr, incomingProbe, errorCollector,
-                  context.getFunctionRegistry());
-        }
-      }
-    }
-  }
-
   private void setupGetHash(ClassGenerator<HashTable> cg, MappingSet incomingMapping, VectorAccessible batch, LogicalExpression[] keyExprs,
                             boolean isProbe) throws SchemaChangeException {
 
@@ -370,19 +321,4 @@ public class ChainedHashTable {
 
 
   }
-
-  private boolean allowImplicitCast(MinorType input1, MinorType input2) {
-    // allow implicit cast if both the input types are numeric
-    if (TypeCastRules.isNumericType(input1) && TypeCastRules.isNumericType(input2)) {
-      return true;
-    }
-
-    // allow implicit cast if both the input types are varbinary/ varchar
-    if ((input1 == MinorType.VARCHAR || input1 == MinorType.VARBINARY) &&
-        (input2 == MinorType.VARCHAR || input2 == MinorType.VARBINARY)) {
-      return true;
-    }
-
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/70b1a1ff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index 41bf786..45b1093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -25,6 +25,18 @@ import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.plan.RelOptUtil;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.resolver.TypeCastRules;
+
+import java.util.LinkedList;
 import java.util.List;
 
 public class JoinUtils {
@@ -101,4 +113,85 @@ public class JoinUtils {
 
     return false;
   }
+
+  /**
+   * Checks if implicit cast is allowed between the two input types of the join condition. Currently we allow
+   * implicit casts in join condition only between numeric types and varchar/varbinary types.
+   * @param input1
+   * @param input2
+   * @return true if implicit cast is allowed false otherwise
+   */
+  private static boolean allowImplicitCast(TypeProtos.MinorType input1, TypeProtos.MinorType input2) {
+    // allow implicit cast if both the input types are numeric
+    if (TypeCastRules.isNumericType(input1) && TypeCastRules.isNumericType(input2)) {
+      return true;
+    }
+
+    // allow implicit cast if both the input types are varbinary/ varchar
+    if ((input1 == TypeProtos.MinorType.VARCHAR || input1 == TypeProtos.MinorType.VARBINARY) &&
+        (input2 == TypeProtos.MinorType.VARCHAR || input2 == TypeProtos.MinorType.VARBINARY)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Utility method used by joins to add an implicit cast on one side of each join condition whose two
+   * expressions have different minor types; the cast expression replaces the entry in the input array.
+   * @param leftExpressions array of expressions from left input into the join
+   * @param leftBatch left input record batch
+   * @param rightExpressions array of expressions from right input into the join
+   * @param rightBatch right input record batch
+   * @param context fragment context
+   * @throws DrillRuntimeException if no implicit cast is allowed or the cast fails to materialize
+   */
+  public static void addLeastRestrictiveCasts(LogicalExpression[] leftExpressions, RecordBatch leftBatch,
+                                              LogicalExpression[] rightExpressions, RecordBatch rightBatch,
+                                              FragmentContext context) {
+    assert rightExpressions.length == leftExpressions.length;
+
+    for (int i = 0; i < rightExpressions.length; i++) {
+      LogicalExpression rightExpression = rightExpressions[i];
+      LogicalExpression leftExpression = leftExpressions[i];
+      TypeProtos.MinorType rightType = rightExpression.getMajorType().getMinorType();
+      TypeProtos.MinorType leftType = leftExpression.getMajorType().getMinorType();
+
+      if (rightType != leftType) {
+        // currently we only support implicit casts if the input types are numeric or varchar/varbinary
+        if (!allowImplicitCast(rightType, leftType)) {
+          throw new DrillRuntimeException(String.format("Join only supports implicit casts between " +
+              "1. Numeric data\n 2. Varchar, Varbinary data " +
+              "Left type: %s, Right type: %s. Add explicit casts to avoid this error", leftType, rightType));
+        }
+
+        // We need to add a cast to one of the expressions
+        List<TypeProtos.MinorType> types = new LinkedList<>();
+        types.add(rightType);
+        types.add(leftType);
+        TypeProtos.MinorType result = TypeCastRules.getLeastRestrictiveType(types);
+        ErrorCollector errorCollector = new ErrorCollectorImpl();
+
+        if (result == null) {
+          throw new DrillRuntimeException(String.format("Join conditions cannot be compared failing left " +
+                  "expression:" + " %s failing right expression: %s", leftExpression.getMajorType().toString(),
+              rightExpression.getMajorType().toString()));
+        } else if (result != rightType) {
+          // Cast the right expression to the left expression's major type and store the materialized cast
+          LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(rightExpression, leftExpression.getMajorType(), context.getFunctionRegistry(), errorCollector);
+          rightExpressions[i] = ExpressionTreeMaterializer.materialize(castExpr, rightBatch, errorCollector,
+              context.getFunctionRegistry());
+        } else if (result != leftType) {
+          // Cast the left expression to the right expression's major type and store the materialized cast
+          LogicalExpression castExpr = ExpressionTreeMaterializer.addCastExpression(leftExpression, rightExpression.getMajorType(), context.getFunctionRegistry(), errorCollector);
+          leftExpressions[i] = ExpressionTreeMaterializer.materialize(castExpr, leftBatch, errorCollector,
+              context.getFunctionRegistry());
+        }
+        // Surface whatever the collector recorded while casting/materializing instead of silently ignoring it
+        if (errorCollector.hasErrors()) {
+          throw new DrillRuntimeException(String.format("Failure while adding implicit cast to join condition.  Errors:\n %s.", errorCollector.toErrorString()));
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/70b1a1ff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index b5cb12e..1a7e60e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -114,6 +114,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   public MergeJoinBatchBuilder batchBuilder;
   private boolean areNullsEqual = false; // whether nulls compare equal
 
+  private static final String LEFT_INPUT = "LEFT INPUT";
+  private static final String RIGHT_INPUT = "RIGHT INPUT";
+
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
     super(popConfig, context, true);
 
@@ -262,20 +265,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   }
 
   private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
-      JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) throws ClassTransformationException {
+      LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, JVar joinStatus,
+      ErrorCollector collector) throws ClassTransformationException {
     boolean nextLeftIndexDeclared = false;
 
     cg.setMappingSet(compareLeftMapping);
 
-    for (JoinCondition condition : conditions) {
-      final LogicalExpression leftFieldExpr = condition.getLeft();
+    for (int i = 0; i < leftExpression.length; i++) {
 
       // materialize value vector readers from join expression
-      final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
-      if (collector.hasErrors()) {
-        throw new ClassTransformationException(String.format(
-            "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
-      }
+      final LogicalExpression materializedLeftExpr = leftExpression[i];
 
       // generate compareNextLeftKey()
       ////////////////////////////////
@@ -359,13 +358,30 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     // declare 'incoming' member so VVReadExpr generated code can point to the left or right batch
     JVar incomingRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incoming");
 
+    /*
+     * Materialize expressions on both sides of the join condition. Check if both the sides
+     * have the same return type, if not then inject casts so that comparison function will work as
+     * expected
+     */
+    LogicalExpression leftExpr[] = new LogicalExpression[conditions.size()];
+    LogicalExpression rightExpr[] = new LogicalExpression[conditions.size()];
+    IterOutcome lastLeftStatus = status.getLastLeft();
+    IterOutcome lastRightStatus = status.getLastRight();
+    for (int i = 0; i < conditions.size(); i++) {
+      JoinCondition condition = conditions.get(i);
+      leftExpr[i] =  materializeExpression(condition.getLeft(), lastLeftStatus, left, collector);
+      rightExpr[i] = materializeExpression(condition.getRight(), lastRightStatus, right, collector);
+    }
+    JoinUtils.addLeastRestrictiveCasts(leftExpr, left, rightExpr, right, context);
+
     //generate doCompare() method
     /////////////////////////////////////////
-    generateDoCompare(cg, incomingRecordBatch, incomingLeftRecordBatch, incomingRightRecordBatch, collector);
+    generateDoCompare(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, rightExpr,
+        incomingRightRecordBatch, collector);
 
     //generate doCompareNextLeftKey() method
     /////////////////////////////////////////
-    generateDoCompareNextLeft(cg, incomingRecordBatch, incomingLeftRecordBatch, joinStatus, collector);
+    generateDoCompareNextLeft(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, joinStatus, collector);
 
     // generate copyLeft()
     //////////////////////
@@ -480,49 +496,25 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   }
 
   private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
-      JVar incomingLeftRecordBatch, JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
+      LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression,
+      JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
 
     cg.setMappingSet(compareMapping);
     if (status.getLastRight() != IterOutcome.NONE) {
+      assert leftExpression.length == rightExpression.length;
 
-      for (JoinCondition condition : conditions) {
-        final LogicalExpression leftFieldExpr = condition.getLeft();
-        final LogicalExpression rightFieldExpr = condition.getRight();
+      for (int i = 0; i < leftExpression.length; i++) {
 
-        // materialize value vector readers from join expression
-        LogicalExpression materializedLeftExpr;
-        if (status.getLastLeft() != IterOutcome.NONE) {
-//          if (status.isLeftPositionAllowed()) {
-          materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
-        } else {
-          materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
-        }
-        if (collector.hasErrors()) {
-          throw new ClassTransformationException(String.format(
-              "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
-        }
-
-        LogicalExpression materializedRightExpr;
-//        if (status.isRightPositionAllowed()) {
-        if (status.getLastRight() != IterOutcome.NONE) {
-          materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
-        } else {
-          materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
-        }
-        if (collector.hasErrors()) {
-          throw new ClassTransformationException(String.format(
-              "Failure while trying to materialize incoming right field.  Errors:\n %s.", collector.toErrorString()));
-        }
 
         // generate compare()
         ////////////////////////
         cg.setMappingSet(compareMapping);
         cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
-        ClassGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+        ClassGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(leftExpression[i], false);
 
         cg.setMappingSet(compareRightMapping);
         cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
-        ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+        ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(rightExpression[i], false);
 
         LogicalExpression fh =
             FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareLeftExprHolder,
@@ -548,4 +540,19 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     cg.getEvalBlock()._return(JExpr.lit(0));
   }
 
+  private LogicalExpression materializeExpression(LogicalExpression expression, IterOutcome lastStatus,
+                                                  RecordBatch input, ErrorCollector collector) throws ClassTransformationException {
+    LogicalExpression materializedExpr = null;
+    if (lastStatus != IterOutcome.NONE) {
+      materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry());
+    } else {
+      materializedExpr = new TypedNullConstant(Types.optional(MinorType.INT));
+    }
+    if (collector.hasErrors()) {
+      throw new ClassTransformationException(String.format(
+          "Failure while trying to materialize incoming field from %s batch.  Errors:\n %s.",
+          (input == left ? LEFT_INPUT : RIGHT_INPUT), collector.toErrorString()));
+    }
+    return materializedExpr;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/70b1a1ff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index fddb03b..905fd1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -88,4 +88,31 @@ public class TestHashJoinAdvanced extends BaseTestQuery {
         .build()
         .run();
   }
+  @Test
+  public void testJoinWithDifferentTypesInCondition() throws Exception {
+    final String enableHashJoin = "alter session set `planner.enable_hashjoin` = true";
+
+    // join condition in which one side is explicitly cast to double
+    final String castToDoubleQuery = "select t1.full_name from cp.`employee.json` t1, cp.`department.json` t2 " +
+        "where cast(t1.department_id as double) = t2.department_id and t1.employee_id = 1";
+    testBuilder()
+        .sqlQuery(castToDoubleQuery)
+        .optionSettingQueriesForTestQuery(enableHashJoin)
+        .unOrdered()
+        .baselineColumns("full_name")
+        .baselineValues("Sheri Nowmer")
+        .go();
+
+    final String mixedNumericQuery = "select t1.bigint_col from cp.`jsoninput/implicit_cast_join_1.json` t1, cp.`jsoninput/implicit_cast_join_1.json` t2 " +
+        " where t1.bigint_col = cast(t2.bigint_col as int) and" + // join condition with bigint and int
+        " t1.double_col  = cast(t2.double_col as float) and" + // join condition with double and float
+        " t1.bigint_col = cast(t2.bigint_col as double)"; // join condition with bigint and double
+    testBuilder()
+        .sqlQuery(mixedNumericQuery)
+        .optionSettingQueriesForTestQuery(enableHashJoin)
+        .unOrdered()
+        .baselineColumns("bigint_col")
+        .baselineValues(1L)
+        .go();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/70b1a1ff/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
new file mode 100644
index 0000000..a092ca7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMergeJoinAdvanced extends BaseTestQuery {
+
+  // Have to disable hash join to test merge join in this class (the methods are named "MergeJoin" but toggle hash join)
+  @BeforeClass
+  public static void disableMergeJoin() throws Exception {
+    test("alter session set `planner.enable_hashjoin` = false");
+  }
+
+  @AfterClass
+  public static void enableMergeJoin() throws Exception {
+    test("alter session set `planner.enable_hashjoin` = true");
+  }
+
+  @Test
+  public void testJoinWithDifferentTypesInCondition() throws Exception {
+    String query = "select t1.full_name from cp.`employee.json` t1, cp.`department.json` t2 " +
+        "where cast(t1.department_id as double) = t2.department_id and t1.employee_id = 1";
+    // The per-query option must keep hash join off; setting it to true here would undo the class-level setup
+    testBuilder()
+        .sqlQuery(query)
+        .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false")
+        .unOrdered()
+        .baselineColumns("full_name")
+        .baselineValues("Sheri Nowmer")
+        .go();
+
+    // Same check with several numeric type pairs combined in one join condition
+    query = "select t1.bigint_col from cp.`jsoninput/implicit_cast_join_1.json` t1, cp.`jsoninput/implicit_cast_join_1.json` t2 " +
+        " where t1.bigint_col = cast(t2.bigint_col as int) and" + // join condition with bigint and int
+        " t1.double_col  = cast(t2.double_col as float) and" + // join condition with double and float
+        " t1.bigint_col = cast(t2.bigint_col as double)"; // join condition with bigint and double
+
+    testBuilder()
+        .sqlQuery(query)
+        .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false")
+        .unOrdered()
+        .baselineColumns("bigint_col")
+        .baselineValues(1L)
+        .go();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/70b1a1ff/exec/java-exec/src/test/resources/jsoninput/implicit_cast_join_1.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/implicit_cast_join_1.json b/exec/java-exec/src/test/resources/jsoninput/implicit_cast_join_1.json
new file mode 100644
index 0000000..9544f16
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/implicit_cast_join_1.json
@@ -0,0 +1 @@
+{"bigint_col": 1, "double_col": 0.5}
\ No newline at end of file